Skip to content

Rust 并发揭秘:掌握并行执行

Rust 并发提供了一种处理并行任务的独特方法,在不牺牲性能的情况下确保了安全性。在本文中,我们将探讨其核心概念、工具和最佳实践,为开发人员提供使用 Rust 进行高效并发编程的清晰路线图。

关键见解

Rust 的所有权系统以独特的方式缓解了竞争条件等并发问题,确保了更安全的并行编程。

利用Rust 中的互斥锁可以有效地管理跨多个线程的数据安全,防止竞争条件。

Rust 中的通道是实现安全高效的线程到线程通信的强大机制。

Rust 中的thread ::spawn函数简化了新线程的创建,增强了并发任务管理。

Rust 并发提供了一种管理并行任务的独特方法,可确保安全性和效率。在您探索编程世界时,理解这个概念可以提升您的编码技能。让我们一起探索 Rust 并发功能的复杂性。

Rust 并发基础知识

Rust 中的并发性是一项核心功能,允许多个任务在重叠时间段内运行。它旨在确保程序高效运行并充分利用系统资源。Rust 通过提供一组既安全又高效的工具和抽象来实现这一点。

然而,Rust 的所有权系统确保在编译时捕获此类问题,从而使并发编程不再那么困难且更加安全。

为什么要并发?

当今世界,大多数计算机都有多个核心。为了充分利用这些核心并提高应用程序的性能,并发性至关重要。通过允许任务并发运行,应用程序可以响应更快、响应速度更快。

例如,考虑一个 Web 服务器。它需要同时处理多个请求。如果没有并发,它会逐个处理每个请求,这是低效的。有了并发,它可以同时处理多个请求,从而提高资源利用率并提高性能。

Rust 中的线程

线程是可以独立管理的最小编程指令序列。Rust 提供了一种轻松创建线程的方法。以下是一个简单的示例:

Rust
use std::thread;

fn main() {
    thread::spawn(|| {
        // some task
    });
}

在上面的代码中,我们生成一个新线程来运行闭包内的代码。这是在 Rust 应用程序中引入并发的基本方法。

互斥锁确保数据安全

当多个线程访问共享数据时,可能会出现竞争条件。互斥(互斥的缩写)是一种并发原语,可防止多个线程同时访问共享数据。

以下是在 Rust 中使用 Mutex 的基本示例:

Rust
use std::sync::Mutex;

let m = Mutex::new(5);

{
    let mut num = m.lock().unwrap();
    *num = 6;
}

println!("m = {:?}", m);

在此示例中,Mutex 包装了一个整数。lock 方法返回一个守卫,我们可以对其进行操作。一旦守卫超出范围,数据就会被解锁,从而允许其他线程访问它。

沟通通道

通道是线程之间相互通信的一种方式。它们允许一个线程向另一个线程发送数据,从而确保安全的并发访问。

这是一个简单的例子

Rust
use std::sync::mpsc;
use std::thread;

let (tx, rx) = mpsc::channel();

thread::spawn(move || {
    tx.send("Hello from the spawned thread!").unwrap();
});

let received = rx.recv().unwrap();
println!("Message: {}", received);

上述代码中,mpsc::channel 创建了一个新的通道,tx 是发送端,rx 是接收端,生成的线程发送消息,主线程接收并打印。

线程:创建和管理

在 Rust 领域,线程在实现并发方面起着关键作用。它们允许多个操作同时执行,充分利用现代多核处理器的潜力。

线程的本质

从本质上讲,线程是 CPU 执行的最小单位。它是一系列可以独立于主程序运行的指令。在 Rust 中,线程是标准库的一部分,因此可以直接将它们合并到您的应用程序中。

https://marketsplash.com/rust-threads/

创建线程

Rust 提供了一种无缝创建新线程的方法。该thread::spawn函数允许您创建一个新线程并在其中执行闭包。这是一个基本示例:

Rust
use std::thread;

fn main() {
    thread::spawn(|| {
        println!("Hello from a new thread!");
    });
}

☝️在此代码片段中,生成了一个新线程,并打印了一条消息。主线程将继续执行,而无需等待生成的线程完成。

线程连接句柄

当你生成一个线程时,Rust 会返回一个JoinHandle。此句柄非常重要,因为它提供了一种等待线程完成执行的机制。以下是它的使用方法:

Rust
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        println!("Hello from the spawned thread!");
    });

    handle.join().unwrap();
}

通过调用join()句柄,主线程将等待生成的线程完成后再继续。

线程参数

通常,您需要将参数传递给线程。Rust 使这个过程变得直观:

Rust
use std::thread;

fn main() {
    let value = 10;

    thread::spawn(move || {
        println!("Value in thread: {}", value);
    }).join().unwrap();
}

注意这个move关键字。它确保将值移动到线程的闭包中,从而允许在线程内使用它。

管理线程输出

线程也可以返回值。要检索线程的输出,可以使用 JoinHandle:

Rust
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        42
    });

    let result = handle.join().unwrap();
    println!("Thread returned: {}", result);
}

☝️在这个例子中,生成的线程返回整数 42,然后由主线程打印该整数。

线程限制

必须了解的是,线程不是轻量级的。每个线程都会消耗系统资源,例如内存。因此,生成大量线程可能会导致系统资源耗尽。

线程池

为了缓解线程的限制,Rust 提供了线程池。线程池是一组预先生成的、准备执行任务的线程。这种方法比为每个任务生成一个新线程更有效率,尤其是对于短期任务。

以下是使用线程池的基本示例:

Rust
use std::thread;
use std::sync::mpsc;

let (tx, rx) = mpsc::channel();
let pool = threadpool::Builder::new().num_threads(4).build();

for _ in 0..8 {
    let tx = tx.clone();
    pool.execute(move || {
        tx.send("Task completed").unwrap();
    });
}

for _ in 0..8 {
    println!("{}", rx.recv().unwrap());
}

☝️在此代码片段中,创建了一个包含四个线程的线程池。然后将八个任务分派到池中,主线程等待接收完成消息。

互斥:同步和数据保护

并发性带来了强大的功能,但随之而来的是数据安全的挑战。Mutex是互斥的缩写,是 Rust 中的同步原语,可确保一次只有一个线程可以访问数据,从而防止数据争用。

什么是互斥锁?

互斥锁是一种用于同步资源访问的锁定机制。每次只有一个线程可以持有互斥锁。如果另一个线程尝试锁定同一个互斥锁,它将被阻塞,直到第一个线程释放锁。

为什么要使用互斥锁?

在并发编程中,多个线程可能需要访问共享数据。如果没有适当的同步,这可能会导致不可预测的结果。互斥锁提供了一种保护共享数据免受并发访问的方法,从而确保数据完整性。

在 Rust 中使用 Mutex

Rust 的标准库提供了一个 Mutex 类型,可用于保护共享数据。这是一个基本示例:

Rust
use std::sync::Mutex;

let m = Mutex::new(5);

{
    let mut num = m.lock().unwrap();
    *num = 6;
}

println!("m = {:?}", m);

☝️这段代码中,创建了一个 Mutex 来保护一个整数。调用 lock 方法获取 Mutex 上的锁。一旦访问和修改了数据,锁就会在块结束时自动释放。

常见的互斥模式

  1. 线程间共享互斥锁

要在线程之间共享互斥锁,可以将其包装在Arc(原子引用计数)中。这允许多个线程引用互斥锁:

Rust
use std::sync::{Arc, Mutex};
use std::thread;

let counter = Arc::new(Mutex::new(0));

let mut handles = vec![];

for _ in 0..10 {
    let counter = Arc::clone(&counter);
    let handle = thread::spawn(move || {
        let mut num = counter.lock().unwrap();
        *num += 1;
    });
    handles.push(handle);
}

for handle in handles {
    handle.join().unwrap();
}

println!("Result: {}", *counter.lock().unwrap());

☝️此代码生成十个线程,每个线程增加一个共享计数器。互斥锁确保每次只有一个线程可以增加计数器。

死锁

互斥锁的一个常见缺陷是可能出现死锁。当两个或多个线程相互等待释放锁时,就会发生这种情况,从而导致停滞。设计代码以避免此类情况至关重要。

互斥锁与其他同步原语

Rust 提供其他同步原语,如RwLockCondvar。Mutex 允许对一个线程进行独占访问,RwLock允许多个读取器或一个写入器。Condvar另一方面,允许线程等待特定条件得到满足。

特征互斥锁其他同步原语
所有权独家所有权各不相同(例如,与 RwLock 共享)
阻塞行为锁定时阻止其他线程取决于原始
预防死锁通过精心设计可以实现取决于原始
用例保护共享数据各不相同(例如,RwLock 的读写场景)
表现如有争议可能会影响性能性能因原始数据和用例而异

通道:促进线程通信

在并发编程领域,线程经常需要相互通信。Rust中的通道为线程提供了发送和接收消息的渠道,确保通信顺畅且同步。

渠道背后的概念

通道是线程间发送数据的一种方式。可以将它们视为双向无线电:一个线程发送消息,另一个线程接收消息。此机制可确保数据安全传输,不会出现任何竞争条件或意外行为。

创建和使用频道

Rust 的标准库提供了channel创建发送端和接收端的函数:

Rust
use std::sync::mpsc;
use std::thread;

let (tx, rx) = mpsc::channel();

thread::spawn(move || {
    let val = String::from("Hello");
    tx.send(val).unwrap();
});

let received = rx.recv().unwrap();
println!("Received: {}", received);

☝️在这个例子中,新线程通过通道发送一个字符串消息,主线程接收并打印。

多个生产者,单个消费者

Rust 通道支持多个发送端,但仅支持一个接收端。这种模式通常被称为MPSC(多个生产者,单个消费者):

Rust
use std::sync::mpsc;
use std::thread;

let (tx, rx) = mpsc::channel();

for _ in 0..5 {
    let tx = tx.clone();
    thread::spawn(move || {
        let val = String::from("Hello from a thread!");
        tx.send(val).unwrap();
    });
}

for _ in 0..5 {
    let received = rx.recv().unwrap();
    println!("Received: {}", received);
}

这里,五个线程发送消息,但只有主线程接收消息。

使用 'try_recv' 进行非阻塞接收

虽然recv阻塞线程直到有消息可用,try_recv但不会阻塞。相反,它会立即返回Result

Rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

let (tx, rx) = mpsc::channel();

thread::spawn(move || {
    let val = String::from("Hello");
    tx.send(val).unwrap();
    thread::sleep(Duration::from_secs(1));
});

match rx.try_recv() {
    Ok(msg) => println!("Received: {}", msg),
    Err(e) => println!("Error: {:?}", e),
}

案例研究:Rust 并发问题 一位名叫 Hydradon 的开发人员在 Rust 中遇到了并发问题。为了理解这个问题,他创建了一个简化版本的代码。该代码由一个 Bag 结构和一个 Item 结构组成。目标是生成 10 个线程,这些线程将在 itemList 中的每个项目上执行 Bag 中的 itemAction 方法,如果两个项目属性都在 bag 的属性中,则打印一条语句。 **问题:**执行时,程序只打印了一行,然后挂起,没有返回任何进一步的结果。预期输出是 10 行,因为 itemList 包含 10 个项目。开发人员不确定他的线程逻辑是否正确。 **解决方案:**一位名叫 Loganfsmyth 的用户确定了根本原因。问题在于 Rust 中子表达式的求值顺序,目前尚未定义。开发人员的代码本质上是试图两次锁定同一个互斥锁,导致死锁。Loganfsmyth 提出了一个修复方法:只锁定互斥锁一次,然后检查属性。这将防止死锁。他还提到,如果使用 RwLock 或 ReentrantMutex 而不是 Mutex,代码将可以工作,因为它们允许同一线程对数据有多个不可变引用。 **结果:**使用 Loganfsmyth 的解决方案,程序能够返回,但仍然只打印一行。经过进一步调查,发现所有 10 个线程都运行了,但它们没有从 itemAction 函数打印任何内容。包中的属性被初始化为全零,输出只打印其中一个参数为 0 的行。因此,contains 只对这些行返回 true。

关闭通道

一旦通道的发送端被删除,通道就会关闭。接收端可以使用该recv方法检测此情况,如果通道已关闭且为空,则该方法将返回错误。

缺点和注意事项

虽然通道功能强大,但它们可能并不适合所有场景。例如,当您需要双向通信或性能是关键因素时,其他同步原语可能更合适。

连接句柄:处理线程结果

在 Rust 中,当你使用thread::spawn函数创建一个新线程时,它会返回一个JoinHandle。这是新创建线程的句柄,允许你等待线程完成并检索其结果。

为什么要使用连接手柄?

假设您已经生成了多个线程,每个线程都在执行一项任务。您如何知道它们何时完成?或者,如果它们产生了您需要的结果怎么办?这就是连接句柄发挥作用的地方。它们提供了一种等待线程完成并检索线程计算结果的方法。

Rust
let handle = std::thread::spawn(|| {
    "Hello from the child thread!"
});

let result = handle.join().unwrap();
println!("{}", result);

🚩在上面的代码中,join() 方法在连接句柄上被调用,该方法会阻塞主线程,直到生成的线程完成为止。然后打印出计算结果(在本例中是一个简单的字符串)。

处理线程结果

当线程完成执行时,它可能会返回结果。JoinHandle提供一种方法,join()该方法等待线程完成并返回其结果。

Rust
let handle = std::thread::spawn(|| {
    42
});

let result = handle.join().unwrap();
assert_eq!(result, 42);

☝️在这个例子中,生成的线程只是返回数字 42。主线程等待生成的线程使用 join() 方法完成,然后断言结果确实是 42。

前端知识体系 · wcrane