Rust语言学习20 深入并发 进程 线程 协程 异步
  dBFTbkVLMBge 2023年11月02日 34 0

并发,是指在宏观意义上同一时间处理多个任务。并发的方式一般包含为三种:多进程、多线程以及最近几年刚刚火起来的协程。

一、多进程并发

创建两个项目  子进程subProgress和主进程mainProgress

子进程代码如下

use std::thread::sleep;
use std::time::Duration;

fn main() {
    println!("Hello, world!");
    sleep(Duration::from_secs(5));
    println!("Bye,world!");
}

主进程代码如下

use std::process::Command;

fn main() {
    Command::new("../subProgress/target/debug/subProgress.exe").spawn().unwrap();
}

主进程运行结果为 

Rust语言学习20 深入并发  进程 线程 协程 异步_多进程

因为主进程启动了子进程后立刻退出了。我们需要等待子进程结束。

等待子进程结束

要等待子进程结束,需要使用一个变量保存子进程对象,然后调用子进程的

wait方法:

修改主进程代码如下

use std::process::Command;

fn main() {
    // wait方法会改变子进程对象的状态 所以子进程对象必须是可变的
    let mut p = Command::new("../subProgress/target/debug/subProgress.exe").spawn().unwrap();
    p.wait().unwrap();
}

修改代码后运行结果为

Rust语言学习20 深入并发  进程 线程 协程 异步_多进程_02

Rust语言学习20 深入并发  进程 线程 协程 异步_并发编程_03

子进程使用命令行参数

use std::env::args;
use std::thread::sleep;
use std::time::Duration;

fn main() {
    println!("Hello, world!");
    let mut args = args();
    let seconds = args.nth(1).unwrap().parse::<u64>().unwrap();
    sleep(Duration::from_secs(seconds));
    println!("Bye,world!");
}

配置参数5

Rust语言学习20 深入并发  进程 线程 协程 异步_多进程_04

主进程代码修改如下,参数通过arg传给子进程即可

use std::process::Command;

fn main() {
    // wait方法会改变子进程对象的状态 所以子进程对象必须是可变的
    let mut p = Command::new("../subProgress/target/debug/subProgress.exe").arg("10").spawn().unwrap();
    p.wait().unwrap();
}

进程间通信

Rust当然也可以通过管道进行进程间通信。修改子进程代码,将函数的参数设置为接收数据的条数,每接收到一条数据就原样返回,直到达到设定的条数后退出

子进程代码

use std::env::args;
use std::io;
use std::thread::sleep;
use std::time::Duration;

fn main() {
    println!("Hello, world!");
    let mut args = args();
    // 环境配置输入数据条数count
    let count = args.nth(1).unwrap().parse::<u64>().unwrap();
    let mut index = 0;
    // 打开标准输入
    let stdin = io::stdin();
    while index < count {
        let mut s = String::new();
        // 把输入的字符串读入到s
        stdin.read_line(&mut s).unwrap();
        // 把输入的数据打印
        println!("{}",s.trim());
        index+=1
    }
    println!("Bye,world!");
}

然后在主进程 循环向子进程的标准输入里写入消息,并读取标准输出的内容

use std::io::{BufRead, BufReader, Read, Write};
use std::process::{Command, Stdio};

fn main() {
   /* // wait方法会改变子进程对象的状态 所以子进程对象必须是可变的
    let mut p = Command::new("../subProgress/target/debug/subProgress.exe").arg("10").spawn().unwrap();
    p.wait().unwrap();*/

    // 定义消息数组
    let msglist = ["msg1","msg2","msg3","msg4","msg5"];
    // 主进程调用子进程
    let mut p = Command::new("../subProgress/target/debug/subProgress.exe")
        .arg(msglist.len().to_string())//消息个数
        .stdin(Stdio::piped())// 子进程标准输入重定向到管道
        .stdout(Stdio::piped())// 子进程标准输出重定向到管道
        .spawn()
        .unwrap();

    // 子进程的输入p_stdin
    let p_stdin = p.stdin.as_mut().unwrap();
    // 子进程输出p_stdout
    let mut p_stdout = BufReader::new(p.stdout.as_mut().unwrap());
    let mut line = String::new();
    // 接收子进程输出hello world
    p_stdout.read_line(&mut line).unwrap();
    println!("{}",line);

    // 循环发送消息给子进程
    for msg in msglist.iter() {
        // 发送消息
        println!("write to sub:{}",msg);
        // 给子进程输入msg消息内容
        p_stdin.write(msg.as_bytes()).unwrap();
        // 终止符
        p_stdin.write("\n".as_bytes()).unwrap();

        // 接收消息
        line.clear();
        p_stdout.read_line(&mut line).unwrap();
        println!("read from sub:{}",line);
    }

    // 接收子进程输出Bye world
    line.clear();
    p_stdout.read_line(&mut line).unwrap();
    println!("{}",line);

    // 等待子进程结束
    p.wait().unwrap();

}

Rust语言学习20 深入并发  进程 线程 协程 异步_多进程_05

二、多线程并发

线程(thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

创建子线程

use std::thread;
use std::time::Duration;

// 线程函数
fn thread_fn(count:i32) {
    for i in 1..count {
        println!("hi number {} from thread",i);
        thread::sleep(Duration::from_secs(1));
    }
}

fn main() {
    let count = 5;
    // 创建线程
    let t = thread::spawn(move || {thread_fn(count)});
    // 等待线程结束
    t.join().unwrap();
}

Rust语言学习20 深入并发  进程 线程 协程 异步_多进程_06

线程间通信

使用通道传递数据

通道( channel)是个很时髦的词汇, Go和Rust都用通道来解决线程间通信的问题。与进程的管道类似,都是分为发送端和接收端,不同的是,通道的发送端可以不只一个,但接收端只能是一个。这在Rust中被称为mpsc(multiple producer, single consumer )。

use std::thread;
use std::sync::mpsc;

fn main() {
    // 创建通道channel 返回(sender,receiver) 这个元组
    let (tx,rx) = mpsc::channel();
    // 启动子线程接收消息
    let sub_thread = thread::spawn(move || {
        // 阻塞地接收
        let msg = rx.recv().unwrap();
        println!("sub thread receive:{}",msg);
    });
    // 主线程向通道发送消息
    tx.send(String::from("hello")).unwrap();
    // 等待子线程结束
    sub_thread.join().unwrap();
}

运行结果为

Rust语言学习20 深入并发  进程 线程 协程 异步_并发编程_07

通道也可以非阻塞接收消息

use std::thread;
use std::sync::mpsc;
use std::time::Duration;
// 非阻塞接收try_recv
fn main() {
    // 创建通道channel 返回(sender,receiver) 这个元组
    let (tx,rx) = mpsc::channel();
    // 启动子线程接收消息
    let sub_thread = thread::spawn(move || {
        // 非阻塞接收 使用loop循环
        loop {
            match rx.try_recv() {
                Ok(msg) => println!("sub thread receive:{}",msg),
                _=> {
                    println!("sub thread receive nothing");
                    // 接收不到就休息1秒
                    thread::sleep(Duration::from_secs(1))
                }
            }
        }
    });
    thread::sleep(Duration::from_secs(3));
    // 主线程向通道发送消息
    tx.send(String::from("hello")).unwrap();
    // 等待子线程结束
    sub_thread.join().unwrap();
}

运行结果为

Rust语言学习20 深入并发  进程 线程 协程 异步_多进程_08


多个发送者一个接收者

use std::thread;
use std::sync::mpsc;

fn main() {
    // 创建通道
    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    let tx2 = tx.clone();

    // 启动子线程
    let t1 = thread::spawn(move || {
        tx1.send(String::from("hello. I'm thread1.")).unwrap();
    });

    let t2 = thread::spawn(move || {
        tx2.clone().send(String::from("hello. I'm thread2.")).unwrap();
    });

    for msg in rx {
        println!("main thread recv: {}", msg);
    }
}

运行结果为

Rust语言学习20 深入并发  进程 线程 协程 异步_并发编程_09

共享内存

在没有通道之前,大多数语言都是通过共享内存的方式实现线程间的数据共享。共享内存固然好,但使用过程中有着诸多的问题,多个线程需要互斥的访问内存数据,需要与信号量结合进行加锁。加锁后又会出现新的问题,比如列锁等。Go语言直接明确的表示“不要通过共享内存来通讯 !”。

Rust依然保留了共享内存的实现方法,在允许使用共享内存的同时,将共享内存与信号量紧密的结合在了一起。如果你不使用信号量加锁,就得不到共享内存的所有权,共享内存使用完后会自动解锁等一系统的措施来保证共享内存的安全。

use std::thread;
use std::sync::{Mutex, Arc};

fn main() {
    // Arc,它的内部为之前的信号量
    let m = Arc::new(Mutex::new(vec![]));
    
    // 循环中创建多个线程
    let mut threads = vec![];
    for i in 1..10 {
        // m具有了clone方法
        let m = m.clone();

        // 创建线程
        let t = thread::spawn(move ||{
            // Arc类型可以直接使用内部的值,从信号量中取得共享内存的方法与不使用Arc完全一致
            let mut names = m.lock().unwrap();
            // 修改共享内存
            names.push(format!("thread{}", i));
        });
        
        threads.push(t);
    }

    // 等待所有线程结束
    for t in threads {
        t.join().unwrap();
    }
    
    // 打印接龙名单
    let names = m.lock().unwrap();
    println!("{:?}", names);
}

运行结果为

Rust语言学习20 深入并发  进程 线程 协程 异步_并发编程_10

三、协程并发

并发,是指在宏观意义上同一时间处理多个任务。并发的方式一般包含为三种:多进程、多线程以及最近几年刚刚火起来的协程

协程

协程与进程和线程不是一个级别的概念。进程和线程都是操作系统任务调度的单位,而协程不是,他并不受操作系统的约束,只是通过异步的手段,让单线程的程序有了并发的能力。

协程的并发需要自己的程序实现调度,而Rust的标准库中并没有提供调度协程的模块,需要通过第三方库futures来进行,futures库的资料能查到的很少,大部分都是在讲怎么创建协程,而执行协程都是阻塞的

首先配置cargo.toml  引入futures依赖

[package]
name = "concurrent"
version = "0.1.0"
edition = "2021"

[dependencies]
futures = "0.3.28

协程代码如下

use std::thread::sleep;
use std::time::Duration;
use futures;

// 定义协程函数
async fn coroutine() {
    let mut n = 0;
    while n < 10 {
        n+=1;
        println!("coroutine:{}",n);
        sleep(Duration::from_secs(1));
    }
}

fn main() {
    let coroutine = coroutine();
    futures::executor::block_on(coroutine);

}

Rust语言学习20 深入并发  进程 线程 协程 异步_并发编程_11

四、异步编程

1 async/await 和 Future

async/await 是 Rust 的异步编程模型,是产生和运行并发任务的手段。

一般而言,async 定义了一个可以并发执行的任务,而 await 则触发这个任务并发执行。Rust 中,async 用来创建 Future,await 来触发 Future 的调度和执行,并等待Future执行完毕。async/await 只是一个语法糖,它使用状态机将 Future 包装起来进行处理。

JavaScript 也是通过 async 的方式提供了异步编程,Rust 的 Future 跟 JavaScript 的 Promise 非常类似。它们的区别:

  • JavaScript 的 Promise 和线程类似,一旦创建就开始执行,对 Promise 的 await 只是等待这个Promise执行完成并得到结果
  • Rust 的 Future,只有在主动 await 后才开始执行

1.1 同步/多线程/异步例子

下面分别用同步的方式、多线程的方式、异步的方式,实现读写文件的需求:读取 Cargo.toml 和 Cargo.lock 并将它们转换成 yaml 写入 /tmp 文件夹下

1.1.1 使用同步实现
use anyhow::Result;
use serde_yaml::Value;
use std::fs;

fn main() -> Result<()> {
    // 读取 Cargo.toml,IO 操作 1
    let content1 = fs::read_to_string("./Cargo.toml")?;
    // 读取 Cargo.lock,IO 操作 2
    let content2 = fs::read_to_string("./Cargo.lock")?;

    // 计算
    let yaml1 = toml2yaml(&content1)?;
    let yaml2 = toml2yaml(&content2)?;

    // 写入 /tmp/Cargo.yml,IO 操作 3
    fs::write("/tmp/Cargo.yml", &yaml1)?;
    // 写入 /tmp/Cargo.lock,IO 操作 4
    fs::write("/tmp/Cargo.lock", &yaml2)?;

    println!("{}", yaml1);
    println!("{}", yaml2);

    Ok(())
}

fn toml2yaml(content: &str) -> Result<String> {
    let value: Value = toml::from_str(&content)?;
    Ok(serde_yaml::to_string(&value)?)
}

缺点:因为是同步读取,在读 Cargo.toml 时,整个主线程被阻塞,直到 Cargo.toml 读完,才能继续读 Cargo.lock 文件,读取两个文件的总共等待时间是 time_for_file1 + time_for_file2。整个主线程,只有在运行 toml2yaml 的时间片内,才真正在执行计算任务,读取文件以及写入文件等这些IO操作,CPU 都在闲置;后面的写入文件也有类似问题

1.1.2 使用多线程实现

此方式把文件读取和写入操作放入单独的线程中执行

use anyhow::{anyhow, Result};
use serde_yaml::Value;

use std::{
    fs,
    thread::{self, JoinHandle},
};

/// 包装一下 JoinHandle,这样可以提供额外的方法
struct MyJoinHandle<T>(JoinHandle<Result<T>>);

impl<T> MyJoinHandle<T> {
    /// 等待 thread 执行完(类似 await)
    pub fn thread_await(self) -> Result<T> {
        self.0.join().map_err(|_| anyhow!("failed"))?
    }
}

fn main() -> Result<()> {
    let t1 = thread_read("./Cargo.toml");
    let t2 = thread_read("./Cargo.lock");

    let content1 = t1.thread_await()?;
    let content2 = t2.thread_await()?;

    // 计算
    let yaml1 = toml2yaml(&content1)?;
    let yaml2 = toml2yaml(&content2)?;

    let t3 = thread_write("/tmp/Cargo.yml", yaml1);
    let t4 = thread_write("/tmp/Cargo.lock", yaml2);

    let yaml1 = t3.thread_await()?;
    let yaml2 = t4.thread_await()?;

    fs::write("/tmp/Cargo.yml", &yaml1)?;
    fs::write("/tmp/Cargo.lock", &yaml2)?;

    println!("{}", yaml1);
    println!("{}", yaml2);

    Ok(())
}

// 针对读文件单独开一个线程
fn thread_read(filename: &'static str) -> MyJoinHandle<String> {
    let handle = thread::spawn(move || {
        let s = fs::read_to_string(filename)?;
        Ok::<_, anyhow::Error>(s)
    });
    MyJoinHandle(handle)
}

// 针对写文件单独开一个线程
fn thread_write(filename: &'static str, content: String) -> MyJoinHandle<String> {
    let handle = thread::spawn(move || {
        fs::write(filename, &content)?;
        Ok::<_, anyhow::Error>(content)
    });
    MyJoinHandle(handle)
}

fn toml2yaml(content: &str) -> Result<String> {
    let value: Value = toml::from_str(&content)?;
    Ok(serde_yaml::to_string(&value)?)
}
  • 优点:读取两个文件是并发执行(写入也类似),大大缩短等待时间,读取的总共等待的时间是 max(time_for_file1, time_for_file2)
  • 缺点:不适用于同时读太多文件的场景;因为每读一个文件会创建一个线程,在操作系统中,线程的数量是有限的,创建过多的线程会大大增加系统的开销

大多数操作系统对 I/O 操作提供了非阻塞接口,Rust 可以利用 async/await 异步处理,进而最大程度的利用 CPU 资源

1.1.3 使用 async/await 异步实现
use anyhow::Result;
use serde_yaml::Value;
use tokio::{fs, try_join};

#[tokio::main]
async fn main() -> Result<()> {
    let f1 = fs::read_to_string("./Cargo.toml");
    let f2 = fs::read_to_string("./Cargo.lock");

    // 等待两个异步io操作完成
    let (content1, content2) = try_join!(f1, f2)?;

    // 计算
    let yaml1 = toml2yaml(&content1)?;
    let yaml2 = toml2yaml(&content2)?;

    let f3 = fs::write("/tmp/Cargo.yml", &yaml1);
    let f4 = fs::write("/tmp/Cargo.lock", &yaml2);

    try_join!(f3, f4)?;

    println!("{}", yaml1);
    println!("{}", yaml2);

    Ok(())
}

fn toml2yaml(content: &str) -> Result<String> {
    let value: Value = toml::from_str(&content)?;
    Ok(serde_yaml::to_string(&value)?)
}

这里使用了tokio::fs,而不是 std::fstokio::fs 的文件操作都会返回一个 Future,然后用 try_ join 轮询这些Future,得到它们运行后的结果。此时文件读取的总时间是 max(time_for_file1, time_for_file2),性能和使用线程的版本几乎一致,但是消耗的线程资源要少很多。

try_join 和 join 宏的作用:是用来轮询多个 Future ,它会依次处理每个 Future,遇到阻塞就处理下一个,直到所有 Future 产生结果(类似JavaScript的Promise.all)。

2 Future 定义

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}

Future 有一个关联类型 Output;还有一个 poll() 方法,它返回 Poll<Self::Output>。Poll 是个枚举,有 Ready 和 Pending 两个状态。通过调用 poll() 方法可以推进 Future 的进一步执行,直到被切走为止

在当前 poll 中,若 Future 完成了,则返回 Poll::Ready(result),即得到 Future 的值并返回;若Future 还没完成,则返回 Poll::Pending(),此时 Future 会被挂起,需要等某个事件将其唤醒(wake唤醒函数)

3 executor 调度器

executor 是一个 Future 的调度器。操作系统负责调度线程,但它不会去调度用户态的协程(比如 Future),所以任何使用了协程来处理并发的程序,都需要有一个 executor 来负责协程的调度。

Rust 的 Future 是惰性的:只有在被 poll 轮询时才会运行。其中一个推动它的方式就是在 async 函数中使用 .await 来调用另一个 async 函数,但是这个只能解决 async 内部的问题,那些最外层的 async 函数,需要靠执行器 executor 来推动 。

Rust中的executor

Rust 虽然提供 Future 这样的协程,但它在语言层面并不提供 executor,当不需要使用协程时,不需要引入任何运行时;而需要使用协程时,可以在生态系统中选择最合适的 executor。

Golang也支持协程,但在语言层面自带了一个用户态的调度器

Rust 有如下4中常见的 executor :

  • futures:这个库自带了很简单的 executor
  • tokio:提供 executor,当使用 #[tokio::main] 时,就隐含引入了 tokio 的 executor
  • async-std :提供 executor,和 tokio 类似
  • smol :提供 async-executor,主要提供了 block_on

wake通知机制

executor 会管理一批 Future (最外层的 async 函数),然后通过不停地 poll 推动它们直到完成。 最开始,执行器会先 poll 一次 Future ,后面就不会主动去 poll 了,如果 poll 方法返回 Poll::Pending,就挂起 Future,直到收到某个事件后,通过 wake()函数去唤醒被挂起 Future,Future 就可以去主动通知执行器,它才会继续去 poll,执行器就可以执行该 Future。这种 wake 通知然后 poll 的方式会不断重复,直到 Future 完成。

Waker 提供了 wake() 方法:其作用是可以告诉执行器,相关的任务可以被唤醒了,此时执行器就可以对相应的 Future 再次进行 poll 操作。

vtable具体的实现并不在标准库中,而是在第三方的异步运行时里,比如futures 库的 waker vtable 定义

4 Rust 异步处理流程

Reactor Pattern模式

Reactor Pattern 是构建高性能事件驱动系统的一个很典型模式,executor 和 reactor 是 Reactor Pattern 的组成部分。Reactor pattern 包含三部分:

  • task:待处理的任务。任务可以被打断,并且把控制权交给 executor,等待之后的调度
  • executor:一个调度器。维护等待运行的任务(ready queue),以及被阻塞的任务(wait queue)
  • reactor:维护事件队列。当事件来临时,通知 executor 唤醒某个任务等待运行

executor 会调度执行待处理的任务,当任务无法继续进行却又没有完成时,它会挂起任务,并设置好合适的唤醒条件。之后,如果 reactor 得到了满足条件的事件,它会唤醒之前挂起的任务,然后 executor 就有机会继续执行这个任务。这样一直循环下去,直到任务执行完毕。

Rust中异步处理的流程

Rust 使用 Future 做异步处理就是一个典型的Reactor Pattern模式。

以 tokio 为例:async/await 提供语法层面的支持,Future 是异步任务的数据结构,当 .await 时,executor 就会调度并执行它

tokio 的调度器会运行在多个线程上,运行线程上自己的 ready queue 上的任务(Future),如果没有,就去别的线程的调度器上偷一些过来运行(work-stealing 调度机制)。当某个任务无法再继续取得进展,此时 Future 运行的结果是 Poll::Pending,那么调度器会挂起任务,并设置好合适的唤醒条件(Waker),等待被 reactor 唤醒。而reactor 会利用操作系统提供的异步 I/O(如epoll / kqueue / IOCP),来监听操作系统提供的 IO 事件,当遇到满足条件的事件时,就会调用 Waker.wake() 唤醒被挂起的 Future,这个 Future 会回到 ready queue 等待执行。

Rust语言学习20 深入并发  进程 线程 协程 异步_并发编程_12

use anyhow::Result;
use futures::{SinkExt, StreamExt};
use tokio::net::TcpListener;
use tokio_util::codec::{Framed, LinesCodec};

#[tokio::main]
async fn main() -> Result<()> {
    let addr = "0.0.0.0:8080";
    let listener = TcpListener::bind(addr).await?;

    println!("listen to: {}", addr);

    loop {
        let (stream, addr) = listener.accept().await?;
        println!("Accepted: {:?}", addr);

        // 创建异步任务
        tokio::spawn(async move {
            // 使用 LinesCodec 把 TCP 数据切成一行行字符串处理
            let framed = Framed::new(stream, LinesCodec::new());

            // split 成 writer 和 reader
            let (mut w, mut r) = framed.split();

            for line in r.next().await {
                // 每读到一行就加个前缀发回
                w.send(format!("I got: {}", line?)).await?;
            }

            Ok::<_, anyhow::Error>(())
        });
    }
}

这是一个简单的 TCP 服务器,服务器每收到一个客户端的请求,就会用 tokio::spawn 创建一个异步任务,放入 executor 中执行。这个异步任务接受客户端发来的按行分隔(分隔符是 “\r\n”)的数据帧,服务器每收到一行,就加个前缀把内容也按行发回给客户端。

假设客户端输入了很大的一行数据,服务器在执行r.next().await 时,如果接收不完一行的数据时,这个 Future 返回 Poll::Pending,此时它被挂起。当后续客户端的数据到达时,reactor 会知道这个 socket 上又有数据了,于是找到 socket 对应的 Future,将其唤醒,继续接收数据。这样反复下去,最终r.next().await 得到 Poll::Ready(Ok(line)),于是它返回 Ok(line),程序继续往下走,进入到 w.send() 的阶段。

5.async的生命周期

Rust语言学习20 深入并发  进程 线程 协程 异步_多进程_13

在一般情况下,在函数调用后就立即 .await 不会存在任何问题,例如foo(&x).await。但是,若 Future 被先存起来或发送到另一个任务或者线程,就可能存在问题。

Rust语言学习20 深入并发  进程 线程 协程 异步_多进程_14

use std::future::Future;

async fn borrow_x(x: &u8) -> u8 { *x }

fn good() -> impl Future<Output = u8> {
    async {
        let x = 5;
        borrow_x(&x).await
    }
}

6.async move

async 可以使用 move 关键字来将环境中变量的所有权转移到语句块内,就像闭包那样,好处是不用解决借用生命周期的问题,坏处就是无法跟其它代码实现对变量的共享

/ 多个不同的 `async` 语句块可以访问同一个本地变量,只要它们在该变量的作用域内执行
async fn blocks() {
    let my_string = "foo".to_string();

    let future_one = async {
        // ...
        println!("{my_string}");
    };

    let future_two = async {
        // ...
        println!("{my_string}");
    };

    // 运行两个 Future 直到完成
    let ((), ()) = futures::join!(future_one, future_two);
}

// 由于`async move`会捕获环境中的变量,因此只有一个`async move`语句块可以访问该变量,
// 但是它也有非常明显的好处: 变量可以转移到返回的 Future 中,不再受借用生命周期的限制
fn move_block() -> impl Future<Output = ()> {
    let my_string = "foo".to_string();

    async move {
        // ...
        println!("{my_string}");
    }
}

7 async 生成的 Future 类型

tokio例子

cargo.toml配置如下

[package]
name = "concurrent"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
futures = "0.3.28"
tokio = { version = "0.3", features = ["full"] }
use tokio::time::Duration;

#[tokio::main] //此处引入tokio 宏/macro
async fn main() {
    //主线程
    tokio::task::spawn_blocking(|| {
        //运行在一个阻塞的线程,可以看作是一个比较耗时的操作
        sleep(Duration::from_millis(2000));
        println!("hi");
    }).await.unwrap();//使用await关键字等待阻塞线程的任务完成
    //要等待阻塞线程完成后,主线程才能执行
    println!("hello");
}

Rust语言学习20 深入并发  进程 线程 协程 异步_多进程_15

代码里的 say_hello1 和 say_hello2 是等价的,say_hello1使用了async,say_hello2自己返回了 Impl Future结构

use std::thread::sleep;
use tokio::time::Duration;
use std::future::Future;

#[tokio::main]
async fn main() {
    let name1 = "zhangsan".to_string();
    let name2 = "lisi".to_string();

    // 用await执行 Feature
    say_hello1(&name1).await;
    say_hello2(&name2).await;
}

async fn say_hello1(name: &str) -> usize {
    println!("Hello {}", name);
    42
}

// async fn 关键字相当于一个返回 impl Future<Output> 的语法糖
fn say_hello2<'fut>(name: &'fut str) -> impl Future<Output = usize> + 'fut {
    async move {
        println!("Hello {}", name);
        42
    }
}

8 异步使用场景的注意事项

8.1 处理计算密集型任务

要避免在异步任务中处理大量计算密集型的任务,因为效率不高,且还容易饿死其它任务,CPU 密集型任务更适合使用线程,而非 Future

饿死其他任务

因为 Future 的调度是协作式多任务,即除非 Future 主动放弃 CPU,不然它就会一直被执行,直到运行结束

如果真的需要在 tokio(或者其它异步运行时)下运行计算密集型的代码,那么最好使用 yield 来主动让出 CPU,将线程交还给调度器,自己则进入就绪队列等待下一轮的调度,比如 tokio::task::yield_now(),这样可以避免某个计算密集型的任务饿死其它任务。

8.2 异步代码中使用 Mutex

在使用 Mutex 等同步原语时,要注意标准库的 MutexGuard 无法跨越 .await,所以,此时要使用对异步友好的 Mutex,如tokio::sync::Mutex


【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

dBFTbkVLMBge