1. 显式线程

1.1 生成短期线程

  需要安装crossbeam库,可通过cargo add crossbeam 命令安装

[dependencies]
crossbeam = "0.8.2"

  使用 crossbeam crate 为并发和并行编程提供了数据结构和函数。Scope::spawn 生成一个新的作用域线程,该线程确保传入 crossbeam::scope 函数的闭包在返回之前终止,这意味着您可以从调用的函数中引用数据。

  • 本实例将数组一分为二,并在不同的线程中并行计算。
extern crate crossbeam;

fn main() {
let arr = &[1, 25, -4, 10, 36, 88, -99];
let max = find_max(arr);
assert_eq!(max, Some(88));
}

fn find_max(arr: &[i32]) -> Option<i32> {
const THRESHOLD: usize = 2;

if arr.len() <= THRESHOLD {
return arr.iter().cloned().max();
}

let mid = arr.len() / 2;
let (left, right) = arr.split_at(mid);

crossbeam::scope(|s| {
let thread_l = s.spawn(|_| find_max(left));
let thread_r = s.spawn(|_| find_max(right));

let max_l = thread_l.join().unwrap()?;
let max_r = thread_r.join().unwrap()?;

Some(max_l.max(max_r))
})
.unwrap()
}
  • 运行cargo run校验

1.2. 创建并发的数据管道

  在章节1.1基础上再次安装crossbeam-channel库,可通过cargo add crossbeam-channel 命令安装

[dependencies]
crossbeam = "0.8.2"
crossbeam-channel = "0.5.7"

  使用 crossbeamcrossbeam-channel 两个 crate 创建了一个并行的管道。管道有一个数据源和一个数据接收器,数据在从源到接收器的过程中由两个工作线程并行处理。
  使用容量由 crossbeam_channel::bounded 分配的有界信道。生产者必须在它自己的线程上,因为它产生的消息比工作线程处理它们的速度快(因为工作线程休眠了半秒)——这意味着生产者将在对 [crossbeam_channel::Sender::send] 调用时阻塞半秒,直到其中一个工作线程对信道中的数据处理完毕。也请注意,信道中的数据由最先接收它的任何工作线程调用,因此每个消息都传递给单个工作线程,而不是传递给两个工作线程。

  通过迭代器 crossbeam_channel::Receiver::iter方法从信道读取数据,这将会造成阻塞,要么等待新消息,要么直到信道关闭。因为信道是在 crossbeam::scope范围内创建的,我们必须通过 drop 手动关闭它们,以防止整个程序阻塞工作线程的 for 循环。可以将对 drop 的调用视作不再发送消息的信号。

extern crate crossbeam;
extern crate crossbeam_channel;

use crossbeam_channel::bounded;
use std::thread;
use std::time::Duration;

fn main() {
let (send1, recv1) = bounded(1);
let (send2, recv2) = bounded(1);
let n_msgs = 4;
let n_workers = 2;

crossbeam::scope(|s| {
// 生产者线程
s.spawn(|_| {
for i in 0..n_msgs {
send1.send(i).unwrap();
println!("发送来源 {}", i);
}
// 关闭信道 —— 这是退出的必要条件
// for 巡海在工作线程中
drop(send1);
});
// 由 2 个线程并行处理
for _ in 0..n_workers {
// 从数据源发送数据到接收器,接收器接收数据
let (sendr, recvr) = (send2.clone(), recv1.clone());
// 在不同的线程中衍生工人
s.spawn(move |_| {
thread::sleep(Duration::from_millis(500));
// 接收数据,直到信道关闭前
for msg in recvr.iter() {
println!("工作线程 {:?} 接受到 {}. 并予以百倍奉还", thread::current().id(), msg);
sendr.send(msg * 100).unwrap();
}
});
}
// 关闭信道,否则接收器不会关闭
// 退出 for 循坏
drop(send2);

// 接收器
for msg in recv2.iter() {
println!("返回到接收器 {}", msg);
}
})
.unwrap();
}
  • 运行cargo run输出结果如下:
发送来源 0
工作线程 ThreadId(3) 接受到 0. 并予以百倍奉还
工作线程 ThreadId(3) 接受到 1. 并予以百倍奉还
返回到接收器 0
返回到接收器 100
发送来源 1
发送来源 2
工作线程 ThreadId(4) 接受到 2. 并予以百倍奉还
发送来源 3
返回到接收器 200
工作线程 ThreadId(4) 接受到 3. 并予以百倍奉还
返回到接收器 300

1.3 在两个线程间传递数据

  在单生产者、单消费者(SPSC)环境中使用 crossbeam-channel。我们构建的生成短期线程实例中,使用 crossbeam::scopeScope::spawn 来管理生产者线程。在两个线程之间,使用 crossbeam_channel::unbounded 信道交换数据,这意味着可存储消息的数量没有限制。生产者线程在消息之间休眠半秒。

use std::{thread, time};
use crossbeam_channel::unbounded;

fn main() {
let (snd, rcv) = unbounded();
let n_msgs = 5;
crossbeam::scope(|s| {
s.spawn(|_| {
for i in 0..n_msgs {
snd.send(i).unwrap();
thread::sleep(time::Duration::from_millis(500));
}
});
}).unwrap();
for _ in 0..n_msgs {
let msg = rcv.recv().unwrap();
println!("收到 {}", msg);
}
}
  • 运行cargo run输出结果如下:
收到 0
收到 1
收到 2
收到 3
收到 4

1.4 保持全局可变状态

  使用 lazy_static 声明全局状态。lazy_static 创建了一个全局可用的 static ref,它需要 Mutex 来允许变化(请参阅 RwLock)。在 Mutex 的包裹下,保证了状态不能被多个线程同时访问,从而防止出现争用情况。必须获取 MutexGuard,方可读取或更改存储在 Mutex 中的值。

此小节需要安装lazy_static库,可通过cargo add lazy_static 命令安装

[dependencies]
lazy_static = "1.4.0"
error-chain = "0.12.4"
use error_chain::error_chain;
use lazy_static::lazy_static;
use std::sync::Mutex;

error_chain!{ }

lazy_static! {
static ref FRUIT: Mutex<Vec<String>> = Mutex::new(Vec::new());
}

fn insert(fruit: &str) -> Result<()> {
let mut db = FRUIT.lock().map_err(|_| "Failed to acquire MutexGuard")?;
db.push(fruit.to_string());
Ok(())
}

fn main() -> Result<()> {
insert("苹果")?;
insert("橘子")?;
insert("梨")?;
{
let db = FRUIT.lock().map_err(|_| "Failed to acquire MutexGuard")?;

db.iter().enumerate().for_each(|(i, item)| println!("{}: {}", i, item));
}
insert("葡萄")?;
Ok(())
}
  • 运行cargo run输出结果如下:
0: 苹果
1: 橘子
2: 梨

1.5 对所有 iso 文件的 SHA256 值并发求和

  计算了当前目录中每个扩展名为 iso 的文件的 SHA256 哈希值。线程池生成的线程数与使用 num_cpus::get 获取的系统内核数相等。Walkdir::new 遍历当前目录,并调用 execute 来执行读取和计算 SHA256 哈希值的操作。

此小节需要安装多个库

  • threadpool库,可通过cargo add threadpool 命令安装
  • num_cpus库,可通过cargo add num_cpus 命令安装
  • walkdir库,可通过cargo add walkdir 命令安装
  • ring库,可通过cargo add ring 命令安装
[dependencies]
error-chain = "0.12.4"
ring = "0.16.20"
walkdir = "2.3.3"
num_cpus = "1.15.0"
threadpool = "1.8.1"
use walkdir::WalkDir;
use std::fs::File;
use std::io::{BufReader, Read, Error};
use std::path::Path;
use threadpool::ThreadPool;
use std::sync::mpsc::channel;
use ring::digest::{Context, Digest, SHA256};

// Verify the iso extension
fn is_iso(entry: &Path) -> bool {
match entry.extension() {
Some(e) if e.to_string_lossy().to_lowercase() == "iso" => true,
_ => false,
}
}

fn compute_digest<P: AsRef<Path>>(filepath: P) -> Result<(Digest, P), Error> {
let mut buf_reader = BufReader::new(File::open(&filepath)?);
let mut context = Context::new(&SHA256);
let mut buffer = [0; 1024];

loop {
let count = buf_reader.read(&mut buffer)?;
if count == 0 {
break;
}
context.update(&buffer[..count]);
}

Ok((context.finish(), filepath))
}

fn main() -> Result<(), Error> {
let pool = ThreadPool::new(num_cpus::get());

let (tx, rx) = channel();

for entry in WalkDir::new("/home/user/Downloads")
.follow_links(true)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| !e.path().is_dir() && is_iso(e.path())) {
let path = entry.path().to_owned();
let tx = tx.clone();
pool.execute(move || {
let digest = compute_digest(path);
tx.send(digest).expect("无法发送数据!");
});
}

drop(tx);
for t in rx.iter() {
let (sha, path) = t?;
println!("{:?} {:?}", sha, path);
}
Ok(())
}
  • 运行cargo run输出结果类似于如下结果:
SHA256:72e880f84a8cbc734160ca3b043c91b455ae5d8877d0d5afe15fa2894e177f07 "/home/user/Downloads/ubuntu.iso"
SHA256:2da6c248348f5ff0ee06e57222d6cd7ff2a4c652195db7325ee8327e44175f53 "/home/user/Downloads/debian.iso"
SHA256:6ea82342f0db613ce21a0f6df2e59ec2f0c9b05ed4e50f9288305c0d492b42b0 "/home/user/Downloads/win11.iso"
SHA256:cb4ca11618c4e5a24f7a6bcff2eb2a14453f4d802772374386f69d710568aef8 "/home/user/Downloads/win12.iso"

1.6 将绘制分形的线程分派到线程池

  通过从朱莉娅集绘制分形来生成图像,该集合具有用于分布式计算的线程池。使用 ImageBuffer::new 为指定宽度和高度的输出图像分配内存,Rgb::from_channels 信道则计算输出图像的 RGB 像素值。使用 ThreadPool 创建线程池,线程池中的线程数量和使用 num_cpus::get 获取的系统内核数相等。ThreadPool::execute 将每个像素作为单独的作业接收。mpsc::channel 信道接收作业,Receiver::recv接收器则检索作业。ImageBuffer::put_pixel处理数据,设置像素颜色。最后,ImageBuffer::save 将图像存储为 output.png

此小节需要在1.5小节基础上安装如下两个库

  • num库,可通过cargo add num 命令安装
  • image库,可通过cargo add image 命令安装
[dependencies]
error-chain = "0.12.4"
num_cpus = "1.15.0"
threadpool = "1.8.1"
num = "0.4.0"
image = "0.24.6"
use error_chain::error_chain;
use image::{ImageBuffer, Rgb};
use num::complex::Complex;
use std::sync::mpsc::{channel, RecvError};
use threadpool::ThreadPool;

error_chain! {
foreign_links {
MpscRecv(RecvError);
Io(std::io::Error);
}
}

// 将强度值转换为 RGB 值的函数
// 基于 http://www.efg2.com/Lab/ScienceAndEngineering/Spectra.htm
fn wavelength_to_rgb(wavelength: u32) -> Rgb<u8> {
let wave = wavelength as f32;

let (r, g, b) = match wavelength {
380..=439 => ((440. - wave) / (440. - 380.), 0.0, 1.0),
440..=489 => (0.0, (wave - 440.) / (490. - 440.), 1.0),
490..=509 => (0.0, 1.0, (510. - wave) / (510. - 490.)),
510..=579 => ((wave - 510.) / (580. - 510.), 1.0, 0.0),
580..=644 => (1.0, (645. - wave) / (645. - 580.), 0.0),
645..=780 => (1.0, 0.0, 0.0),
_ => (0.0, 0.0, 0.0),
};

let factor = match wavelength {
380..=419 => 0.3 + 0.7 * (wave - 380.) / (420. - 380.),
701..=780 => 0.3 + 0.7 * (780. - wave) / (780. - 700.),
_ => 1.0,
};

let (r, g, b) = (
normalize(r, factor),
normalize(g, factor),
normalize(b, factor),
);
// Rgb::from_channels(r, g, b, 0) // 自 0.24.0 后已弃用:使用像素的构造函数
Rgb([r, g, b]) //与原书不同
}

// 将茱莉亚集距离映射为强度值
fn julia(c: Complex<f32>, x: u32, y: u32, width: u32, height: u32, max_iter: u32) -> u32 {
let width = width as f32;
let height = height as f32;

let mut z = Complex {
// scale and translate the point to image coordinates
re: 3.0 * (x as f32 - 0.5 * width) / width,
im: 2.0 * (y as f32 - 0.5 * height) / height,
};

let mut i = 0;
for t in 0..max_iter {
if z.norm() >= 2.0 {
break;
}
z = z * z + c;
i = t;
}
i
}

// 规格 RGB 颜色值范围内的强度值
fn normalize(color: f32, factor: f32) -> u8 {
((color * factor).powf(0.8) * 255.) as u8
}

fn main() -> Result<()> {
let (width, height) = (1920, 1080);
let mut img = ImageBuffer::new(width, height);
let iterations = 300;

let c = Complex::new(-0.8, 0.156);

let pool = ThreadPool::new(num_cpus::get());
let (tx, rx) = channel();

for y in 0..height {
let tx = tx.clone();
pool.execute(move || {
for x in 0..width {
let i = julia(c, x, y, width, height, iterations);
let pixel = wavelength_to_rgb(380 + i * 400 / iterations);
tx.send((x, y, pixel)).expect("无法发送数据!");
}
});
}

for _ in 0..(width * height) {
let (x, y, pixel) = rx.recv()?;
img.put_pixel(x, y, pixel);
}
let _ = img.save("output.png").unwrap();
Ok(())
}
  • 运行cargo run将获得一张图片

pic-1680865445147.webp

2. 数据并行

2.1 并行改变数组中元素

  实例使用了 rayon 库,这是一个 Rust 程序设计语言的数据并行库。rayon 为任何并行可迭代的数据类型提供 par_iter_mut 方法。这是一个类迭代器的链,可以对链内的数据并行计算。

  • rayon库,可通过cargo add rayon 命令安装
[dependencies]
rayon = "1.7.0"
use rayon::prelude::*;

fn main() {
let mut arr = [0, 7, 9, 11];
arr.par_iter_mut().for_each(|p| *p -= 1);
println!("{:?}", arr);
}
  • 运行cargo run输出结果如下:
[-1, 6, 8, 10]

2.2 并行测试集合中任意或所有的元素是否匹配给定断言

  如何使用 rayon::anyrayon::all 方法,这两个方法是分别与 std::anystd::all 相对应的并行方法。rayon::any 并行检查迭代器的任意元素是否与断言匹配,并在找到一个匹配的元素时就返回。rayon::all 并行检查迭代器的所有元素是否与断言匹配,并在找到不匹配的元素时立即返回。

use rayon::prelude::*;

fn main() {
let mut vec = vec![2, 4, 6, 8];

assert!(!vec.par_iter().any(|n| (*n % 2) != 0)); // 是否存在不被2整除的数(取反)
assert!(vec.par_iter().all(|n| (*n % 2) == 0)); // 是否所有数均被2整除
assert!(!vec.par_iter().any(|n| *n > 8)); // 是否存在大于8的数(取反)
assert!(vec.par_iter().all(|n| *n <= 8)); // 是否所有数均小于或等于8

vec.push(9);

assert!(vec.par_iter().any(|n| (*n % 2) != 0)); // 是否存在不被2整除的数
assert!(!vec.par_iter().all(|n| (*n % 2) == 0)); // 是否所有数均被2整除(取反)
assert!(vec.par_iter().any(|n| *n > 8)); // 是否存在大于8的数
assert!(!vec.par_iter().all(|n| *n <= 8)); // 是否所有数均小于或等于8(取反)
}
  • 运行cargo run进行校验

2.3 使用给定断言并行搜索项

  使用 rayon::find_anypar_iter 并行搜索 vector 集合,以查找满足指定闭包中的断言的元素。如果有多个元素满足 rayon::find_any 闭包参数中定义的断言,rayon 将返回搜索发现的第一个元素,但不一定是 vector 集合的第一个元素。实例中闭包的参数是对引用的引用(&&x

use rayon::prelude::*;

fn main() {
let v = vec![6, 2, 1, 9, 3, 8, 11];

let f1 = v.par_iter().find_any(|&&x| x == 9);
let f2 = v.par_iter().find_any(|&&x| x % 2 == 0 && x > 6);
let f3 = v.par_iter().find_any(|&&x| x > 8);

assert_eq!(f1, Some(&9));
assert_eq!(f2, Some(&8));
assert!(f3 > Some(&8));
}
  • 运行cargo run进行校验

2.4 对 vector 并行排序

  首先分配空字符串 vector;然后,通过 par_iter_mut().for_each并行对 vector 填充随机值。尽管存在多种选择,可以对可枚举数据类型进行排序,但 par_sort_unstable 通常比稳定排序(相同的值排序后相对顺序不变)算法快。

  • 在之前基础上安装rand库,可通过cargo add rand 命令安装
[dependencies]
rayon = "1.7.0"
rand = "0.8.5"
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use rayon::prelude::*;

fn main() {
let mut vec = vec![String::new(); 20];
vec.par_iter_mut().for_each(|p| {
let mut rng = thread_rng();
*p = (0..5).map(|_| rng.sample(Alphanumeric) as char).collect() //与原书不同
});
vec.par_sort_unstable();
}
  • 运行cargo run进行校验

2.5 Map-reduce 并行计算

  使用 rayon::filterrayon::map,以及 rayon::reduce 计算 Person 对象中年龄超过 30 岁的那些人的平均年龄。rayon::filter 过滤集合中满足给定断言的元素。rayon::map 对每个元素执行一次计算,创建一个新的迭代;rayon::reduce 执行新的计算,基于前一次的 reduce 计算结果和当前元素累加在一起。另外可以查看 rayon::sum,它与本实例中的 reduce 计算具有相同的结果。

use rayon::prelude::*;

struct Person {
age: u32,
}

fn main() {
let v: Vec<Person> = vec![
Person { age: 23 },
Person { age: 19 },
Person { age: 42 },
Person { age: 17 },
Person { age: 17 },
Person { age: 31 },
Person { age: 30 },
];

let num_over_30 = v.par_iter().filter(|&x| x.age > 30).count() as f32;
let sum_over_30 = v
.par_iter()
.map(|x| x.age)
.filter(|&x| x > 30)
.reduce(|| 0, |x, y| x + y);

let alt_sum_30: u32 = v.par_iter().map(|x| x.age).filter(|&x| x > 30).sum();

let avg_over_30 = sum_over_30 as f32 / num_over_30;
let alt_avg_over_30 = alt_sum_30 as f32 / num_over_30;

assert!((avg_over_30 - alt_avg_over_30).abs() < std::f32::EPSILON); //EPSILON: 0.00000011920929
println!("30岁以上的人的平均年龄是 {}", avg_over_30);
}
  • 运行cargo run输出结果如下:
30岁以上的人的平均年龄是 36.5

2.6 并行生成 png 缩略图

  为当前目录中的所有 .png 图像文件生成缩略图,然后将生成的缩略图保存在一个名为 thumbnails 的新文件夹中。glob::glob_with 在当前目录中查找 jpeg 图像文件,rayon 通过 par_iter 方法调用 DynamicImage::resize,并行地调整图像大小。

  • 在之前基础上安装glob库,可通过cargo add glob 命令安装
[dependencies]
error-chain = "0.12.4"
image = "0.24.6"
rayon = "1.7.0"
glob = "0.3.1"
use error_chain::error_chain;

use std::fs::create_dir_all;
use std::path::Path;

use error_chain::ChainedError;
use glob::{glob_with, MatchOptions};
use image::{imageops::FilterType, ImageError};
use rayon::prelude::*;

error_chain! {
foreign_links {
Image(ImageError);
Io(std::io::Error);
Glob(glob::PatternError);
}
}

fn main() -> Result<()> {
let options: MatchOptions = Default::default();
let files: Vec<_> = glob_with("*.png", options)?
.filter_map(|x| x.ok())
.collect();

if files.len() == 0 {
error_chain::bail!("在当前目录中找不到 .png 文件");
}

let thumb_dir = "thumbnails";
create_dir_all(thumb_dir)?;

println!("将 {} 缩略图保存到 '{}'...", files.len(), thumb_dir);

let image_failures: Vec<_> = files
.par_iter()
.map(|path| {
make_thumbnail(path, thumb_dir, 300)
.map_err(|e| e.chain_err(|| path.display().to_string()))
})
.filter_map(|x| x.err())
.collect();

image_failures
.iter()
.for_each(|x| println!("{}", x.display_chain()));

println!("{} thumbnails 保存成功", files.len() - image_failures.len());
Ok(())
}

fn make_thumbnail<PA, PB>(original: PA, thumb_dir: PB, longest_edge: u32) -> Result<()>
where
PA: AsRef<Path>,
PB: AsRef<Path>,
{
let img = image::open(original.as_ref())?;
let file_path = thumb_dir.as_ref().join(original);

Ok(img
.resize(longest_edge, longest_edge, FilterType::Nearest)
.save(file_path)?)
}
  • 运行cargo run(处理1.6小节生成的图片)输出结果如下:
将 1 缩略图保存到 'thumbnails'...
1 thumbnails 保存成功
├── output.png # 原始文件
└── thumbnails
└── output.png # 生成后的文件