الإغلاق الآمن (Graceful Shutdown) والتنظيف (Cleanup)
يستجيب الكود في القائمة 21-20 للطلبات بشكل غير متزامن (asynchronously) من خلال استخدام مجمع خيوط المعالجة (thread pool)، كما خططنا. تظهر لنا بعض التحذيرات حول حقول (fields) workers و id و thread التي لا نستخدمها بطريقة مباشرة، مما يذكرنا بأننا لا نقوم بأي عملية تنظيف. عندما نستخدم الطريقة الأقل أناقة بالضغط على ctrl-C لإيقاف خيط المعالجة الرئيسي (main thread)، يتم إيقاف جميع خيوط المعالجة الأخرى فوراً أيضاً، حتى لو كانت في منتصف خدمة طلب ما.
بعد ذلك، سنقوم بتنفيذ سمة (trait) Drop لاستدعاء join على كل خيط من خيوط المعالجة في pool للتأكد من إنهاء الطلبات التي تعمل عليها قبل الإغلاق. ثم سنقوم بتنفيذ طريقة لإخبار خيوط المعالجة بوجوب التوقف عن قبول طلبات جديدة والإغلاق. لرؤية هذا الكود قيد التشغيل، سنقوم بتعديل الخادم الخاص بنا ليقبل طلبين فقط قبل إغلاق thread pool الخاص به بشكل آمن.
شيء واحد يجب ملاحظته أثناء المضي قدماً: لا يؤثر أي من هذا على أجزاء الكود التي تتعامل مع تنفيذ الإغلاقات (closures)، لذا سيكون كل شيء هنا هو نفسه إذا كنا نستخدم thread pool لـ وقت تشغيل غير متزامن (async runtime).
تنفيذ سمة Drop على ThreadPool
لنبدأ بتنفيذ Drop على thread pool الخاص بنا. عندما يتم حذف (dropped) pool، يجب أن تنضم (join) جميع خيوط المعالجة الخاصة بنا للتأكد من إنهاء عملها. تعرض القائمة 21-22 محاولة أولى لتنفيذ Drop ؛ هذا الكود لن يعمل تماماً بعد.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
أولاً، نقوم بالمرور عبر كل من workers في thread pool. نستخدم &mut لهذا لأن self هو مرجع قابل للتغيير، ونحتاج أيضاً إلى القدرة على تغيير worker. لكل worker ، نقوم بطباعة رسالة تفيد بأن مثيل (instance) Worker هذا قيد الإغلاق، ثم نستدعي join على خيط ذلك instance. إذا فشل استدعاء join ، نستخدم unwrap لجعل Rust تدخل في حالة هلع (panic) وتنتقل إلى إغلاق غير آمن.
إليك الخطأ الذي نحصل عليه عند تصريف (compile) هذا الكود:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
--> src/lib.rs:52:13
|
52 | worker.thread.join().unwrap();
| ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
| |
| move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
|
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
--> /rustc/1159e78c4747b02ef996e55082b704c09b970588/library/std/src/thread/mod.rs:1921:17
For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` (lib) due to 1 previous error
يخبرنا الخطأ أنه لا يمكننا استدعاء join لأننا نملك فقط استعارة قابلة للتغيير (mutable borrow) لكل worker ، بينما تأخذ join ملكية (ownership) وسيطها. لحل هذه المشكلة، نحتاج إلى نقل الخيط خارج instance Worker الذي يملك thread حتى تتمكن join من استهلاك الخيط. إحدى الطرق للقيام بذلك هي اتباع نفس النهج الذي اتبعناه في القائمة 18-15. إذا كان Worker يحتفظ بـ Option<thread::JoinHandle<()>> ، فيمكننا استدعاء دالة take على Option لنقل القيمة خارج متغير (variant) Some وترك variant None في مكانه. بعبارة أخرى، فإن Worker الذي يعمل سيكون لديه variant Some في thread ، وعندما نريد تنظيف Worker ، سنستبدل Some بـ None حتى لا يكون لدى Worker خيط لتشغيله.
ومع ذلك، فإن المرة الوحيدة التي سيظهر فيها هذا هي عند حذف Worker. في المقابل، سيتعين علينا التعامل مع Option<thread::JoinHandle<()>> في أي مكان نصل فيه إلى worker.thread. يستخدم أسلوب رست الاصطلاحي (Idiomatic Rust) نوع Option كثيراً، ولكن عندما تجد نفسك تغلف شيئاً تعرف أنه سيكون موجوداً دائماً في Option كحل بديل مثل هذا، فمن الجيد البحث عن طرق بديلة لجعل الكود الخاص بك أنظف وأقل عرضة للخطأ.
في هذه الحالة، يوجد بديل أفضل: دالة Vec::drain. وهي تقبل معلمة نطاق (range parameter) لتحديد العناصر التي سيتم إزالتها من المتجه (vector) وتُرجع مكرراً (iterator) لتلك العناصر. سيؤدي تمرير بناء جملة النطاق .. إلى إزالة كل قيمة من vector.
لذا، نحتاج إلى تحديث تنفيذ drop لـ ThreadPool كالتالي:
#![allow(unused)]
fn main() {
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
}
يحل هذا خطأ compiler ولا يتطلب أي تغييرات أخرى في الكود الخاص بنا. لاحظ أنه نظراً لأنه يمكن استدعاء drop عند حدوث panic، فإن unwrap يمكن أن تسبب أيضاً panic وتؤدي إلى هلع مزدوج (double panic)، مما يؤدي فوراً إلى تعطل البرنامج وإنهاء أي عملية تنظيف قيد التنفيذ. هذا أمر مقبول لبرنامج تجريبي، ولكنه غير مستحسن لكود الإنتاج.
إرسال إشارة إلى خيوط المعالجة للتوقف عن انتظار المهام
مع كل التغييرات التي أجريناها، يتم تصريف الكود الخاص بنا دون أي تحذيرات. ومع ذلك، فإن الخبر السيئ هو أن هذا الكود لا يعمل بالطريقة التي نريدها بعد. المفتاح هو المنطق في closures التي يتم تشغيلها بواسطة خيوط المعالجة لـ instances Worker: في الوقت الحالي، نستدعي join ، لكن ذلك لن يوقف خيوط المعالجة، لأنها في حلقة (loop) للأبد تبحث عن مهام (jobs). إذا حاولنا حذف ThreadPool مع تنفيذنا الحالي لـ drop ، فسيتم حظر (block) main thread للأبد، بانتظار انتهاء الخيط الأول.
لإصلاح هذه المشكلة، سنحتاج إلى تغيير في تنفيذ drop لـ ThreadPool ثم تغيير في loop الخاص بـ Worker.
أولاً، سنقوم بتغيير تنفيذ drop لـ ThreadPool لحذف المرسل (sender) صراحةً قبل انتظار انتهاء خيوط المعالجة. تعرض القائمة 21-23 التغييرات على ThreadPool لحذف sender صراحةً. على عكس الخيط، نحتاج هنا بالفعل إلى استخدام Option لنتمكن من نقل sender خارج ThreadPool باستخدام Option::take.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
// --snip--
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
يؤدي حذف sender إلى إغلاق القناة (channel)، مما يشير إلى أنه لن يتم إرسال المزيد من الرسائل. عندما يحدث ذلك، فإن جميع استدعاءات recv التي تقوم بها instances Worker في الحلقة اللانهائية ستُرجع خطأً. في القائمة 21-24، نقوم بتغيير loop الخاص بـ Worker للخروج من الحلقة بشكل آمن في هذه الحالة، مما يعني أن خيوط المعالجة ستنتهي عندما يستدعي تنفيذ drop لـ ThreadPool دالة join عليها.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {id} got a job; executing.");
job();
}
Err(_) => {
println!("Worker {id} disconnected; shutting down.");
break;
}
}
}
});
Worker { id, thread }
}
}
لرؤية هذا الكود قيد التشغيل، دعونا نعدل main لقبول طلبين فقط قبل إغلاق الخادم بشكل آمن، كما هو موضح في القائمة 21-25.
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
لن ترغب في إغلاق خادم ويب حقيقي بعد خدمة طلبين فقط. يوضح هذا الكود فقط أن Graceful Shutdown و Cleanup يعملان بشكل صحيح.
دالة take معرفة في trait Iterator وتحد من التكرار إلى أول عنصرين على الأكثر. سيخرج ThreadPool عن النطاق في نهاية main ، وسيتم تشغيل تنفيذ drop.
ابدأ الخادم باستخدام cargo run وقم بإجراء ثلاثة طلبات. يجب أن يفشل الطلب الثالث، وفي terminal الخاص بك، يجب أن ترى مخرجات مشابهة لهذا:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s
Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3
قد ترى ترتيباً مختلفاً لمعرفات Worker والرسائل المطبوعة. يمكننا أن نرى كيف يعمل هذا الكود من الرسائل: حصلت instances Worker رقم 0 و 3 على أول طلبين. توقف الخادم عن قبول الاتصالات بعد الاتصال الثاني، ويبدأ تنفيذ Drop على ThreadPool قبل أن يبدأ Worker 3 مهمته. يؤدي حذف sender إلى فصل جميع instances Worker وإخبارها بالإغلاق. تطبع instances Worker رسالة عند فصلها، ثم يستدعي thread pool دالة join لانتظار انتهاء كل خيط Worker.
لاحظ جانباً مثيراً للاهتمام في هذا التنفيذ المحدد: قام ThreadPool بحذف sender ، وقبل أن يتلقى أي Worker خطأً، حاولنا الانضمام إلى Worker 0. لم يكن Worker 0 قد تلقى خطأً بعد من recv ، لذا تم حظر main thread، بانتظار انتهاء Worker 0. في هذه الأثناء، تلقى Worker 3 مهمة ثم تلقت جميع خيوط المعالجة خطأً. عندما انتهى Worker 0 ، انتظر main thread انتهاء بقية instances Worker. عند تلك النقطة، كانت جميعها قد خرجت من حلقاتها وتوقفت.
تهانينا! لقد أكملنا مشروعنا الآن؛ لدينا خادم ويب أساسي يستخدم thread pool للاستجابة بشكل غير متزامن. نحن قادرون على إجراء Graceful Shutdown للخادم، مما ينظف جميع خيوط المعالجة في pool.
إليك الكود الكامل للمرجع:
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {id} got a job; executing.");
job();
}
Err(_) => {
println!("Worker {id} disconnected; shutting down.");
break;
}
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
يمكننا القيام بالمزيد هنا! إذا كنت ترغب في الاستمرار في تحسين هذا المشروع، فإليك بعض الأفكار:
- إضافة المزيد من التوثيق (documentation) لـ
ThreadPoolودوالها العامة. - إضافة اختبارات (tests) لوظائف المكتبة.
- تغيير استدعاءات
unwrapإلى معالجة أخطاء أكثر قوة. - استخدام
ThreadPoolلأداء مهام أخرى غير خدمة طلبات الويب. - العثور على حزمة (crate) لـ thread pool على crates.io وتنفيذ خادم ويب مماثل باستخدام crate بدلاً من ذلك. ثم قارن واجهة برمجة التطبيقات (API) الخاصة بها وقوتها بـ thread pool الذي قمنا بتنفيذه.
ملخص (Summary)
أحسنت! لقد وصلت إلى نهاية الكتاب! نود أن نشكرك على انضمامك إلينا في هذه الجولة في Rust. أنت الآن جاهز لتنفيذ مشاريع Rust الخاصة بك والمساعدة في مشاريع الآخرين. ضع في اعتبارك أن هناك مجتمعاً مضيافاً من الـ Rustaceans الآخرين الذين يسعدهم مساعدتك في أي تحديات تواجهها في رحلتك مع Rust.