线程¶
1 spawn¶
https://www.rustwiki.org.cn/zh-CN/std/thread/fn.spawn.html
- 功能:产生一个新线程,并为其返回一个 JoinHandle
。
声明:
std::thread
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
连接句柄提供了一个 join
方法,可以用来连接新生成的线程。如果生成的线程发生 panics,那么 join
将返回一个 Err
,其中包含了提供给 panic!
的参数。
如果连接句柄被丢弃了,则新生成的线程将被隐式地 分离。 在这种情况下,新生成的线程可能不再被连接。 (程序有责任最终连接它创建的线程,或者将它们分离; 否则,将导致资源泄漏。)
此调用将使用 Builder
的默认参数创建一个线程,如果要指定栈大小或线程名称,请使用此 API。
正如您在 spawn
的签名中看到的那样,对于赋予 spawn
的闭包及其返回值都有两个约束,让我们对其进行解释:
'static
约束意味着闭包及其返回值必须具有整个程序执行的生命周期。这是因为线程可以比它们被创建时的生命周期更长。 确实,如果线程以及它的返回值可以比它们的调用者活得更久,我们需要确保它们以后仍然有效,并且因为我们不能知道它什么时候返回,因此需要使它们直到程序结束时尽可能有效,因此是'static
生命周期。Send
约束是因为闭包需要通过值从产生它的线程传递到新线程。它的返回值将需要从新线程传递到它被join
的线程。 提醒一下,Send
标记 trait 表示从一个线程传递到另一个线程是安全的。Sync
表示将引用从一个线程传递到另一个线程是安全的。 示例:
use std::thread;
use std::time::Duration;
fn main() {
let handler = thread::spawn(|| {
for i in 1..=10 {
println!("times [{i}] from spawned thread!");
thread::sleep(Duration::from_millis(10));
}
});
for i in 1..=10 {
println!("times [{i}] from main thread!");
thread::sleep(Duration::from_millis(10));
}
handler.join().unwrap();
}
由于线程是并发执行的,线程间执行无序,因此每次输出都可能无序,可能的输出:
times [1] from main thread!
times [1] from spawned thread!
times [2] from main thread!
times [2] from spawned thread!
times [3] from main thread!
times [3] from spawned thread!
times [4] from main thread!
times [4] from spawned thread!
times [5] from main thread!
times [5] from spawned thread!
times [6] from main thread!
times [6] from spawned thread!
times [7] from main thread!
times [8] from main thread!
times [7] from spawned thread!
times [8] from spawned thread!
times [9] from main thread!
times [9] from spawned thread!
times [10] from main thread!
1.1 channel 和 move 线程间通讯¶
线程通常是使用 channels
进行通信的,通常情况下是这样的。
此示例还展示了如何使用 move
,以便将值的所有权授予线程。
use std::thread;
use std::sync::mpsc::channel;
let (tx, rx) = channel();
let sender = thread::spawn(move || {
tx.send("Hello, thread".to_owned())
.expect("Unable to send on channel");
});
let receiver = thread::spawn(move || {
let value = rx.recv().expect("Unable to receive from channel");
println!("{value}");
});
sender.join().expect("The sender thread has panicked");
receiver.join().expect("The receiver thread has panicked");
一个线程也可以通过 JoinHandle
来返回一个值,您可以使用它进行异步计算 (不过 futures 可能更合适)。
use std::thread;
let computation = thread::spawn(|| {
// 一些昂贵的计算。
42
});
let result = computation.join().unwrap();
println!("{result}");
2 channel¶
https://www.rustwiki.org.cn/zh-CN/std/sync/mpsc/index.html 功能:多生产者,单消费者 FIFO 队列通信原语。
该模块通过通道提供基于消息的通信,具体定义为以下三种类型:
Sender
或 SyncSender
用于将数据发送到 Receiver
。 两个发送者都是可克隆的 (multi-producer),因此许多线程可以同时发送到一个接收者 (single-consumer)。
这些通道有两种口味:
- 异步,无限缓冲的通道。
channel
函数将返回(Sender, Receiver)
元组,其中所有发送都是异步的 (它们从不阻塞)。 通道在概念上具有无限的缓冲区。 - 同步的有界通道。
sync_channel
函数将返回(SyncSender, Receiver)
元组,其中待处理消息的存储是固定大小的预分配缓冲区。 所有的发送都将被阻塞,直到有可用的缓冲区空间。 请注意,允许的界限为 0,从而使通道成为 “rendezvous” 通道,每个发送者在该通道上原子地将消息传递给接收者。
2.1 Disconnection¶
通道上的发送和接收操作都将返回 Result
,指示该操作是否成功。 不成功的操作通常通过将其丢弃在相应线程中来指示具有 “hung up” 的通道的另一半。
释放通道的一半后,大多数操作将不再继续进行,因此将返回 Err
。 许多应用程序将继续对该模块返回的结果进行 unwrap
处理,如果一个线程意外死亡,则会导致线程之间传播失败。
2.2 Examples¶
简单用法:
use std::thread;
use std::sync::mpsc::channel;
// 创建一个简单的流通道
let (tx, rx) = channel();
thread::spawn(move|| {
tx.send(10).unwrap();
});
assert_eq!(rx.recv().unwrap(), 10);
共享用法:
use std::thread;
use std::sync::mpsc::channel;
// 创建一个可以从许多线程一起发送的共享通道,其中 tx 是发送一半 (用于传输的 tx),rx 是接收一半 (用于接收的 rx)。
//
//
let (tx, rx) = channel();
for i in 0..10 {
let tx = tx.clone();
thread::spawn(move|| {
tx.send(i).unwrap();
});
}
for _ in 0..10 {
let j = rx.recv().unwrap();
assert!(0 <= j && j < 10);
}
传播 panics:
use std::sync::mpsc::channel;
// 调用 recv() 将返回错误,因为通道已挂起 (或已释放)
//
let (tx, rx) = channel::<i32>();
drop(tx);
assert!(rx.recv().is_err());
同步通道:
use std::thread;
use std::sync::mpsc::sync_channel;
let (tx, rx) = sync_channel::<i32>(0);
thread::spawn(move|| {
// 这将等待父线程开始接收
tx.send(53).unwrap();
});
rx.recv().unwrap();
无限接收循环:
use std::sync::mpsc::sync_channel;
use std::thread;
let (tx, rx) = sync_channel(3);
for _ in 0..3 {
// 这里没有线程和克隆也是一样的,因为仍然会剩下一个 `tx`。
//
let tx = tx.clone();
// 克隆的 tx 丢弃在线程中
thread::spawn(move || tx.send("ok").unwrap());
}
// 删除最后一个发送者停止 `rx` 等待消息。
// 如果我们将其注释掉,程序将无法完成。
// 所有需要为 `rx` 排除 `tx` 才能拥有 `Err`。
drop(tx);
// 无限接收者等待所有发送者完成。
while let Ok(msg) = rx.recv() {
println!("{msg}");
}
println!("completed");
3 Barrier¶
https://www.rustwiki.org.cn/zh-CN/std/sync/struct.Barrier.html - 功能:屏障使多个线程能够同步某些计算的开始。
3.1 方法实现¶
pub fn new(n: usize) -> Barrier
创建一个新的屏障,该屏障可以阻止给定数量的线程。
屏障将阻塞调用 wait()
的 n - 1
个线程,然后在第 n 个线程调用 wait()
时立即唤醒所有线程。
use std::sync::Barrier;
let barrier = Barrier::new(10);
pub fn wait(&self) -> BarrierWaitResult)
阻塞当前线程,直到所有线程都在此处集合为止。 所有线程集合一次后,屏障可以重新使用,并且可以连续使用。
从该函数返回时,单个 (arbitrary) 线程将接收从 BarrierWaitResult::is_leader()
返回 true
的 BarrierWaitResult
,而所有其他线程将接收从 BarrierWaitResult::is_leader()
返回 false
的结果。
use std::sync::{Arc, Barrier};
use std::thread;
let mut handles = Vec::with_capacity(10);
let barrier = Arc::new(Barrier::new(10));
for _ in 0..10 {
let c = Arc::clone(&barrier);
// 相同的消息将一起打印。
// 您将看不到任何交错。
handles.push(thread::spawn(move|| {
println!("before wait");
c.wait();
println!("after wait");
}));
}
// 等待其他线程完成。
for handle in handles {
handle.join().unwrap();
}
输出:
before wait
before wait
before wait
before wait
before wait
before wait
before wait
before wait
before wait
before wait
after wait
after wait
after wait
after wait
after wait
after wait
after wait
after wait
after wait
after wait
3.2 Arc¶
https://www.rustwiki.org.cn/zh-CN/std/sync/struct.Arc.html
- 功能:线程安全的引用计数指针。Arc
代表原子引用计数。