文章目录

关注Rust语言已经有段时间了,距离Rust 1.0发布也只剩一个月不到的时间了,所以就写篇Rust并发相关的文章吧。

Rust官方博客在几天前发布的一篇名为Fearless Concurrency with Rust的文章,从这篇文章可以看出Rust在并发的内存安全上做了很多努力,Rust通过在编译期对所有权(ownership)生命期(lifetime)等进行静态检查以确保并发安全。本文主要是记录博主对于这篇文章的一些见解。

使用Rust来写并发程序,可以受益于下面这些好处。

  • 使用channel传递消息时,被传递的数据的所有权会随着消息的发送而转移,所以你可以将一个指针从一个线程发送到另一个线程而无须担心这些线程之后在对这个指针进行访问的时候会发生竞争。Rust的channel强制确保线程隔离。
    在Rust中可以通过channel将一个数据发送到另一个线程,并且被传递的数据的所有权也会随之转移,下面的例子演示了通过channel将data从主线程发送到另一个线程。由于所有权也会跟着转移,所以主线程的data也就不再有效了。当尝试再使用data的时候就会被编译器视为错误。

    通过channel传递数据时,并非真的是在线程间转移所有权。在Rust中向一个函数按值传递参数或函数返回一个值就意味着所有权转移。因此将要传递的数据作为参数传递给send方法时就失去了所有权,当从recv的返回值获取到这个数据时就获得了所有权。如果send和recv发生在两个不同线程,那么看起来就像是所有权在这两个线程间转移。此外例子中receiver的所有权也发生了转移,只是它并不是使用channel传递的。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    use std::sync::mpsc;
    use std::thread;

    fn use_data(data: Box<i32>) {
    println!("{}", data);
    }

    fn main() {
    let data = Box::new(0);
    let (sender, receiver) = mpsc::channel();
    let guard = thread::scoped(move || {
    let data = receiver.recv().unwrap();
    use_data(data);
    });
    sender.send(data).unwrap();
    use_data(data); // error: use of moved value: `data`
    guard.join();
    }
  • 一个知道它所保护的数据,Rust保证这些数据只有在持有锁的状态下才可以被访问。状态(state)永远不会被意外的共享。Rust强制确保“锁住的是数据,而非代码”。
    有时候可能需要在多个线程共享某些状态,以便这些线程更好的协同完成某些工作。在多个线程共享状态就需要在访问这些状态数据时进行同步,最常用的就是对要访问的数据使用锁。使用锁如果不小心就可能在访问前忘记获取锁,或者在访问完后忘记释放锁,后者大部分的编程语言都已有很好的解决方案。

抛开锁的问题先不谈,由于Rust中所有权的关系,线程是相互隔离的。通常来说如果要在多个线程中共享状态数据需要频繁的将这些数据在这些线程间转移,显然这是很不方便的。我们需要的仅仅是这些状态数据的临时使用权,因此可以在所有线程都不使用这些数据的时候先让第三方持有这些状态数据,当某个线程要使用这些状态数据的时候借出来用下,使用完后再还回去。Rust标准库提供了智能指针Arc非常适合来作为这个第三方(Arc类似于C++的shared_ptr)。下面的代码演示了使用Arc在5个并发线程中共享state。

借(Borrowing)是Rust的另一个概念,前面提到在Rust中向一个函数按值传递参数或函数返回一个值就意味着所有权转移,但是如果需要的仅仅是临时的访问权,可以通过使用引用来借。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
use std::sync::Arc;
use std::thread;

fn main() {
let state = 10;
let pointer = Arc::new(state);
let _: Vec<_> = (0..5).map(|i| {
let pointer_cloned = pointer.clone();
thread::scoped(move || {
let state = *pointer_cloned;
println!("thread {}, state = {}", i, state);
})
}).collect();
}

上面的例子中5个线程在借到state后都只是执行了读操作,有时我们还需要在某个线程中更新这些状态。在Rust中借(borrow)分为可修改的借(mutable borrow, &mut T)和不可修改的借(immutable borrow, &T),为了确保借不会发生数据竞争,Rust规定可修改的借不允许与其他借同时发生,但在编译期是难以检查出在多线程环境下是否会发生这种情况,因而Arc只允许不可修改的借。幸运的是锁在运行时的同步具有与其相似的排它性(互斥),在Rust中可以用锁来保护这些数据以实现可修改的借。

Rust强制确保了共享的数据只有在持有锁的状态下才可访问,这样就很好的解决了访问前忘记获取锁的问题,与C++类似使用RAII来确保不会忘记释放锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
let lock = Arc::new(Mutex::new(0));
let _: Vec<_> = (0..10).map(|_| {
let lock2 = lock.clone();
thread::scoped(move || {
for _ in 0..100 {
let mut data = lock2.lock().unwrap();
for _ in 0..100 {
*data += 1;
}
}
})
}).collect();
let data = lock.lock().unwrap();
assert!(*data == 100000);
}
  • 每一种数据类型都知道它是否可以安全的在多个线程之间传递和访问,Rust强制确保这些数据类型被正确的使用。即使是无锁(lock-free)的数据结构也不会发生数据竞争(data race)。线程安全不仅仅是文档,更是准则。
    线程安全的数据结构会使用一些内部同步机制来保证它可以安全的被多线程并发使用。

Rust有两种引用计数的智能指针Rc和Arc。

  • Rc: 使用普通的读/写来修改引用计数,非线程安全;
  • Arc: 使用原子操作来修改引用计数,线程安全。

在Rust中数据分为两大类Send和!Send,具有Send特性的数据结构可以安全的跨越线程边界传递,而!Send则不行。Rc属于!Send,Arc属于Send。

在Rust中尝试将一个!Send数据传递到另一个线程会被视为错误。这样可以很好的将非线程安全的数据限制在一个线程中,规避了被多个线程并发访问的安全问题。

1
2
3
4
5
6
7
8
9
use std::rc::Rc;
use std::thread;

fn main() {
let rc = Rc::new(0);
let _ = thread::scoped(move || {
println!("{}", *rc); // note: `alloc::rc::Rc<i32>` cannot be sent between threads safely
});
}
  • 你甚至可以在线程之间共享栈帧(stack frames),Rust会静态(没有运行时开销)的确保这些帧在被其他线程使用时是有效的。无论以何种形式共享,Rust都会确保其安全。
    在Rust中可以使用函数thread::scoped创建一个线程,这个函数会返回JoinGuard,JoinGuard会确保被销毁的时候线程也执行完毕(隐式调用join等待线程执行完毕)。利用这一点可以很容易的确保共享的栈帧被线程使用的时候是有效的。
1
2
3
4
5
6
7
8
9
10
11
12
use std::thread;

fn main() {
let data = 0;
{
let join_guard = thread::scoped(|| {
println!("{}", &data);
});
// join_guard destroyed here
}
// data destroyed here
}

乍一看可能会觉得“这不就是RAII嘛,C++的future就是这么做的”。是RAII没错,但Rust还会进行生命期静态检查。
比如下面的代码,data会在join_guard之前销毁,如果允许这样的代码编译通过,就可能会发生线程对data的访问发生在data销毁之后。Rust的生命期检查能在编译的时候检查到并报告这个错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
use std::thread;

fn main() {
let join_guard;
{
let data = 0;
join_guard = thread::scoped(|| {
println!("{}", &data); // error: `data` does not live long enough
});
// data destroyed here
}
// join_guard destroyed here
}
文章目录