70、Channels(通道)
Channels(通道)
到目前为止,我们生成的所有线程都相当短暂。获取输入、执行运算、返回结果、关闭。
对于我们的票据管理系统,我们希望采用不同的方式:客户端-服务器架构。
我们将有一个长期运行的服务器线程,负责管理我们的状态,即存储的票据。
然后,我们将有多个客户端线程。
每个客户端都可以向有状态线程发送命令和查询,以改变其状态(如添加新票据)或检索信息(如获取票据状态)。
客户端线程将并发运行。
Communication
到目前为止,我们只有非常有限的父子进程交流:
- 生成的线程从父进程的上下文借用/使用数据
- 生成的线程在join时将数据返回给父进程
这对于客户端-服务器设计来说是不够的。客户端需要能够在启动服务器线程后从服务器线程发送和接收数据。
我们可以使用channels解决问题。
Channels
Rust 标准库的 std::sync::mpsc 模块提供了多生产者、单消费者(mpsc)通道。
通道有两种类型:有界和无界。我们暂时使用无界版本,稍后学习其优缺点。
通道创建过程如下:
use std::sync::mpsc::channel;
let (sender, receiver) = channel();
我们将得到一个发送方和一个接收方。
在sender上调用send,将数据推送到通道。
在receiver上调用 recv,从通道中提取数据。
Multiple senders
Sender可克隆:我们可以创建多个Sender(例如,每个客户端线程一个),它们都会将数据推送到同一个通道。
而 Receiver 是不可克隆的:一个给定通道只能有一个 Receiver 。
这就是 mpsc(多生产者单消费者)的含义!
Message type
Sender和Recevier都是类型参数 T 的泛型。这就是可以在我们的通道上传输的信息类型。
它可以是 u64、结构体、枚举等。
Errors
send和recv都可能失败。如果Receiver被删除,send将返回错误。
如果所有Sender都被删除,且 Channel 为空,则 recv 返回错误。
换句话说,当通道被有效关闭时,发送和接收都会出错。
练习
有四个文件,我们主要编辑的文件是lib.rs:
src目录下:
use crate::store::TicketId;
use ticket_fields::{TicketDescription, TicketTitle};
// data.rs 文件
#[derive(Clone, Debug, PartialEq)]
pub struct Ticket {
pub id: TicketId,
pub title: TicketTitle,
pub description: TicketDescription,
pub status: Status,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TicketDraft {
pub title: TicketTitle,
pub description: TicketDescription,
}
#[derive(Clone, Debug, Copy, PartialEq, Eq)]
pub enum Status {
ToDo,
InProgress,
Done,
}
use crate::data::TicketDraft;
use crate::store::TicketStore;
use std::sync::mpsc::{Receiver, Sender};
// lib.rs 文件
pub mod data;
pub mod store;
pub enum Command {
//这里填写 `TickerDraft` 是因为tests.rs文件里面调用的Insert的参数是 `TicketDraft`
Insert(TicketDraft),//TODO
}
// Start the system by spawning the server thread.
// It returns a `Sender` instance which can then be used
// by one or more clients to interact with the server.
// 通过生成服务器线程启动系统
// 他返回一个 `Sender` 实例,该实例可以用于和一个或多个客户端与服务器进行交互。
pub fn launch() -> Sender<Command> {
let (sender, receiver) = std::sync::mpsc::channel();
std::thread::spawn(move || server(receiver));
sender
}
// TODO: The server task should **never** stop.
// Enter a loop: wait for a command to show up in
// the channel, then execute it, then start waiting
// for the next command.
// TODO:服务器任务应该**永不**停止。
// 输入循环:等待显示在通道
// 的命令,然后执行并等待下一个命令
pub fn server(receiver: Receiver<Command>) {
//TODO
let mut store = TicketStore::new();//创建存储 `TicketDraft` 的store实例
while let Ok(command) = receiver.recv() {//模式匹配接收到的指令,确保是OK
match command {
Command::Insert(draft) => {
store.add_ticket(draft);
}
}
}
//TODO
}
use crate::data::{Status, Ticket, TicketDraft};
use std::collections::BTreeMap;
// store.rs 文件
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct TicketId(u64);
#[derive(Clone)]
pub struct TicketStore {
tickets: BTreeMap<TicketId, Ticket>,
counter: u64,
}
impl TicketStore {
pub fn new() -> Self {
Self {
tickets: BTreeMap::new(),
counter: 0,
}
}
pub fn add_ticket(&mut self, ticket: TicketDraft) -> TicketId {
let id = TicketId(self.counter);
self.counter += 1;
let ticket = Ticket {
id,
title: ticket.title,
description: ticket.description,
status: Status::ToDo,
};
self.tickets.insert(id, ticket);
id
}
}
tests目录:
// TODO: Set `move_forward` to `true` in `ready` when you think you're done with this exercise.
// Feel free to call an instructor to verify your solution!
use std::time::Duration;
use task_channels::data::TicketDraft;
use task_channels::{launch, Command};
use ticket_fields::test_helpers::{ticket_description, ticket_title};
// tests.rs 文件
#[test]
fn a_thread_is_spawned() {
let sender = launch();
std::thread::sleep(Duration::from_millis(200));
sender
.send(Command::Insert(TicketDraft {
title: ticket_title(),
description: ticket_description(),
}))
// If the thread is no longer running, this will panic
// because the channel will be closed.
.expect("Did you actually spawn a thread? The channel is closed!");
}
#[test]
fn ready() {
// There's very little that we can check automatically in this exercise,
// since our server doesn't expose any **read** actions.
// We have no way to know if the inserts are actually happening and if they
// are happening correctly.
// 在这个练习中,我们能自动检查的东西很少、
// 因为我们的服务器没有公开任何**读**操作。
// 我们没有办法知道插入是否真正发生,以及是否正确插入。
let move_forward = true; // TODO
assert!(move_forward);
}
我感觉这个练习完全没有达到使用Rust channels的深度,但是即使是这样,我也是不会写代码,还是看的官方答案才会写的,送给自己一句话:菜就多练
