70、Channels(通道)

Channels(通道)

到目前为止,我们生成的所有线程都相当短暂。获取输入、执行运算、返回结果、关闭。

对于我们的票据管理系统,我们希望采用不同的方式:客户端-服务器架构。

我们将有一个长期运行的服务器线程,负责管理我们的状态,即存储的票据。

 

然后,我们将有多个客户端线程。
每个客户端都可以向有状态线程发送命令和查询,以改变其状态(如添加新票据)或检索信息(如获取票据状态)。
客户端线程将并发运行。

 

Communication

到目前为止,我们只有非常有限的父子进程交流:

  • 生成的线程从父进程的上下文借用/使用数据
  • 生成的线程在join时将数据返回给父进程

这对于客户端-服务器设计来说是不够的。客户端需要能够在启动服务器线程后从服务器线程发送和接收数据。

我们可以使用channels解决问题。

 

Channels

Rust 标准库的 std::sync::mpsc 模块提供了多生产者单消费者mpsc)通道。

通道有两种类型:有界和无界。我们暂时使用无界版本,稍后学习其优缺点。

 

通道创建过程如下:

use std::sync::mpsc::channel;

let (sender, receiver) = channel();

我们将得到一个发送方和一个接收方。
sender上调用send,将数据推送到通道。
receiver上调用 recv,从通道中提取数据。

Multiple senders

Sender可克隆:我们可以创建多个Sender(例如,每个客户端线程一个),它们都会将数据推送到同一个通道。

Receiver 是不可克隆的:一个给定通道只能有一个 Receiver

这就是 mpsc(多生产者单消费者)的含义!

Message type

SenderRecevier都是类型参数 T 的泛型。这就是可以在我们的通道上传输的信息类型。

它可以是 u64、结构体、枚举等。

Errors

sendrecv都可能失败。如果Receiver被删除,send将返回错误。
如果所有Sender都被删除,且 Channel 为空,则 recv 返回错误。

换句话说,当通道被有效关闭时,发送和接收都会出错。

 

练习

有四个文件,我们主要编辑的文件是lib.rs

src目录下:

use crate::store::TicketId;
use ticket_fields::{TicketDescription, TicketTitle};

// data.rs 文件

#[derive(Clone, Debug, PartialEq)]
pub struct Ticket {
    pub id: TicketId,
    pub title: TicketTitle,
    pub description: TicketDescription,
    pub status: Status,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TicketDraft {
    pub title: TicketTitle,
    pub description: TicketDescription,
}

#[derive(Clone, Debug, Copy, PartialEq, Eq)]
pub enum Status {
    ToDo,
    InProgress,
    Done,
}
use crate::data::TicketDraft;
use crate::store::TicketStore;
use std::sync::mpsc::{Receiver, Sender};

// lib.rs 文件

pub mod data;
pub mod store;

pub enum Command {
    //这里填写 `TickerDraft` 是因为tests.rs文件里面调用的Insert的参数是 `TicketDraft`
    Insert(TicketDraft),//TODO
}

// Start the system by spawning the server thread.
// It returns a `Sender` instance which can then be used
// by one or more clients to interact with the server.
// 通过生成服务器线程启动系统
// 他返回一个 `Sender` 实例,该实例可以用于和一个或多个客户端与服务器进行交互。
pub fn launch() -> Sender<Command> {
    let (sender, receiver) = std::sync::mpsc::channel();
    std::thread::spawn(move || server(receiver));
    sender
}

// TODO: The server task should **never** stop.
//  Enter a loop: wait for a command to show up in
//  the channel, then execute it, then start waiting
//  for the next command.
// TODO:服务器任务应该**永不**停止。
//  输入循环:等待显示在通道
//  的命令,然后执行并等待下一个命令
pub fn server(receiver: Receiver<Command>) {
    //TODO
    let mut store = TicketStore::new();//创建存储 `TicketDraft` 的store实例
    while let Ok(command) = receiver.recv() {//模式匹配接收到的指令,确保是OK
        match command {
            Command::Insert(draft) => {
                store.add_ticket(draft);
            }
        }
    }
    //TODO
}

use crate::data::{Status, Ticket, TicketDraft};
use std::collections::BTreeMap;

// store.rs 文件

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct TicketId(u64);

#[derive(Clone)]
pub struct TicketStore {
    tickets: BTreeMap<TicketId, Ticket>,
    counter: u64,
}

impl TicketStore {
    pub fn new() -> Self {
        Self {
            tickets: BTreeMap::new(),
            counter: 0,
        }
    }

    pub fn add_ticket(&mut self, ticket: TicketDraft) -> TicketId {
        let id = TicketId(self.counter);
        self.counter += 1;
        let ticket = Ticket {
            id,
            title: ticket.title,
            description: ticket.description,
            status: Status::ToDo,
        };
        self.tickets.insert(id, ticket);
        id
    }
}

tests目录:

// TODO: Set `move_forward` to `true` in `ready` when you think you're done with this exercise.
//  Feel free to call an instructor to verify your solution!
use std::time::Duration;
use task_channels::data::TicketDraft;
use task_channels::{launch, Command};
use ticket_fields::test_helpers::{ticket_description, ticket_title};

// tests.rs 文件

#[test]
fn a_thread_is_spawned() {
    let sender = launch();
    std::thread::sleep(Duration::from_millis(200));

    sender
        .send(Command::Insert(TicketDraft {
            title: ticket_title(),
            description: ticket_description(),
        }))
        // If the thread is no longer running, this will panic
        // because the channel will be closed.
        .expect("Did you actually spawn a thread? The channel is closed!");
}

#[test]
fn ready() {
    // There's very little that we can check automatically in this exercise,
    // since our server doesn't expose any **read** actions.
    // We have no way to know if the inserts are actually happening and if they
    // are happening correctly.
    // 在这个练习中,我们能自动检查的东西很少、
    // 因为我们的服务器没有公开任何**读**操作。
    // 我们没有办法知道插入是否真正发生,以及是否正确插入。
    let move_forward = true; // TODO

    assert!(move_forward);
}

 

我感觉这个练习完全没有达到使用Rust channels的深度,但是即使是这样,我也是不会写代码,还是看的官方答案才会写的,送给自己一句话:菜就多练

阅读剩余
THE END