16无畏并发

并发编程(Concurrent programming),代表程序的不同部分相互独立的执行。

并行编程(parallel programming),代表程序不同部分同时执行。

Rust 利用所有权系统和类型检查,在编译时发现并发错误。

  • 如何创建线程来同时运行多段代码。
  • 消息传递(Message passing)并发,其中通道(channel)被用来在线程间传递消息。
  • 共享状态(Shared state)并发,其中多个线程可以访问同一片数据。
  • SyncSend trait,将 Rust 的并发保证扩展到用户定义的以及标准库提供的类型中。

使用线程同时运行代码

进程(process)

线程(threads)

将程序中的计算拆分为多个线程可以改善性能。不过同时会增加复杂性。

编程语言实现线程的方式:

  • 1:1 模型,一个OS线程对应一个语言线程;

    • 编程语言调用操作系统创建新线程的API;
  • M:N 模型 ,M个绿色线程,对应 N个OS线程(M和N不必相同)

    • 编程语言提供了自己特殊的线程实现,这些线程被称为绿色(green)线程;

    • 使用绿色线程的语言会在不同数量的OS线程的上下文中执行它们。

使用 spawn 创建新线程

thread::spawn 函数用于创建新线程,

  • 需要传递一个闭包作为参数,闭包体是希望在新线程运行的代码
1
2
3
4
5
6
thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});

使用 join 等待所有线程结束

默认各个线程单独运行,主线程结束不会等待其他线程运行。

  • 可能导致的问题,主线程结束后,其他线程可能没有执行或者没有执行完提早结束。

解决:

thread::spawn 的返回值类型是 JoinHandle ,拥有所有权,调用其 join 方法,会阻塞(Blocking)当前线程,直到JoinHandle所代表的线程结束。

  • 阻塞(Blocking)线程意味着阻止该线程执行工作或退出。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use std::thread;
use std::time::Duration;

fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});

for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1));
}

handle.join().unwrap(); // 等待线程结束
}

JoinHandle 类型是泛型的,其参数类型是线程的返回值,

  • 如果没有返回值,那么默认就是(),所以就是JoinHandle<()>
  • 如果有返回值时,就是返回值类型,例如,JoinHandle<i32>

通常在主线程中会调用 joinhandle.join() 方法来阻塞主线程,join() 的返回值是 Result<T>,所以想获得线程返回值,那么可以使用unwrap() 或者处理 Result 类型的其他方法。

线程与 move 闭包

什么情况下需要使用 move关键字?

线程中的闭包直接使用外部环境值,分为两种情况:

  1. 获取其所有权;

    • 没有问题,所有权转移到线程中的闭包里,可以直接使用。
  2. 借用;

    • 有问题,不能直接用;需要添加 move关键字;
    • 线程中的闭包不能直接借用环境值,为什么?
      • 因为在这种情况下, Rust 无法判断新线程会执行多久,所以无法知晓外部的引用是否一直有效。所以会出现如下报错:may outlive borrowed value v
    • 如何正确使用?
      • 当线程中的闭包需要借用外部环境值时(即使用引用),在闭包前添加 move 关键字,强制将其所有权转移到闭包内。(即转为第1种)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 第一种情况,闭包内获取环境值所有权,
// ok,直接用
thread::spawn(|| {
handle_connection(stream);
});
fn handle_connection(mut stream: TcpStream) {}

// 第二种情况,闭包内借用环境值,
// wrong,不能直接用
let v = vec![1, 2, 3];
// 下面代码会报错的
let handle = thread::spawn(|| {
// println! 是仅借用vec
println!("Here's a vector: {:?}", v);
});

// 第二种情况的,正确方式,
// 添加 move
let handle = thread::spawn(move || {
println!("Here's a vector: {:?}", v);
});

move 闭包,经常与 thread::spawn 一起使用

  • move 关键字,强制闭包获取其使用的环境值的所有权;

  • 最佳使用方式:在创建新线程,将值得所有权从一个线程移到另一个线程;

使用消息传递在线程间传递数据

消息传递(message passing),线程或actor 通过发送包含数据的消息来相互沟通。

通道(channel),是 Rust 中实现消息传递并发的主要工具。

  • 由两部分组成:
    • 发送者(transmitter),位于上游位置
    • 接受者(receiver),位于下游位置
  • 代码中的一部分调用发送者的方法以及希望发送的数据,另一部分则检查接收端收到的消息。
  • 当发送者或接受者任一被丢弃时,可以认为通道被关闭(closed)了。
1
2
3
4
5
use std::sync::mpsc;

fn main() {
let (tx, rx) = mpsc::channel();
}

mpsc多个生产者,单个消费者(multiple producer,single consumer)的缩写。

  • mpsc::channel 函数可以创建一个新的通道。

    • 返回一个元祖,第一个元素是发送端,第二个元素是接收端。
    • 通常,tx 为发送者(transmitter),rx 为接受者(receiver)的缩写。(有历史原因)
  • Rust 标准库实现通道的方式,意味着一个通道可以有多个生产者,但只能有一个消费者。

    • 类似于多条小何最终汇聚成大河。

发送端的方法:

  • send 将数据放入通道,
    • 返回一个Result<T,E> 类型,如果没有接受者,发送操作会返回错误。

接收端的方法:

  • recv,(receive 的缩写)
    • 会阻塞主线程执行,知道从通道中接收一个值;
    • 返回 Result<T,E>,如果通道发送端关闭,则返回一个错误表明不会有新值过来;
  • try_recv
    • 不会阻塞,会立刻返回一个 Result<T,E>
      • Ok 值包含可用的信息,
      • Err 值代表此时没有任何信息。
    • 如果线程在等待消息过程中还有其他工作时,使用 try_recv很有用,可以编写一个循环来频繁调用try_recv,在有可用消息时进行处理,其他时候则处理一会儿其他工作知道再次检查。

mspc::channel 返回的接受者可以被作为迭代器使用在 for 循环中。

1
2
3
4
5
6
   let (tx, rx) = mpsc::channel();
//--snip--

for received in rx {
println!("Got: {}", received);
}

通过克隆发送者来创建多个生产者

对通道的发送端调用 clone 方法,可以实现克隆通道的发送端来实现多个发送端。

1
2
3
let (tx, rx) = mpsc::channel();

let tx1 = mpsc::Sender::clone(&tx);

通道和所有权转移

send 函数获取其参数的所有权并移动这个值归接受者所有。

  • 这可以防止在发送后再次意外地使用这个值。

共享状态并发

在某种程度上,任何编程语言中的通道都类似于单所有权,即一旦将一个值传送到通道中,将无法再使用这个值。

共享内存类似于多所有权:多个线程可以同时访问相同的内存位置。

互斥器一次只允许一个线程访问数据

互斥器(mutex)是 mutual exclusion 的缩写,

  • 即任意时刻,其只允许一个线程访问某些数据。

如何访问互斥器中的数据?

  1. 线程通过获取互斥器的(lock)来表明其希望访问数据;

是一个作为互斥器一部分的数据结构,

  • 记录谁有数据的排他访问权;

Mutex<T> 的 API

Mutex::new 创建 Mutex<T>,即互斥器

lock 方法获取锁,以访问互斥器中的数据;

  • 这个方法调用会阻塞当前线程,知道线程拥有锁为止。

  • 如果拥有锁的线程 panic了,那么其他线程调用 lock 会失败。且没有线程能再获取锁。

  • lock 方法返回一个 MutexGuard 类型的智能指针,所以一旦获取锁,就可以将返回值视为一个引用了。

    • MutexGuard 实现了 Deref 来指向其内部数据,Drop 实现了当 MutexGuard 离开作用域时自动释放锁

      锁自动释放,这个功能使得我们不会忘记释放锁,导致其他线程无法使用互斥器。

获取锁之后,因为返回值是一个智能指针,所以要使用内部数据,需要使用 *解引用运算符

1
2
3
4
5
6
7
8
9
10
11
12
use std::sync::Mutex;

fn main() {
let m = Mutex::new(5); // 返回类型为 Mutex<i32>

{
let mut num = m.lock().unwrap(); // num 的类型为 MutexGuard<i32> 的智能指针
*num = 6; // * 解引用
}

println!("m = {:?}", m);
}

在线程间共享 Mutex<T>

多线程和多所有权

Arc<T> 原子引用计数(atomically reference counted)

  • 类似 Rc<T> ,可以安全的用在并发环境的类型。A 是原子性(atomic)
  • 原子性乐行工作起来类似原始类型,不过可以安全的在线程间共享。
  • 具体查看std::sync::atomic的文档

为什么不是所有原始类型都是原子性的?或所有类型都使用 Arc<T>?

  • 因为线程安全带有性能惩罚(即消耗多一些),所以只在必要时才做。单线程没有必要

RefCell<T>/Rc<T>Mutex<T>/Arc<T> 的相似性

单线程中,

RefCell<T>/Rc<T> 组合,可以提供多所有权且可以修改内部可变性,

并发环境中,

Mutex<T>/Arc<T>组合也有相同功效,Mutex<T> 提供了内部可变性的功能。

问题:

RefCell<T>/Rc<T> 组合,可能造成引用循环的风险,

Mutex<T> 也有造成死锁(deadlock)的风险,

  • 当一个操作需要锁住两个资源,而两个线程各持一个锁,这会造成它们永远相互等待。
  • 查看标准库的Mutex<T>MutexGuard 的 API 文档获取规避死锁的策略。

使用 SyncSend trait 的可扩展并发

Rust 语言本身不提供并发相关的基础设施。这章节所提到的所有内容都属于标准库。

std::marker 中的 SyncSend trait 是内嵌于语言中的两个并发概念。

通过 Send 允许在线程间转移所有权

Send trait 表明类型的所有权可以在线程间传递

几乎所有的 Rust 类型都是 Send 的,

  • 目前学到的Rc<T>不是,这个不能Send,因为如果克隆了Rc<T> 的值并尝试将克隆的所有权转移到另一个线程,这两个线程可能同时更新引用计数。

  • 裸指针(raw pointer) 除外,裸指针也不是Send的。

任何完全由 Send 的类型组成的类型也会自动被标记为 Send

Sync 允许多线程访问

Sync trait 表明一个实现了Sync 的类型可以安全的在多个线程中拥有其值的引用(传递引用)。

  • 对于任意类型T,如果&TT的引用)是Send的话,T就是 Sync的,这意味着其引用就可以安全的发送到另一个线程。
  • 类似于 Send,基本类型都是Sync的,
  • 完全由Sync的类型组成的类型也是Sync的,
  • Mutex<T>Sync

目前已学不是Sync的类型:

  • Rc<T> 也不是 Sync 的,理由与不是 Send相同。

  • RefCell<T>Cell<T> 系列类型都不是 Sync的。

    • RefCell<T> 在运行时所进行的借用检查也不是线程安全的。

手动实现SendSync 是不安全的

通常不需要手动实现SendSync trait,因为由SendSync 的类型组成的类型,自动就是 SendSync的。

它们都是标记 trait,不需要实现任何方法,只是用来加强并发相关的不可变性的。

总结

因为 Rust 本身很少有处理并发的部分内容,有很多的并发方案都由 crate 实现。他们比标准库要发展的更快;请在网上搜索当前最新的用于多线程场景的 crate。

  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2022-2023 ligongzhao
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信