示例1
一個(gè)不完整的示例:
let (tx, rx) = channel::unbounded::<Task>();
let mut handlers = vec![];
for _ in 0..number {
let rx = rx.clone();
let handle = thread::spawn(move || {
while let Some(task) = rx.recv() {
task.call_box();
}
});
handlers.push(handle);
}
該例子中,rx 可以被多個(gè)線程使用,是線程安全的。這就是所謂的 MPMC 模式。設(shè)想 channel 中有 10 個(gè)數(shù)據(jù),MPMC 模式允許10個(gè)線程同時(shí)利用 rx 從 channel 中讀取數(shù)據(jù)。
Rust 自帶的 channel 是 MPSC 模式的,一次僅允許一個(gè)線程從 channel 讀取數(shù)據(jù)。顯然 crossbeam 效率更高。
示例2
use crossbeam::channel;
use crossbeam::thread;
use std::thread::sleep;
use std::time::Duration;
// 定義Task結(jié)構(gòu)體
struct Task {
data: usize, // 假設(shè)任務(wù)包含一個(gè)數(shù)據(jù)字段
call_box: Box<dyn FnMut()>, // 假設(shè)任務(wù)包含一個(gè)可調(diào)用對象的裝箱指針
}
impl Task {
fn new(data: usize, call_box: impl FnMut() + 'static) -> Self {
Task {
data,
call_box: Box::new(call_box),
}
}
// 實(shí)現(xiàn)call_box方法
fn call_box(&mut self) {
(self.call_box)();
}
}
fn main() {
const NUMBER_OF_WORKERS: usize = 4; // 假設(shè)有4個(gè)工作線程
let (tx, rx) = channel::unbounded::<Task>();
let mut handlers = vec![];
// 啟動(dòng)工作線程
for _ in 0..NUMBER_OF_WORKERS {
let rx = rx.clone();
let handle = thread::spawn(move || {
while let Some(task) = rx.recv() {
task.call_box(); // 執(zhí)行任務(wù)
}
});
handlers.push(handle);
}
// 發(fā)送任務(wù)到通道
for i in 0..10 { // 假設(shè)發(fā)送10個(gè)任務(wù)
let task = Task::new(i, || {
println!("Executing task with data: {}", i);
sleep(Duration::from_secs(1)); // 模擬耗時(shí)操作
println!("Finished task with data: {}", i);
});
tx.send(task).unwrap();
}
// 關(guān)閉發(fā)送通道
drop(tx);
// 等待所有工作線程完成
for handle in handlers {
handle.join().unwrap();
}
println!("All tasks are processed.");
}
在這個(gè)程序中,我們定義了一個(gè)Task
結(jié)構(gòu)體,它包含一個(gè)data
字段和一個(gè)call_box
字段,后者是一個(gè)裝箱的可調(diào)用對象。我們實(shí)現(xiàn)了call_box
方法,它調(diào)用這個(gè)裝箱的可調(diào)用對象。
在main
函數(shù)中,我們創(chuàng)建了一個(gè)無界通道,用于在工作線程和主線程之間傳遞Task
實(shí)例。我們啟動(dòng)了NUMBER_OF_WORKERS
個(gè)工作線程,它們不斷地從通道接收Task
實(shí)例并調(diào)用call_box
方法執(zhí)行它們。
然后,主線程創(chuàng)建了一些Task
實(shí)例,并通過通道發(fā)送它們給工作線程。一旦所有任務(wù)都被發(fā)送,主線程通過drop(tx)
關(guān)閉了發(fā)送通道,這樣工作線程在嘗試接收任務(wù)時(shí),如果沒有更多任務(wù)可用,將會(huì)得到一個(gè)None
,從而退出循環(huán)。
最后,主線程等待所有工作線程完成,并打印出消息表示所有任務(wù)都已經(jīng)處理完畢。文章來源:http://www.zghlxwxcb.cn/news/detail-840029.html
請注意,為了簡化示例,我使用了Box<dyn FnMut()>
來允許Task
存儲(chǔ)任何可調(diào)用對象的裝箱指針。這意味著任務(wù)中的可調(diào)用對象必須能夠單獨(dú)編譯成一個(gè)獨(dú)立的、無狀態(tài)的函數(shù),這樣才能安全地在多個(gè)線程之間共享。在實(shí)際應(yīng)用中,你可能需要根據(jù)你的具體需求調(diào)整Task
結(jié)構(gòu)體的設(shè)計(jì)和使用方式。文章來源地址http://www.zghlxwxcb.cn/news/detail-840029.html
到了這里,關(guān)于Rust 并行庫 crossbeam 的 Channel 示例的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!