Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

من خادم أحادي المسار إلى خادم متعدد المسارات (Multithreaded)

في الوقت الحالي، يقوم الخادم بمعالجة كل طلب بالترتيب، مما يعني أنه لن يعالج اتصالاً ثانياً حتى ينتهي من معالجة الاتصال الأول. إذا تلقى الخادم المزيد والمزيد من الطلبات، فسيكون هذا التنفيذ التسلسلي أقل كفاءة بشكل متزايد. إذا تلقى الخادم طلباً يستغرق وقتاً طويلاً للمعالجة، فستضطر الطلبات اللاحقة إلى الانتظار حتى ينتهي الطلب الطويل، حتى لو كان من الممكن معالجة الطلبات الجديدة بسرعة. سنحتاج إلى إصلاح ذلك، ولكن أولاً سنلقي نظرة على المشكلة بشكل عملي.

محاكاة طلب بطيء

سنلقي نظرة على كيفية تأثير طلب بطيء المعالجة على الطلبات الأخرى المقدمة إلى تنفيذ الخادم الحالي لدينا. تنفذ القائمة 21-10 معالجة طلب إلى المسار /sleep مع استجابة بطيئة محاكاة ستجعل الخادم ينام لمدة خمس ثوانٍ قبل الاستجابة.

use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
// --snip--

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    // --snip--

    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    // --snip--

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

لقد انتقلنا من استخدام if إلى match الآن بعد أن أصبح لدينا ثلاث حالات. نحتاج إلى المطابقة صراحةً على شريحة (slice) من request_line لمطابقة النمط مع قيم السلسلة النصية الثابتة؛ حيث لا يقوم match بعملية الإسناد المرجعي وإلغاء الإسناد (referencing and dereferencing) تلقائياً كما تفعل دالة المساواة.

الذراع الأول هو نفسه كتلة if من القائمة 21-9. الذراع الثاني يطابق طلباً إلى /sleep. عند استلام هذا الطلب، سينام الخادم لمدة خمس ثوانٍ قبل عرض صفحة HTML الناجحة. الذراع الثالث هو نفسه كتلة else من القائمة 21-9.

يمكنك أن ترى مدى بساطة خادمنا: المكتبات الحقيقية ستتعامل مع التعرف على الطلبات المتعددة بطريقة أقل إسهاباً بكثير!

ابدأ تشغيل الخادم باستخدام cargo run. ثم افتح نافذتين في المتصفح: واحدة لـ http://127.0.0.1:7878 والأخرى لـ http://127.0.0.1:7878/sleep. إذا قمت بإدخال عنوان / عدة مرات، كما فعلت سابقاً، فسترى أنه يستجيب بسرعة. ولكن إذا أدخلت /sleep ثم قمت بتحميل /، فسترى أن / ينتظر حتى ينتهي sleep من نومه لمدة خمس ثوانٍ كاملة قبل التحميل.

هناك تقنيات متعددة يمكننا استخدامها لتجنب تراكم الطلبات خلف طلب بطيء، بما في ذلك استخدام البرمجة غير المتزامنة (async) كما فعلنا في الفصل 17؛ والتقنية التي سنقوم بتنفيذها هي “حوض المسارات” (thread pool).

تحسين معدل النقل باستخدام حوض المسارات (Thread Pool)

“حوض المسارات” (thread pool) هو مجموعة من المسارات (threads) التي تم إنشاؤها وهي جاهزة وتنتظر معالجة مهمة ما. عندما يتلقى البرنامج مهمة جديدة، فإنه يعين أحد المسارات في الحوض للمهمة، وسيقوم هذا المسار بمعالجة المهمة. تظل المسارات المتبقية في الحوض متاحة للتعامل مع أي مهام أخرى تأتي أثناء معالجة المسار الأول. عندما ينتهي المسار الأول من معالجة مهمته، يتم إعادته إلى حوض المسارات الخاملة، ليكون جاهزاً للتعامل مع مهمة جديدة. يسمح لك حوض المسارات بمعالجة الاتصالات بشكل متزامن (concurrently)، مما يزيد من معدل نقل البيانات (throughput) في خادمك.

سنحدد عدد المسارات في الحوض برقم صغير لحمايتنا من هجمات الحرمان من الخدمة (DoS)؛ فلو جعلنا برنامجنا ينشئ مساراً جديداً لكل طلب يأتي، لتمكن شخص يقوم بتقديم 10 ملايين طلب إلى خادمنا من إحداث فوضى عارمة عن طريق استهلاك جميع موارد خادمنا وإيقاف معالجة الطلبات تماماً.

بدلاً من إنشاء مسارات غير محدودة، سيكون لدينا عدد ثابت من المسارات التي تنتظر في الحوض. يتم إرسال الطلبات الواردة إلى الحوض للمعالجة. سيحتفظ الحوض بطابور (queue) من الطلبات الواردة. سيقوم كل مسار من المسارات في الحوض بسحب طلب من هذا الطابور، ومعالجة الطلب، ثم يطلب من الطابور طلباً آخر. بهذا التصميم، يمكننا معالجة ما يصل إلى N من الطلبات بشكل متزامن، حيث N هو عدد المسارات. إذا كان كل مسار يستجيب لطلب يستغرق وقتاً طويلاً، فلا يزال بإمكان الطلبات اللاحقة التراكم في الطابور، لكننا زدنا عدد الطلبات طويلة الأمد التي يمكننا التعامل معها قبل الوصول إلى تلك النقطة.

هذه التقنية هي مجرد واحدة من طرق عديدة لتحسين معدل نقل خادم الويب. الخيارات الأخرى التي قد تستكشفها هي نموذج fork/join، ونموذج الإدخال/الإخراج غير المتزامن أحادي المسار، ونموذج الإدخال/الإخراج غير المتزامن متعدد المسارات. إذا كنت مهتماً بهذا الموضوع، يمكنك قراءة المزيد عن الحلول الأخرى ومحاولة تنفيذها؛ فمع لغة منخفضة المستوى مثل Rust، كل هذه الخيارات ممكنة.

قبل أن نبدأ في تنفيذ حوض المسارات، دعونا نتحدث عن الشكل الذي يجب أن يكون عليه استخدام الحوض. عندما تحاول تصميم كود، فإن كتابة واجهة العميل أولاً يمكن أن تساعد في توجيه تصميمك. اكتب الواجهة البرمجية (API) للكود بحيث تكون منظمة بالطريقة التي تريد استدعاءها بها؛ ثم قم بتنفيذ الوظائف داخل هذا الهيكل بدلاً من تنفيذ الوظائف ثم تصميم الواجهة البرمجية العامة.

على غرار كيفية استخدامنا للتطوير المدفوع بالاختبار (TDD) في مشروع الفصل 12، سنستخدم هنا التطوير المدفوع بالمترجم (compiler-driven development). سنكتب الكود الذي يستدعي الدوال التي نريدها، ثم سننظر في الأخطاء الواردة من المترجم لتحديد ما يجب تغييره تالياً لجعل الكود يعمل. قبل القيام بذلك، سنستكشف التقنية التي لن نستخدمها كنقطة انطلاق.

إنشاء مسار لكل طلب

أولاً، دعونا نستكشف كيف قد يبدو الكود الخاص بنا إذا قام بإنشاء مسار جديد لكل اتصال. كما ذكرنا سابقاً، هذه ليست خطتنا النهائية بسبب مشاكل احتمال إنشاء عدد غير محدود من المسارات، ولكنها نقطة انطلاق للحصول على خادم متعدد المسارات يعمل أولاً. بعد ذلك، سنضيف حوض المسارات كتحسين، وسيكون التباين بين الحلين أسهل.

توضح القائمة 21-11 التغييرات التي يجب إجراؤها على main لإنشاء مسار جديد للتعامل مع كل تدفق (stream) داخل حلقة for.

use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

كما تعلمت في الفصل 16، سيقوم thread::spawn بإنشاء مسار جديد ثم تشغيل الكود الموجود في الإغلاق (closure) في المسار الجديد. إذا قمت بتشغيل هذا الكود وتحميل /sleep في متصفحك، ثم / في علامتي تبويب إضافيتين، فسترى بالفعل أن الطلبات إلى / لا تضطر إلى انتظار انتهاء /sleep. ومع ذلك، كما ذكرنا، سيؤدي هذا في النهاية إلى إرهاق النظام لأنك ستنشئ مسارات جديدة دون أي حدود.

قد تتذكر أيضاً من الفصل 17 أن هذا هو بالضبط نوع الموقف الذي تتألق فيه البرمجة غير المتزامنة (async) و (await)! ضع ذلك في اعتبارك بينما نبني حوض المسارات وفكر في كيف ستكون الأمور مختلفة أو متشابهة مع async.

إنشاء عدد محدود من المسارات

نريد أن يعمل حوض المسارات الخاص بنا بطريقة مماثلة ومألوفة بحيث لا يتطلب الانتقال من المسارات إلى حوض المسارات تغييرات كبيرة في الكود الذي يستخدم واجهتنا البرمجية. توضح القائمة 21-12 الواجهة الافتراضية لهيكل ThreadPool الذي نريد استخدامه بدلاً من thread::spawn.

use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

نستخدم ThreadPool::new لإنشاء حوض مسارات جديد بعدد قابل للتكوين من المسارات، في هذه الحالة أربعة. ثم، في حلقة for تمتلك pool.execute واجهة مماثلة لـ thread::spawn من حيث أنها تأخذ إغلاقاً يجب أن يشغله الحوض لكل تدفق. نحتاج إلى تنفيذ pool.execute بحيث تأخذ الإغلاق وتعطيه لمسار في الحوض لتشغيله. لن يتم تجميع هذا الكود بعد، لكننا سنحاول حتى يتمكن المترجم من توجيهنا في كيفية إصلاحه.

بناء ThreadPool باستخدام التطوير المدفوع بالمترجم

قم بإجراء التغييرات في القائمة 21-12 على ملف src/main.rs، ثم دعنا نستخدم أخطاء المترجم من cargo check لتوجيه تطويرنا. إليك الخطأ الأول الذي نحصل عليه:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:11:16
   |
11 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error

رائع! يخبرنا هذا الخطأ أننا بحاجة إلى نوع أو وحدة ThreadPool لذا سنقوم ببناء واحدة الآن. سيكون تنفيذ ThreadPool الخاص بنا مستقلاً عن نوع العمل الذي يقوم به خادم الويب الخاص بنا. لذا، دعونا نحول صندوق hello من صندوق ثنائي إلى صندوق مكتبة (library crate) ليحتوي على تنفيذ ThreadPool الخاص بنا. بعد التغيير إلى صندوق مكتبة، يمكننا أيضاً استخدام مكتبة حوض المسارات المنفصلة لأي عمل نريد القيام به باستخدام حوض مسارات، وليس فقط لخدمة طلبات الويب.

أنشئ ملف src/lib.rs يحتوي على ما يلي، وهو أبسط تعريف لهيكل ThreadPool يمكننا الحصول عليه في الوقت الحالي:

pub struct ThreadPool;

ثم، قم بتحرير ملف main.rs لجلب ThreadPool إلى النطاق من صندوق المكتبة عن طريق إضافة الكود التالي إلى أعلى ملف src/main.rs:

use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

لا يزال هذا الكود لا يعمل، ولكن دعنا نتحقق منه مرة أخرى للحصول على الخطأ التالي الذي نحتاج إلى معالجته:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
  --> src/main.rs:12:28
   |
12 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

يشير هذا الخطأ إلى أننا نحتاج تالياً إلى إنشاء دالة مرتبطة تسمى new لـ ThreadPool. نعلم أيضاً أن new تحتاج إلى معامل واحد يمكنه قبول 4 كمعامل ويجب أن تعيد نسخة من ThreadPool. دعونا ننفذ أبسط دالة new تمتلك تلك الخصائص:

pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

اخترنا usize كنوع لمعامل size لأننا نعلم أن عدداً سالباً من المسارات لا معنى له. نعلم أيضاً أننا سنستخدم هذا… (تم اختصار المحتوى بسبب حدود الحجم)

لنبدأ بإنشاء قناة (channel) في ThreadPool::new والاحتفاظ بالمرسل (sender) في نسخة ThreadPool كما هو موضح في القائمة 21-16. هيكل Job لا يحمل أي شيء في الوقت الحالي ولكنه سيكون نوع العنصر الذي نرسله عبر القناة.

use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}

في ThreadPool::new ننشئ قناتنا الجديدة ونجعل الحوض يحتفظ بالمرسل. سيتم تجميع هذا بنجاح.

دعونا نحاول تمرير مستقبل (receiver) القناة إلى كل Worker بينما يقوم حوض المسارات بإنشاء القناة. نعلم أننا نريد استخدام المستقبل في المسار الذي تنشئه نسخ Worker لذا سنشير إلى معامل receiver في الإغلاق. الكود في القائمة 21-17 لن يتم تجميعه تماماً بعد.

use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--


struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

لقد أجرينا بعض التغييرات الصغيرة والمباشرة: نمرر المستقبل إلى Worker::new ثم نستخدمه داخل الإغلاق.

عندما نحاول التحقق من هذا الكود، نحصل على هذا الخطأ:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:26:42
   |
21 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 |         for id in 0..size {
   |         ----------------- inside of this loop
26 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in previous iteration of loop
   |
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
  --> src/lib.rs:47:33
   |
47 |     fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
   |        --- in this method       ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
   |
25 ~         let mut value = Worker::new(id, receiver);
26 ~         for id in 0..size {
27 ~             workers.push(value);
   |

For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error

يحاول الكود تمرير receiver إلى نسخ Worker متعددة. هذا لن يعمل، كما ستتذكر من الفصل 16: تنفيذ القناة الذي توفره Rust هو “منتجون متعددون، مستهلك واحد” (multiple producer, single consumer). هذا يعني أنه لا يمكننا ببساطة استنساخ طرف الاستهلاك في القناة لإصلاح هذا الكود. كما أننا لا نريد إرسال رسالة عدة مرات إلى مستهلكين متعددين؛ نريد قائمة واحدة من الرسائل مع نسخ Worker متعددة بحيث تتم معالجة كل رسالة مرة واحدة.

بالإضافة إلى ذلك، فإن سحب مهمة من طابور القناة يتضمن تعديل الـ receiver لذا تحتاج المسارات إلى طريقة آمنة لمشاركة وتعديل receiver؛ وإلا فقد نحصل على حالات تسابق (race conditions) (كما تمت تغطيتها في الفصل 16).

تذكر المؤشرات الذكية الآمنة للمسارات التي نوقشت في الفصل 16: لمشاركة الملكية عبر مسارات متعددة والسماح للمسارات بتعديل القيمة، نحتاج إلى استخدام Arc<Mutex<T>>. سيسمح نوع Arc لنسخ Worker متعددة بامتلاك المستقبل، وسيضمن Mutex أن عاملاً واحداً فقط يحصل على مهمة من المستقبل في كل مرة. توضح القائمة 21-18 التغييرات التي نحتاج إلى إجرائها.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};
// --snip--

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

في ThreadPool::new نضع المستقبل في Arc و Mutex. لكل Worker جديد، نقوم باستنساخ الـ Arc لزيادة عدد المراجع بحيث يمكن لنسخ Worker مشاركة ملكية المستقبل.

مع هذه التغييرات، يتم تجميع الكود! نحن نقترب!

تنفيذ دالة execute

دعونا أخيراً ننفذ دالة execute على ThreadPool. سنقوم أيضاً بتغيير Job من هيكل إلى اسم مستعار للنوع (type alias) لكائن سمة (trait object) يحمل نوع الإغلاق الذي تستقبله execute. كما نوقش في قسم “مرادفات الأنواع وأسماء الأنواع المستعارة” في الفصل 20، تسمح لنا أسماء الأنواع المستعارة بجعل الأنواع الطويلة أقصر لسهولة الاستخدام. انظر إلى القائمة 21-19.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

بعد إنشاء نسخة Job جديدة باستخدام الإغلاق الذي نحصل عليه في execute نرسل تلك المهمة عبر طرف الإرسال في القناة. نحن نستدعي unwrap على send للحالة التي يفشل فيها الإرسال. قد يحدث هذا إذا قمنا، على سبيل المثال، بإيقاف جميع مساراتنا عن التنفيذ، مما يعني أن طرف الاستقبال قد توقف عن استقبال رسائل جديدة. في الوقت الحالي، لا يمكننا إيقاف مساراتنا عن التنفيذ: تستمر مساراتنا في التنفيذ طالما أن الحوض موجود. السبب في استخدامنا لـ unwrap هو أننا نعلم أن حالة الفشل لن تحدث، لكن المترجم لا يعرف ذلك.

لكننا لم ننتهِ تماماً بعد! في الـ Worker لا يزال الإغلاق الذي يتم تمريره إلى thread::spawn يشير فقط إلى طرف الاستقبال في القناة. بدلاً من ذلك، نحتاج إلى أن يدخل الإغلاق في حلقة تكرار للأبد، يطلب من طرف الاستقبال في القناة مهمة ويقوم بتشغيل المهمة عندما يحصل على واحدة. دعونا نجري التغيير الموضح في القائمة 21-20 على Worker::new.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

هنا، نستدعي أولاً lock على الـ receiver للحصول على الـ mutex، ثم نستدعي unwrap للهلع (panic) عند حدوث أي أخطاء. قد يفشل الحصول على القفل إذا كان الـ mutex في حالة مسمومة (poisoned)، والتي يمكن أن تحدث إذا هلع مسار آخر أثناء امتلاك القفل بدلاً من تحريره. في هذه الحالة، استدعاء unwrap لجعل هذا المسار يهلع هو الإجراء الصحيح الذي يجب اتخاذه. لا تتردد في تغيير unwrap هذه إلى expect مع رسالة خطأ ذات معنى بالنسبة لك.

إذا حصلنا على القفل في الـ mutex، فنحن نستدعي recv لاستقبال Job من القناة. استدعاء unwrap نهائي يتجاوز أي أخطاء هنا أيضاً، والتي قد تحدث إذا تم إغلاق المسار الذي يحمل المرسل، على غرار كيفية إرجاع دالة send لـ Err إذا تم إغلاق المستقبل.

استدعاء recv يحجب التنفيذ (blocks)، لذا إذا لم تكن هناك مهمة بعد، فسوف ينتظر المسار الحالي حتى تصبح المهمة متاحة. يضمن Mutex<T> أن مسار Worker واحداً فقط في كل مرة يحاول طلب مهمة.

حوض المسارات الخاص بنا الآن في حالة عمل! جربه باستخدام cargo run وقم بإجراء بعض الطلبات:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
warning: field `workers` is never read
 --> src/lib.rs:7:5
  |
6 | pub struct ThreadPool {
  |            ---------- field in this struct
7 |     workers: Vec<Worker>,
  |     ^^^^^^^
  |
  = note: `#[warn(dead_code)]` on by default

warning: fields `id` and `thread` are never read
  --> src/lib.rs:48:5
   |
47 | struct Worker {
   |        ------ fields in this struct
48 |     id: usize,
   |     ^^
49 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^

warning: `hello` (lib) generated 2 warnings
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 4.91s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.

نجاح! لدينا الآن حوض مسارات ينفذ الاتصالات بشكل غير متزامن. لا يتم إنشاء أكثر من أربعة مسارات أبداً، لذا لن يتعرض نظامنا للحمل الزائد إذا تلقى الخادم الكثير من الطلبات. إذا قدمنا طلباً إلى /sleep، فسيكون الخادم قادراً على خدمة الطلبات الأخرى عن طريق جعل مسار آخر يقوم بتشغيلها.

ملاحظة: إذا فتحت /sleep في نوافذ متصفح متعددة في وقت واحد، فقد يتم تحميلها واحدة تلو الأخرى في فترات زمنية مدتها خمس ثوانٍ. تقوم بعض متصفحات الويب بتنفيذ نسخ متعددة من نفس الطلب بشكل تسلسلي لأسباب تتعلق بالتخزين المؤقت (caching). هذا القيد لا يسببه خادم الويب الخاص بنا.

هذا وقت جيد للتوقف والتفكير في كيف سيكون الكود في القوائم 21-18 و 21-19 و 21-20 مختلفاً إذا كنا نستخدم الـ futures بدلاً من الإغلاق للعمل الذي يتعين القيام به. ما هي الأنواع التي ستتغير؟ كيف ستكون تواقيع الدوال مختلفة، إن وجدت؟ ما هي أجزاء الكود التي ستبقى كما هي؟

بعد التعرف على حلقة while let في الفصل 17 والفصل 19، قد تتساءل لماذا لم نكتب كود مسار Worker كما هو موضح في القائمة 21-21.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

هذا الكود يتم تجميعه وتشغيله ولكنه لا يؤدي إلى سلوك المسارات المطلوب: سيظل الطلب البطيء يتسبب في انتظار الطلبات الأخرى للمعالجة. السبب دقيق نوعاً ما: هيكل Mutex لا يحتوي على دالة unlock عامة لأن ملكية القفل تعتمد على عمر MutexGuard<T> داخل LockResult<MutexGuard<T>> الذي تعيده دالة lock. في وقت التجميع، يمكن لمدقق الاستعارة (borrow checker) بعد ذلك فرض القاعدة التي تنص على أنه لا يمكن الوصول إلى مورد يحميه Mutex ما لم نكن نمتلك القفل. ومع ذلك، يمكن أن يؤدي هذا التنفيذ أيضاً إلى الاحتفاظ بالقفل لفترة أطول من المقصود إذا لم نكن منتبهين لعمر MutexGuard<T>.

الكود في القائمة 21-20 الذي يستخدم let job = receiver.lock().unwrap().recv().unwrap(); يعمل لأنه مع let يتم إسقاط أي قيم مؤقتة مستخدمة في التعبير على الجانب الأيمن من علامة التساوي فور انتهاء عبارة let. ومع ذلك، فإن while letif let و match) لا تسقط القيم المؤقتة حتى نهاية الكتلة المرتبطة. في القائمة 21-21، يظل القفل ممسوكاً طوال مدة استدعاء job()، مما يعني أن نسخ Worker الأخرى لا يمكنها استقبال المهام.