يعمل الإدخال والإخراج (I/O) في Tokio بنفس الطريقة التي يعمل بها في std تقريبًا، ولكن بشكل غير متزامن (asynchronously). توجد سمة (trait) للقراءة ([AsyncRead]) وسمة للكتابة ([AsyncWrite]). تنفذ أنواع محددة هذه السمات حسب الاقتضاء ([TcpStream]، [File]، [Stdout]). يتم تنفيذ [AsyncRead] و [AsyncWrite] أيضًا من خلال عدد من هياكل البيانات، مثل Vec<u8> و &[u8]. يتيح ذلك استخدام مصفوفات البايت (byte arrays) حيث يُتوقع وجود قارئ (reader) أو كاتب (writer).
ستغطي هذه الصفحة أساسيات القراءة والكتابة في I/O مع Tokio وسنعمل من خلال بعض الأمثلة. الصفحة التالية ستنتقل إلى مثال I/O أكثر تقدمًا.
AsyncRead و AsyncWrite
توفر هاتان السمتان (traits) التسهيلات للقراءة من تدفقات البايت (byte streams) والكتابة إليها بشكل غير متزامن. عادة لا يتم استدعاء الأساليب (methods) في هذه السمات مباشرة، بشكل مشابه لكيفية عدم استدعاء أسلوب poll يدويًا من سمة Future. بدلاً من ذلك، ستستخدمها من خلال أساليب الأدوات المساعدة التي توفرها [AsyncReadExt] و [AsyncWriteExt].
دعونا نلقي نظرة سريعة على عدد قليل من هذه الأساليب. جميع هذه الوظائف (functions) هي async ويجب استخدامها مع .await.
async fn read()
يوفر AsyncReadExt::read أسلوبًا غير متزامن لقراءة البيانات في مخزن مؤقت (buffer)، وإرجاع عدد البايتات المقروءة.
ملاحظة: عندما يرجع read() القيمة Ok(0)، فهذا يشير إلى أن التدفق (stream) مغلق. أي استدعاءات أخرى لـ read() ستكتمل على الفور بـ Ok(0). مع مثيلات [TcpStream]، يشير هذا إلى أن نصف القراءة من المقبس (socket) مغلق.
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};
# fn dox() {
#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("foo.txt").await?;
let mut buffer = [0; 10];
// قراءة ما يصل إلى 10 بايتات
let n = f.read(&mut buffer[..]).await?;
println!("البايتات: {:?}", &buffer[..n]);
Ok(())
}
# }
async fn read_to_end()
يقرأ AsyncReadExt::read_to_end جميع البايتات من التدفق حتى الوصول إلى نهاية الملف (EOF).
use tokio::io::{self, AsyncReadExt};
use tokio::fs::File;
# fn dox() {
#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("foo.txt").await?;
let mut buffer = Vec::new();
// قراءة الملف بأكمله
f.read_to_end(&mut buffer).await?;
Ok(())
}
# }
async fn write()
يقوم AsyncWriteExt::write بكتابة مخزن مؤقت (buffer) في الكاتب (writer)، مع إرجاع عدد البايتات التي تمت كتابتها.
use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;
# fn dox() {
#[tokio::main]
async fn main() -> io::Result<()> {
let mut file = File::create("foo.txt").await?;
// يكتب بعض بادئة سلسلة البايت، ولكن ليس بالضرورة كلها.
let n = file.write(b"some bytes").await?;
println!("تمت كتابة أول {} بايتات من 'some bytes'.", n);
Ok(())
}
# }
async fn write_all()
يقوم AsyncWriteExt::write_all بكتابة المخزن المؤقت (buffer) بالكامل في الكاتب (writer).
use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;
# fn dox() {
#[tokio::main]
async fn main() -> io::Result<()> {
let mut file = File::create("foo.txt").await?;
file.write_all(b"some bytes").await?;
Ok(())
}
# }
تتضمن كلتا السمتين عددًا من الأساليب المفيدة الأخرى. راجع وثائق API للحصول على قائمة شاملة.
وظائف مساعدة (Helper functions)
بالإضافة إلى ذلك، تمامًا مثل std ، تحتوي وحدة [tokio::io] على عدد من الوظائف المساعدة المفيدة بالإضافة إلى واجهات برمجة التطبيقات للتعامل مع الإدخال القياسي و الإخراج القياسي و الخطأ القياسي. على سبيل المثال، يقوم tokio::io::copy بنسخ محتويات القارئ بالكامل بشكل غير متزامن إلى كاتب.
use tokio::fs::File;
use tokio::io;
# fn dox() {
#[tokio::main]
async fn main() -> io::Result<()> {
let mut reader: &[u8] = b"hello";
let mut file = File::create("foo.txt").await?;
io::copy(&mut reader, &mut file).await?;
Ok(())
}
# }
لاحظ أن هذا يستخدم حقيقة أن مصفوفات البايت تنفذ أيضًا AsyncRead.
خادم الصدى (Echo server)
دعونا نتدرب على القيام ببعض عمليات I/O غير المتزامنة. سنقوم بكتابة خادم صدى (echo server).
يربط خادم الصدى TcpListener ويقبل الاتصالات الواردة في حلقة (loop). لكل اتصال وارد، يتم قراءة البيانات من المقبس (socket) وكتابتها على الفور مرة أخرى إلى المقبس. يرسل العميل البيانات إلى الخادم ويتلقى نفس البيانات بالضبط مرة أخرى.
سنقوم بتنفيذ خادم الصدى مرتين، باستخدام استراتيجيات مختلفة قليلاً.
باستخدام io::copy()
للبدء، سنقوم بتنفيذ منطق الصدى باستخدام أداة io::copy.
يمكنك كتابة هذا الكود في ملف ثنائي جديد:
$ touch src/bin/echo-server-copy.rs
والذي يمكنك تشغيله (أو مجرد التحقق من التجميع) باستخدام:
$ cargo run --bin echo-server-copy
ستتمكن من تجربة الخادم باستخدام أداة سطر أوامر قياسية مثل telnet ، أو عن طريق كتابة عميل بسيط مثل العميل الموجود في وثائق tokio::net::TcpStream.
هذا خادم TCP ويحتاج إلى حلقة قبول (accept loop). يتم إنشاء مهمة (task) جديدة لمعالجة كل مقبس (socket) مقبول.
use tokio::io;
use tokio::net::TcpListener;
# fn dox() {
#[tokio::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:6142").await?;
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
// نسخ البيانات هنا
});
}
}
# }
كما رأينا سابقًا، تأخذ هذه الوظيفة المساعدة قارئًا وكاتبًا وتنسخ البيانات من أحدهما إلى الآخر. ومع ذلك، لدينا فقط TcpStream واحد. تنفذ هذه القيمة المفردة كلاً من AsyncRead و AsyncWrite. نظرًا لأن io::copy يتطلب &mut لكل من القارئ والكاتب، فلا يمكن استخدام المقبس لكلا الوسيطين (arguments).
```rust,compile_fail // هذا يفشل في التجميع io::copy(&mut socket, &mut socket).await
## تقسيم القارئ + الكاتب (Splitting a reader + writer)
للتغلب على هذه المشكلة، يجب علينا تقسيم المقبس إلى مقبض قارئ (reader handle) ومقبض كاتب (writer handle). تعتمد أفضل طريقة لتقسيم مزيج القارئ/الكاتب على النوع المحدد.
يمكن تقسيم أي نوع قارئ + كاتب باستخدام أداة [`io::split`][split]. تأخذ هذه الوظيفة قيمة واحدة وترجع مقابض قارئ وكاتب منفصلة. يمكن استخدام هذين المقبضين بشكل مستقل، بما في ذلك من مهام منفصلة.
على سبيل المثال، يمكن لعميل الصدى التعامل مع القراءات والكتابات المتزامنة مثل هذا:
```rust
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
# fn dox() {
#[tokio::main]
async fn main() -> io::Result<()> {
let socket = TcpStream.connect("127.0.0.1:6142").await?;
let (mut rd, mut wr) = io::split(socket);
// كتابة البيانات في الخلفية
tokio::spawn(async move {
wr.write_all(b"hello\r\n").await?;
wr.write_all(b"world\r\n").await?;
// أحيانًا، يحتاج مستنتج النوع في rust
// إلى القليل من المساعدة
Ok::<_, io::Error>(())
});
let mut buf = vec![0; 128];
loop {
let n = rd.read(&mut buf).await?;
if n == 0 {
break;
}
println!("تلقيت {:?}", &buf[..n]);
}
Ok(())
}
# }
نظرًا لأن io::split يدعم أي قيمة تنفذ AsyncRead + AsyncWrite ويرجع مقابض مستقلة، فإن io::split يستخدم داخليًا Arc و Mutex. يمكن تجنب هذا العبء مع TcpStream. يوفر TcpStream وظيفتي تقسيم متخصصتين.
يأخذ [TcpStream::split] مرجعًا (reference) للتدفق ويرجع مقبض قارئ وكاتب. نظرًا لاستخدام مرجع، يجب أن يظل كلا المقبضين في نفس المهمة التي تم استدعاء split() منها. هذا التقسيم المتخصص هو صفر التكلفة (zero-cost). لا حاجة لـ Arc أو Mutex. يوفر TcpStream أيضًا [into_split] الذي يدعم المقابض التي يمكن أن تنتقل عبر المهام بتكلفة Arc فقط.
نظرًا لأنه يتم استدعاء io::copy() في نفس المهمة التي تمتلك TcpStream ، يمكننا استخدام [TcpStream::split]. تصبح المهمة التي تعالج منطق الصدى في الخادم:
# use tokio::io;
# use tokio::net::TcpStream;
# fn dox(mut socket: TcpStream) {
tokio::spawn(async move {
let (mut rd, mut wr) = socket.split();
if io::copy(&mut rd, &mut wr).await.is_err() {
eprintln!("فشل النسخ");
}
});
# }
يمكنك العثور على الكود بالكامل هنا.
النسخ اليدوي (Manual copying)
الآن دعونا نلقي نظرة على كيفية كتابة خادم الصدى عن طريق نسخ البيانات يدويًا. للقيام بذلك، نستخدم AsyncReadExt::read و AsyncWriteExt::write_all.
خادم الصدى الكامل هو كما يلي:
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
# fn dox() {
#[tokio::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:6142").await?;
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = vec![0; 1024];
loop {
match socket.read(&mut buf).await {
// تشير قيمة الإرجاع `Ok(0)` إلى أن الطرف البعيد قد أغلق
Ok(0) => return,
Ok(n) => {
// نسخ البيانات مرة أخرى إلى المقبس
if socket.write_all(&buf[..n]).await.is_err() {
// خطأ غير متوقع في المقبس. لا يوجد الكثير مما يمكننا
// فعله هنا لذا توقف عن المعالجة فقط.
return;
}
}
Err(_) => {
// خطأ غير متوقع في المقبس. لا يوجد الكثير مما يمكننا فعله
// هنا لذا توقف عن المعالجة فقط.
return;
}
}
}
});
}
}
# }
(يمكنك وضع هذا الكود في src/bin/echo-server.rs وتشغيله باستخدام cargo run --bin echo-server).
دعونا نحلله. أولاً، بما أنه يتم استخدام أدوات AsyncRead و AsyncWrite المساعدة، يجب جلب سمات الامتداد (extension traits) إلى النطاق.
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
تخصيص مخزن مؤقت (Allocating a buffer)
الاستراتيجية هي قراءة بعض البيانات من المقبس في مخزن مؤقت ثم كتابة محتويات المخزن المؤقت مرة أخرى إلى المقبس.
let mut buf = vec![0; 1024];
يتم تجنب المخزن المؤقت للمكدس (stack buffer) صراحةً. تذكر من سابقًا، لاحظنا أن جميع بيانات المهمة التي تعيش عبر الاستدعاءات لـ .await يجب أن يتم تخزينها بواسطة المهمة. في هذه الحالة، يتم استخدام buf عبر استدعاءات .await. يتم تخزين جميع بيانات المهمة في تخصيص واحد. يمكنك التفكير في الأمر على أنه enum حيث يكون كل متغير هو البيانات التي يجب تخزينها لاستدعاء معين لـ .await.
إذا تم تمثيل المخزن المؤقت بواسطة مصفوفة مكدس (stack array)، فقد يبدو الهيكل الداخلي للمهام التي تم إنشاؤها لكل مقبس مقبول شيئًا كالتالي:
```rust,compile_fail struct Task { // حقول المهمة الداخلية هنا task: enum { AwaitingRead { socket: TcpStream, buf: [BufferType], }, AwaitingWriteAll { socket: TcpStream, buf: [BufferType], }
}
}
إذا تم استخدام مصفوفة مكدس كنوع للمخزن المؤقت، فسيتم تخزينها *بشكل مضمن (inline)* في هيكل المهمة. سيجعل هذا هيكل المهمة كبيرًا جدًا. بالإضافة إلى ذلك، غالبًا ما تكون أحجام المخازن المؤقتة بحجم الصفحة. وهذا بدوره سيجعل `Task` بحجم غير مريح: `$page-size + a-few-bytes`.
يقوم المترجم (compiler) بتحسين تخطيط الكتل غير المتزامنة بشكل أكبر من `enum` أساسي. في الممارسة العملية، لا يتم نقل المتغيرات بين المتغيرات كما هو مطلوب مع `enum`. ومع ذلك، فإن حجم هيكل المهمة لا يقل عن حجم أكبر متغير.
بسبب هذا، عادة ما يكون من الأكثر كفاءة استخدام تخصيص مخصص للمخزن المؤقت.
## معالجة EOF
عندما يتم إغلاق نصف القراءة من تدفق TCP، فإن استدعاء `read()` يرجع `Ok(0)`. من المهم الخروج من حلقة القراءة عند هذه النقطة. نسيان الخروج من حلقة القراءة عند EOF هو مصدر شائع للأخطاء.
```rust
# use tokio::io::AsyncReadExt;
# use tokio::net::TcpStream;
# async fn dox(mut socket: TcpStream) {
# let mut buf = vec![0_u8; 1024];
loop {
match socket.read(&mut buf).await {
// تشير قيمة الإرجاع `Ok(0)` إلى أن الطرف البعيد قد أغلق
Ok(0) => return,
// ... الحالات الأخرى التي يتم التعامل معها هنا
# _ => unreachable!(),
}
}
# }
عادة ما يؤدي نسيان الخروج من حلقة القراءة إلى حالة حلقة لا نهائية تستهلك 100% من وحدة المعالجة المركزية. عند إغلاق المقبس، يرجع socket.read() على الفور. ثم تتكرر الحلقة إلى الأبد.
يمكن العثور على الكود الكامل هنا.