1. 显式线程
1.1 生成短期线程
需要安装crossbeam
库,可通过cargo add crossbeam
命令安装
[dependencies] crossbeam = "0.8.2"
使用 crossbeam
crate 为并发和并行编程提供了数据结构和函数。Scope::spawn
生成一个新的作用域线程,该线程确保传入 crossbeam::scope
函数的闭包在返回之前终止,这意味着您可以从调用的函数中引用数据。
extern crate crossbeam;fn main () { let arr = &[1 , 25 , -4 , 10 , 36 , 88 , -99 ]; let max = find_max (arr); assert_eq! (max, Some (88 )); } fn find_max (arr: &[i32 ]) -> Option <i32 > { const THRESHOLD: usize = 2 ; if arr.len () <= THRESHOLD { return arr.iter ().cloned ().max (); } let mid = arr.len () / 2 ; let (left, right) = arr.split_at (mid); crossbeam::scope (|s| { let thread_l = s.spawn (|_| find_max (left)); let thread_r = s.spawn (|_| find_max (right)); let max_l = thread_l.join ().unwrap ()?; let max_r = thread_r.join ().unwrap ()?; Some (max_l.max (max_r)) }) .unwrap () }
1.2. 创建并发的数据管道
在章节1.1基础上再次安装crossbeam-channel
库,可通过cargo add crossbeam-channel
命令安装
[dependencies] crossbeam = "0.8.2" crossbeam-channel = "0.5.7"
使用 crossbeam
和 crossbeam-channel
两个 crate 创建了一个并行的管道。管道有一个数据源和一个数据接收器,数据在从源到接收器的过程中由两个工作线程并行处理。
使用容量由 crossbeam_channel::bounded
分配的有界信道。生产者必须在它自己的线程上,因为它产生的消息比工作线程处理它们的速度快(因为工作线程休眠了半秒)——这意味着生产者将在对 [crossbeam_channel::Sender::send]
调用时阻塞半秒,直到其中一个工作线程对信道中的数据处理完毕。也请注意,信道中的数据由最先接收它的任何工作线程调用,因此每个消息都传递给单个工作线程,而不是传递给两个工作线程。
通过迭代器 crossbeam_channel::Receiver::iter
方法从信道读取数据,这将会造成阻塞,要么等待新消息,要么直到信道关闭。因为信道是在 crossbeam::scope
范围内创建的,我们必须通过 drop
手动关闭它们,以防止整个程序阻塞工作线程的 for
循环。可以将对 drop
的调用视作不再发送消息的信号。
extern crate crossbeam;extern crate crossbeam_channel;use crossbeam_channel::bounded;use std::thread;use std::time::Duration;fn main () { let (send1, recv1) = bounded (1 ); let (send2, recv2) = bounded (1 ); let n_msgs = 4 ; let n_workers = 2 ; crossbeam::scope (|s| { s.spawn (|_| { for i in 0 ..n_msgs { send1.send (i).unwrap (); println! ("发送来源 {}" , i); } drop (send1); }); for _ in 0 ..n_workers { let (sendr, recvr) = (send2.clone (), recv1.clone ()); s.spawn (move |_| { thread::sleep (Duration::from_millis (500 )); for msg in recvr.iter () { println! ("工作线程 {:?} 接受到 {}. 并予以百倍奉还" , thread::current ().id (), msg); sendr.send (msg * 100 ).unwrap (); } }); } drop (send2); for msg in recv2.iter () { println! ("返回到接收器 {}" , msg); } }) .unwrap (); }
发送来源 0 工作线程 ThreadId(3) 接受到 0. 并予以百倍奉还 工作线程 ThreadId(3) 接受到 1. 并予以百倍奉还 返回到接收器 0 返回到接收器 100 发送来源 1 发送来源 2 工作线程 ThreadId(4) 接受到 2. 并予以百倍奉还 发送来源 3 返回到接收器 200 工作线程 ThreadId(4) 接受到 3. 并予以百倍奉还 返回到接收器 300
1.3 在两个线程间传递数据
在单生产者、单消费者(SPSC)环境中使用 crossbeam-channel
。我们构建的生成短期线程实例中,使用 crossbeam::scope
和 Scope::spawn
来管理生产者线程。在两个线程之间,使用 crossbeam_channel::unbounded
信道交换数据,这意味着可存储消息的数量没有限制。生产者线程在消息之间休眠半秒。
use std::{thread, time};use crossbeam_channel::unbounded;fn main () { let (snd, rcv) = unbounded (); let n_msgs = 5 ; crossbeam::scope (|s| { s.spawn (|_| { for i in 0 ..n_msgs { snd.send (i).unwrap (); thread::sleep (time::Duration::from_millis (500 )); } }); }).unwrap (); for _ in 0 ..n_msgs { let msg = rcv.recv ().unwrap (); println! ("收到 {}" , msg); } }
1.4 保持全局可变状态
使用 lazy_static
声明全局状态。lazy_static
创建了一个全局可用的 static ref
,它需要 Mutex
来允许变化(请参阅 RwLock
)。在 Mutex
的包裹下,保证了状态不能被多个线程同时访问,从而防止出现争用情况。必须获取 MutexGuard
,方可读取或更改存储在 Mutex
中的值。
此小节需要安装lazy_static
库,可通过cargo add lazy_static
命令安装
[dependencies] lazy_static = "1.4.0" error-chain = "0.12.4"
use error_chain::error_chain;use lazy_static::lazy_static;use std::sync::Mutex;error_chain!{ } lazy_static! { static ref FRUIT: Mutex<Vec <String >> = Mutex::new (Vec ::new ()); } fn insert (fruit: &str ) -> Result <()> { let mut db = FRUIT.lock ().map_err (|_| "Failed to acquire MutexGuard" )?; db.push (fruit.to_string ()); Ok (()) } fn main () -> Result <()> { insert ("苹果" )?; insert ("橘子" )?; insert ("梨" )?; { let db = FRUIT.lock ().map_err (|_| "Failed to acquire MutexGuard" )?; db.iter ().enumerate ().for_each (|(i, item)| println! ("{}: {}" , i, item)); } insert ("葡萄" )?; Ok (()) }
1.5 对所有 iso
文件的 SHA256
值并发求和
计算了当前目录中每个扩展名为 iso
的文件的 SHA256
哈希值。线程池生成的线程数与使用 num_cpus::get
获取的系统内核数相等。Walkdir::new
遍历当前目录,并调用 execute
来执行读取和计算 SHA256
哈希值的操作。
此小节需要安装多个库
threadpool
库,可通过cargo add threadpool
命令安装
num_cpus
库,可通过cargo add num_cpus
命令安装
walkdir
库,可通过cargo add walkdir
命令安装
ring
库,可通过cargo add ring
命令安装
[dependencies] error-chain = "0.12.4" ring = "0.16.20" walkdir = "2.3.3" num_cpus = "1.15.0" threadpool = "1.8.1"
use walkdir::WalkDir;use std::fs::File;use std::io::{BufReader, Read, Error};use std::path::Path;use threadpool::ThreadPool;use std::sync::mpsc::channel;use ring::digest::{Context, Digest, SHA256};fn is_iso (entry: &Path) -> bool { match entry.extension () { Some (e) if e.to_string_lossy ().to_lowercase () == "iso" => true , _ => false , } } fn compute_digest <P: AsRef <Path>>(filepath: P) -> Result <(Digest, P), Error> { let mut buf_reader = BufReader::new (File::open (&filepath)?); let mut context = Context::new (&SHA256); let mut buffer = [0 ; 1024 ]; loop { let count = buf_reader.read (&mut buffer)?; if count == 0 { break ; } context.update (&buffer[..count]); } Ok ((context.finish (), filepath)) } fn main () -> Result <(), Error> { let pool = ThreadPool::new (num_cpus::get ()); let (tx, rx) = channel (); for entry in WalkDir::new ("/home/user/Downloads" ) .follow_links (true ) .into_iter () .filter_map (|e| e.ok ()) .filter (|e| !e.path ().is_dir () && is_iso (e.path ())) { let path = entry.path ().to_owned (); let tx = tx.clone (); pool.execute (move || { let digest = compute_digest (path); tx.send (digest).expect ("无法发送数据!" ); }); } drop (tx); for t in rx.iter () { let (sha, path) = t?; println! ("{:?} {:?}" , sha, path); } Ok (()) }
SHA256:72e880f84a8cbc734160ca3b043c91b455ae5d8877d0d5afe15fa2894e177f07 "/home/user/Downloads/ubuntu.iso" SHA256:2da6c248348f5ff0ee06e57222d6cd7ff2a4c652195db7325ee8327e44175f53 "/home/user/Downloads/debian.iso" SHA256:6ea82342f0db613ce21a0f6df2e59ec2f0c9b05ed4e50f9288305c0d492b42b0 "/home/user/Downloads/win11.iso" SHA256:cb4ca11618c4e5a24f7a6bcff2eb2a14453f4d802772374386f69d710568aef8 "/home/user/Downloads/win12.iso"
1.6 将绘制分形的线程分派到线程池
通过从朱莉娅集绘制分形来生成图像,该集合具有用于分布式计算的线程池。使用 ImageBuffer::new
为指定宽度和高度的输出图像分配内存,Rgb::from_channels
信道则计算输出图像的 RGB
像素值。使用 ThreadPool
创建线程池,线程池中的线程数量和使用 num_cpus::get
获取的系统内核数相等。ThreadPool::execute
将每个像素作为单独的作业接收。mpsc::channel
信道接收作业,Receiver::recv
接收器则检索作业。ImageBuffer::put_pixel
处理数据,设置像素颜色。最后,ImageBuffer::save
将图像存储为 output.png
。
此小节需要在1.5小节基础上安装如下两个库
num
库,可通过cargo add num
命令安装
image
库,可通过cargo add image
命令安装
[dependencies] error-chain = "0.12.4" num_cpus = "1.15.0" threadpool = "1.8.1" num = "0.4.0" image = "0.24.6"
use error_chain::error_chain;use image::{ImageBuffer, Rgb};use num::complex::Complex;use std::sync::mpsc::{channel, RecvError};use threadpool::ThreadPool;error_chain! { foreign_links { MpscRecv (RecvError); Io (std::io::Error); } } fn wavelength_to_rgb (wavelength: u32 ) -> Rgb<u8 > { let wave = wavelength as f32 ; let (r, g, b) = match wavelength { 380 ..=439 => ((440 . - wave) / (440 . - 380 .), 0.0 , 1.0 ), 440 ..=489 => (0.0 , (wave - 440 .) / (490 . - 440 .), 1.0 ), 490 ..=509 => (0.0 , 1.0 , (510 . - wave) / (510 . - 490 .)), 510 ..=579 => ((wave - 510 .) / (580 . - 510 .), 1.0 , 0.0 ), 580 ..=644 => (1.0 , (645 . - wave) / (645 . - 580 .), 0.0 ), 645 ..=780 => (1.0 , 0.0 , 0.0 ), _ => (0.0 , 0.0 , 0.0 ), }; let factor = match wavelength { 380 ..=419 => 0.3 + 0.7 * (wave - 380 .) / (420 . - 380 .), 701 ..=780 => 0.3 + 0.7 * (780 . - wave) / (780 . - 700 .), _ => 1.0 , }; let (r, g, b) = ( normalize (r, factor), normalize (g, factor), normalize (b, factor), ); Rgb ([r, g, b]) } fn julia (c: Complex<f32 >, x: u32 , y: u32 , width: u32 , height: u32 , max_iter: u32 ) -> u32 { let width = width as f32 ; let height = height as f32 ; let mut z = Complex { re: 3.0 * (x as f32 - 0.5 * width) / width, im: 2.0 * (y as f32 - 0.5 * height) / height, }; let mut i = 0 ; for t in 0 ..max_iter { if z.norm () >= 2.0 { break ; } z = z * z + c; i = t; } i } fn normalize (color: f32 , factor: f32 ) -> u8 { ((color * factor).powf (0.8 ) * 255 .) as u8 } fn main () -> Result <()> { let (width, height) = (1920 , 1080 ); let mut img = ImageBuffer::new (width, height); let iterations = 300 ; let c = Complex::new (-0.8 , 0.156 ); let pool = ThreadPool::new (num_cpus::get ()); let (tx, rx) = channel (); for y in 0 ..height { let tx = tx.clone (); pool.execute (move || { for x in 0 ..width { let i = julia (c, x, y, width, height, iterations); let pixel = wavelength_to_rgb (380 + i * 400 / iterations); tx.send ((x, y, pixel)).expect ("无法发送数据!" ); } }); } for _ in 0 ..(width * height) { let (x, y, pixel) = rx.recv ()?; img.put_pixel (x, y, pixel); } let _ = img.save ("output.png" ).unwrap (); Ok (()) }
2. 数据并行
2.1 并行改变数组中元素
实例使用了 rayon
库,这是一个 Rust 程序设计语言的数据并行库。rayon
为任何并行可迭代的数据类型提供 par_iter_mut
方法。这是一个类迭代器的链,可以对链内的数据并行计算。
rayon
库,可通过cargo add rayon
命令安装
[dependencies] rayon = "1.7.0"
use rayon::prelude::*;fn main () { let mut arr = [0 , 7 , 9 , 11 ]; arr.par_iter_mut ().for_each (|p| *p -= 1 ); println! ("{:?}" , arr); }
2.2 并行测试集合中任意或所有的元素是否匹配给定断言
如何使用 rayon::any
和 rayon::all
方法,这两个方法是分别与 std::any
和 std::all
相对应的并行方法。rayon::any
并行检查迭代器的任意元素是否与断言匹配,并在找到一个匹配的元素时就返回。rayon::all
并行检查迭代器的所有元素是否与断言匹配,并在找到不匹配的元素时立即返回。
use rayon::prelude::*;fn main () { let mut vec = vec! [2 , 4 , 6 , 8 ]; assert! (!vec.par_iter ().any (|n| (*n % 2 ) != 0 )); assert! (vec.par_iter ().all (|n| (*n % 2 ) == 0 )); assert! (!vec.par_iter ().any (|n| *n > 8 )); assert! (vec.par_iter ().all (|n| *n <= 8 )); vec.push (9 ); assert! (vec.par_iter ().any (|n| (*n % 2 ) != 0 )); assert! (!vec.par_iter ().all (|n| (*n % 2 ) == 0 )); assert! (vec.par_iter ().any (|n| *n > 8 )); assert! (!vec.par_iter ().all (|n| *n <= 8 )); }
2.3 使用给定断言并行搜索项
使用 rayon::find_any
和 par_iter
并行搜索 vector
集合,以查找满足指定闭包中的断言的元素。如果有多个元素满足 rayon::find_any
闭包参数中定义的断言,rayon
将返回搜索发现的第一个元素,但不一定是 vector
集合的第一个元素。实例中闭包的参数是对引用的引用(&&x
)
use rayon::prelude::*;fn main () { let v = vec! [6 , 2 , 1 , 9 , 3 , 8 , 11 ]; let f1 = v.par_iter ().find_any (|&&x| x == 9 ); let f2 = v.par_iter ().find_any (|&&x| x % 2 == 0 && x > 6 ); let f3 = v.par_iter ().find_any (|&&x| x > 8 ); assert_eq! (f1, Some (&9 )); assert_eq! (f2, Some (&8 )); assert! (f3 > Some (&8 )); }
2.4 对 vector
并行排序
首先分配空字符串 vector
;然后,通过 par_iter_mut().for_each
并行对 vector
填充随机值。尽管存在多种选择,可以对可枚举数据类型进行排序,但 par_sort_unstable
通常比稳定排序(相同的值排序后相对顺序不变)算法快。
在之前基础上安装rand
库,可通过cargo add rand
命令安装
[dependencies] rayon = "1.7.0" rand = "0.8.5"
use rand::distributions::Alphanumeric;use rand::{thread_rng, Rng};use rayon::prelude::*;fn main () { let mut vec = vec! [String ::new (); 20 ]; vec.par_iter_mut ().for_each (|p| { let mut rng = thread_rng (); *p = (0 ..5 ).map (|_| rng.sample (Alphanumeric) as char ).collect () }); vec.par_sort_unstable (); }
2.5 Map-reduce
并行计算
使用 rayon::filter
、rayon::map
,以及 rayon::reduce
计算 Person
对象中年龄超过 30 岁的那些人的平均年龄。rayon::filter
过滤集合中满足给定断言的元素。rayon::map
对每个元素执行一次计算,创建一个新的迭代;rayon::reduce
执行新的计算,基于前一次的 reduce
计算结果和当前元素累加在一起。另外可以查看 rayon::sum
,它与本实例中的 reduce
计算具有相同的结果。
use rayon::prelude::*;struct Person { age: u32 , } fn main () { let v : Vec <Person> = vec! [ Person { age: 23 }, Person { age: 19 }, Person { age: 42 }, Person { age: 17 }, Person { age: 17 }, Person { age: 31 }, Person { age: 30 }, ]; let num_over_30 = v.par_iter ().filter (|&x| x.age > 30 ).count () as f32 ; let sum_over_30 = v .par_iter () .map (|x| x.age) .filter (|&x| x > 30 ) .reduce (|| 0 , |x, y| x + y); let alt_sum_30 : u32 = v.par_iter ().map (|x| x.age).filter (|&x| x > 30 ).sum (); let avg_over_30 = sum_over_30 as f32 / num_over_30; let alt_avg_over_30 = alt_sum_30 as f32 / num_over_30; assert! ((avg_over_30 - alt_avg_over_30).abs () < std::f32 ::EPSILON); println! ("30岁以上的人的平均年龄是 {}" , avg_over_30); }
2.6 并行生成 png
缩略图
为当前目录中的所有 .png
图像文件生成缩略图,然后将生成的缩略图保存在一个名为 thumbnails
的新文件夹中。glob::glob_with
在当前目录中查找 jpeg
图像文件,rayon
通过 par_iter
方法调用 DynamicImage::resize
,并行地调整图像大小。
在之前基础上安装glob
库,可通过cargo add glob
命令安装
[dependencies] error-chain = "0.12.4" image = "0.24.6" rayon = "1.7.0" glob = "0.3.1"
use error_chain::error_chain;use std::fs::create_dir_all;use std::path::Path;use error_chain::ChainedError;use glob::{glob_with, MatchOptions};use image::{imageops::FilterType, ImageError};use rayon::prelude::*;error_chain! { foreign_links { Image (ImageError); Io (std::io::Error); Glob (glob::PatternError); } } fn main () -> Result <()> { let options : MatchOptions = Default ::default (); let files : Vec <_> = glob_with ("*.png" , options)? .filter_map (|x| x.ok ()) .collect (); if files.len () == 0 { error_chain::bail!("在当前目录中找不到 .png 文件" ); } let thumb_dir = "thumbnails" ; create_dir_all (thumb_dir)?; println! ("将 {} 缩略图保存到 '{}'..." , files.len (), thumb_dir); let image_failures : Vec <_> = files .par_iter () .map (|path| { make_thumbnail (path, thumb_dir, 300 ) .map_err (|e| e.chain_err (|| path.display ().to_string ())) }) .filter_map (|x| x.err ()) .collect (); image_failures .iter () .for_each (|x| println! ("{}" , x.display_chain ())); println! ("{} thumbnails 保存成功" , files.len () - image_failures.len ()); Ok (()) } fn make_thumbnail <PA, PB>(original: PA, thumb_dir: PB, longest_edge: u32 ) -> Result <()>where PA: AsRef <Path>, PB: AsRef <Path>, { let img = image::open (original.as_ref ())?; let file_path = thumb_dir.as_ref ().join (original); Ok (img .resize (longest_edge, longest_edge, FilterType::Nearest) .save (file_path)?) }
运行cargo run
(处理1.6小节生成的图片)输出结果如下:
将 1 缩略图保存到 'thumbnails' ... 1 thumbnails 保存成功
├── output.png └── thumbnails └── output.png