無畏并發(fā)
并發(fā)
- Concurrent:程序的不同部分之間獨立的執(zhí)行(并發(fā))
- Parallel:程序的不同部分同時運行(并行)
- Rust無畏并發(fā):允許你編寫沒有細微Bug的代碼,并在不引入新Bug的情況下易于重構(gòu)
- 注意:本文中的”并發(fā)“泛指 concurrent 和 parallel
一、使用線程同時運行代碼(多線程)
進程與線程
- 在大部分OS里,代碼運行在進程(process)中,OS同時管理多個進程。
- 在你的程序里,各獨立部分可以同時運行,運行這些獨立部分的就是線程(thread)
- 多線程運行:
- 提升性能表現(xiàn)
- 增加復雜性:無法保障各線程的執(zhí)行順序
多線程可導致的問題
- 競爭狀態(tài),線程以不一致的順序訪問數(shù)據(jù)或資源
- 死鎖,兩個線程彼此等待對方使用完所持有的資源,線程無法繼續(xù)
- 只在某些情況下發(fā)生的 Bug,很難可靠地復制現(xiàn)象和修復
實現(xiàn)線程的方式
- 通過調(diào)用OS的API來創(chuàng)建線程:1:1模型
- 需要較小的運行時
- 語言自己實現(xiàn)的線程(綠色線程):M:N模型
- 需要更大的運行時
- Rust:需要權(quán)衡運行時的支持
- Rust標準庫僅提供1:1模型的線程
通過 spawn 創(chuàng)建新線程
- 通過 thread::spawn 函數(shù)可以創(chuàng)建新線程:
- 參數(shù):一個閉包(在新線程里運行的代碼)
? cd rust
~/rust
? cargo new thread_demo
Created binary (application) `thread_demo` package
~/rust
? cd thread_demo
thread_demo on master [?] via ?? 1.67.1
? c # code .
thread_demo on master [?] via ?? 1.67.1
?
- thread::sleep 會導致當前線程暫停執(zhí)行
use std::thread;
use std::time::Duration;
fn main() {
thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1)); // 暫停 1 毫秒
}
}
執(zhí)行
thread_demo on master [?] is ?? 0.1.0 via ?? 1.67.1
? cargo run
Compiling thread_demo v0.1.0 (/Users/qiaopengjun/rust/thread_demo)
Finished dev [unoptimized + debuginfo] target(s) in 0.65s
Running `target/debug/thread_demo`
hi number 1 from the main thread!
hi number 1 from the spawned thread!
hi number 2 from the main thread!
hi number 2 from the spawned thread!
hi number 3 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 4 from the main thread!
hi number 5 from the spawned thread!
thread_demo on master [?] is ?? 0.1.0 via ?? 1.67.1
?
通過 join Handle 來等待所有線程的完成
- thread::spawn 函數(shù)的返回值類型是 JoinHandle
- JoinHandle 持有值的所有權(quán)
- 調(diào)用其 join 方法,可以等待對應的其它線程的完成
- join 方法:調(diào)用 handle 的join方法會阻止當前運行線程的執(zhí)行,直到 handle 所表示的這些線程終結(jié)。
use std::thread;
use std::time::Duration;
fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1)); // 暫停 1 毫秒
}
handle.join().unwrap();
}
執(zhí)行
thread_demo on master [?] is ?? 0.1.0 via ?? 1.67.1
? cargo run
Compiling thread_demo v0.1.0 (/Users/qiaopengjun/rust/thread_demo)
Finished dev [unoptimized + debuginfo] target(s) in 0.75s
Running `target/debug/thread_demo`
hi number 1 from the main thread!
hi number 1 from the spawned thread!
hi number 2 from the spawned thread!
hi number 2 from the main thread!
hi number 3 from the spawned thread!
hi number 3 from the main thread!
hi number 4 from the spawned thread!
hi number 4 from the main thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!
thread_demo on master [?] is ?? 0.1.0 via ?? 1.67.1
等分線程執(zhí)行完繼續(xù)執(zhí)行主線程
use std::thread;
use std::time::Duration;
fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});
handle.join().unwrap();
for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1)); // 暫停 1 毫秒
}
}
運行文章來源:http://www.zghlxwxcb.cn/news/detail-418201.html
thread_demo on master [?] is ?? 0.1.0 via ?? 1.67.1
? cargo run
Compiling thread_demo v0.1.0 (/Users/qiaopengjun/rust/thread_demo)
Finished dev [unoptimized + debuginfo] target(s) in 0.28s
Running `target/debug/thread_demo`
hi number 1 from the spawned thread!
hi number 2 from the spawned thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!
hi number 1 from the main thread!
hi number 2 from the main thread!
hi number 3 from the main thread!
hi number 4 from the main thread!
thread_demo on master [?] is ?? 0.1.0 via ?? 1.67.1
使用 move 閉包
- move 閉包通常和 thread::spawn 函數(shù)一起使用,它允許你使用其它線程的數(shù)據(jù)
- 創(chuàng)建線程時,把值的所有權(quán)從一個線程轉(zhuǎn)移到另一個線程
use std::thread;
fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(|| { // 報錯
println!("Here's a vector: {:?}", v);
});
// drop(v);
handle.join().unwrap();
}
修改后:文章來源地址http://www.zghlxwxcb.cn/news/detail-418201.html
use std::thread;
fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(move || {
println!("Here's a vector: {:?}", v);
});
// drop(v);
handle.join().unwrap();
}
二、使用消息傳遞來跨線程傳遞數(shù)據(jù)
消息傳遞
- 一種很流行且能保證安全并發(fā)的技術(shù)就是:消息傳遞。
- 線程(或 Actor)通過彼此發(fā)送消息(數(shù)據(jù))來進行通信
- Go 語言的名言:不要用共享內(nèi)存來通信,要用通信來共享內(nèi)存。
- Rust:Channel(標準庫提供)
Channel
- Channel 包含: 發(fā)送端、接收端
- 調(diào)用發(fā)送端的方法,發(fā)送數(shù)據(jù)
- 接收端會檢查和接收到達的數(shù)據(jù)
- 如果發(fā)送端、接收端中任意一端被丟棄了,那么Channel 就”關(guān)閉“了
創(chuàng)建 Channel
- 使用
mpsc::channel
函數(shù)來創(chuàng)建 Channel- mpsc 表示 multiple producer,single consumer(多個生產(chǎn)者、一個消費者)
- 返回一個 tuple(元組):里面元素分別是發(fā)送端、接收端
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
let received = rx.recv().unwrap();
println!("Got: {}", received);
}
發(fā)送端的 send 方法
- 參數(shù):想要發(fā)送的數(shù)據(jù)
- 返回:Result<T, E>
- 如果有問題(例如接收端已經(jīng)被丟棄),就返回一個錯誤
接收端的方法
- recv 方法:阻止當前線程執(zhí)行,直到 Channel 中有值被送來
- 一旦有值收到,就返回 Result<T, E>
- 當發(fā)送端關(guān)閉,就會收到一個錯誤
- try_recv 方法:不會阻塞,
- 立即返回 Result<T, E>:
- 有數(shù)據(jù)達到:返回 Ok,里面包含著數(shù)據(jù)
- 否則,返回錯誤
- 通常會使用循環(huán)調(diào)用來檢查 try_recv 的結(jié)果
- 立即返回 Result<T, E>:
Channel 和所有權(quán)轉(zhuǎn)移
- 所有權(quán)在消息傳遞中非常重要:能幫你編寫安全、并發(fā)的代碼
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("val is {}", val) // 報錯 借用了移動的值
});
let received = rx.recv().unwrap();
println!("Got: {}", received);
}
發(fā)送多個值,看到接收者在等待
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_millis(1));
}
});
for received in rx {
println!("Got: {}", received);
}
}
通過克隆創(chuàng)建多個發(fā)送者
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = mpsc::Sender::clone(&tx);
thread::spawn(move || {
let vals = vec![
String::from("1: hi"),
String::from("1: from"),
String::from("1: the"),
String::from("1: thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_millis(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_millis(1));
}
});
for received in rx {
println!("Got: {}", received);
}
}
三、共享狀態(tài)的并發(fā)
使用共享來實現(xiàn)并發(fā)
- Go 語言的名言:不要用共享內(nèi)存來通信,要用通信來共享內(nèi)存。
- Rust支持通過共享狀態(tài)來實現(xiàn)并發(fā)。
- Channel 類似單所有權(quán):一旦將值的所有權(quán)轉(zhuǎn)移至 Channel,就無法使用它了
- 共享內(nèi)存并發(fā)類似多所有權(quán):多個線程可以同時訪問同一塊內(nèi)存
使用 Mutex 來每次只允許一個線程來訪問數(shù)據(jù)
- Mutex 是 mutual exclusion(互斥鎖)的簡寫
- 在同一時刻,Mutex 只允許一個線程來訪問某些數(shù)據(jù)
- 想要訪問數(shù)據(jù):
- 線程必須首先獲取互斥鎖(lock)
- lock 數(shù)據(jù)結(jié)構(gòu)是 mutex 的一部分,它能跟蹤誰對數(shù)據(jù)擁有獨占訪問權(quán)
- mutex 通常被描述為:通過鎖定系統(tǒng)來保護它所持有的數(shù)據(jù)
- 線程必須首先獲取互斥鎖(lock)
Mutex 的兩條規(guī)則
- 在使用數(shù)據(jù)之前,必須嘗試獲取鎖(lock)。
- 使用完 mutex 所保護的數(shù)據(jù),必須對數(shù)據(jù)進行解鎖,以便其它線程可以獲取鎖。
Mutex<T> 的 API
- 通過 Mutex::new(數(shù)據(jù)) 來創(chuàng)建
Mutex<T>
-
Mutex<T>
是一個智能指針
-
- 訪問數(shù)據(jù)前,通過 lock 方法來獲取鎖
- 會阻塞當前線程
- lock 可能會失敗
- 返回的是 MutexGuard(智能指針,實現(xiàn)了 Deref 和 Drop)
use std::sync::Mutex;
fn main() {
let m = Mutex::new(5);
{
let mut num = m.lock().unwrap();
*num = 6;
}
println!("m = {:?}", m);
}
多線程共享 Mutex<T>
use std::sync::Mutex;
use std::thread;
fn main() {
let counter = Mutex::new(0);
let mut handles = vec![];
for _ in 0..10 {
let handle = thread::spawn(move || { // 報錯 循環(huán) 所有權(quán)
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
多線程的多重所有權(quán)
use std::sync::Mutex;
use std::thread;
use std::rc::Rc;
fn main() {
let counter = Rc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Rc::clone(&counter);
let handle = thread::spawn(move || { // 報錯 rc 只能用于單線程
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
使用 Arc<T>來進行原子引用計數(shù)
-
Arc<T>
和Rc<T>
類似,它可以用于并發(fā)情景- A:atomic,原子的
- 為什么所有的基礎(chǔ)類型都不是原子的,為什么標準庫類型不默認使用
Arc<T>
?- 需要性能作為代價
-
Arc<T>
和Rc<T>
的API是相同的
use std::sync::{Mutex, Arc};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
RefCell<T>/Rc<T> vs Muter<T>/Arc<T>
-
Mutex<T>
提供了內(nèi)部可變性,和 Cell 家族一樣 - 我們使用
RefCell<T>
來改變Rc<T>
里面的內(nèi)容 - 我們使用
Mutex<T>
來改變Arc<T>
里面的內(nèi)容 - 注意:
Mutex<T>
有死鎖風險
四、通過 Send 和 Sync Trait 來擴展并發(fā)
Send 和 Sync trait
- Rust 語言的并發(fā)特性較少,目前講的并發(fā)特性都來自標準庫(而不是語言本身)
- 無需局限于標準庫的并發(fā),可以自己實現(xiàn)并發(fā)
- 但在Rust語言中有兩個并發(fā)概念:
- std::marker::Sync 和 std::marker::Send 這兩個trait
Send:允許線程間轉(zhuǎn)移所有權(quán)
- 實現(xiàn) Send trait 的類型可在線程間轉(zhuǎn)移所有權(quán)
- Rust中幾乎所有的類型都實現(xiàn)了 Send
- 但
Rc<T>
沒有實現(xiàn) Send,它只用于單線程情景
- 但
- 任何完全由Send 類型組成的類型也被標記為 Send
- 除了原始指針之外,幾乎所有的基礎(chǔ)類型都是 Send
Sync:允許從多線程訪問
- 實現(xiàn)Sync的類型可以安全的被多個線程引用
- 也就是說:如果T是Sync,那么 &T 就是 Send
- 引用可以被安全的送往另一個線程
- 基礎(chǔ)類型都是 Sync
- 完全由 Sync 類型組成的類型也是 Sync
- 但,
Rc<T>
不是 Sync 的 -
RefCell<T>
和Cell<T>
家族也不是 Sync的 - 而,
Mutex<T>
是Sync的
- 但,
手動來實現(xiàn) Send 和 Sync 是不安全的
到了這里,關(guān)于Rust編程語言入門之無畏并發(fā)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!