72、[练习]ACK模式

双向通信

在我们当前的客户端-服务器实现中,通信是单向的:从客户端到服务器。

客户端无法知道服务器是否收到了信息、执行成功还是失败。这并不理想。

为了解决这个问题,我们可以引入双向通信系统。

 

响应通道

我们需要一种方法让服务器向客户端发回响应。有多种方法可以做到这一点,但最简单的方法是在客户端发送给服务器的信息中加入一个 Sender 通道。服务器在处理信息后,可以使用该通道向客户端发送回复。

在基于消息传递原语构建的 Rust 应用程序中,这是一种相当常见的模式。

 

练习

 

为了通过这个测试文件:

use task_ack_pattern::data::{Status, Ticket, TicketDraft};
use task_ack_pattern::store::TicketId;
use task_ack_pattern::{launch, Command};
use ticket_fields::test_helpers::{ticket_description, ticket_title};

#[test]
fn insert_works() {
    let sender = launch();
    let (response_sender, response_receiver) = std::sync::mpsc::channel();

    let draft = TicketDraft {
        title: ticket_title(),
        description: ticket_description(),
    };
    let command = Command::Insert {
        draft: draft.clone(),
        response_sender,
    };

    sender
        .send(command)
        // If the thread is no longer running, this will panic
        // because the channel will be closed.
        .expect("Did you actually spawn a thread? The channel is closed!");

    let ticket_id: TicketId = response_receiver.recv().expect("No response received!");

    let (response_sender, response_receiver) = std::sync::mpsc::channel();
    let command = Command::Get {
        id: ticket_id,
        response_sender,
    };
    sender
        .send(command)
        .expect("Did you actually spawn a thread? The channel is closed!");

    let ticket: Ticket = response_receiver
        .recv()
        .expect("No response received!")
        .unwrap();
    assert_eq!(ticket_id, ticket.id);
    assert_eq!(ticket.status, Status::ToDo);
    assert_eq!(ticket.title, draft.title);
    assert_eq!(ticket.description, draft.description);
}

 

完成lib.rs:

use crate::data::{Ticket, TicketDraft};
use crate::store::{TicketId, TicketStore};
use std::sync::mpsc::{Receiver, Sender};

pub mod data;
pub mod store;

// Refer to the tests to understand the expected schema.
pub enum Command {
    Insert {
        /* TODO */
    },
    Get {
        /* TODO */
    },
}

pub fn launch() -> Sender<Command> {
    let (sender, receiver) = std::sync::mpsc::channel();
    std::thread::spawn(move || server(receiver));
    sender
}

// TODO: handle incoming commands as expected.
pub fn server(receiver: Receiver<Command>) {
    let mut store = TicketStore::new();
    loop {
        match receiver.recv() {
            Ok(Command::Insert {
                /* TODO */nse_sender,
            }) => {
        /* TODO */end(id);
            }
            Ok(Command::Get {
           /* TODO */r,
            }) => {
                l/* TODO */            }
            Err(_) => {
                // There are no more senders, so we can safely break
                // and shut down the server.
                break;
            }
        }
    }
}

 

解答:

use crate::data::{Ticket, TicketDraft};
use crate::store::{TicketId, TicketStore};
use std::sync::mpsc::{Receiver, Sender};

pub mod data;
pub mod store;

// Refer to the tests to understand the expected schema.
pub enum Command {
    Insert {
        /*
        	因为tests.rs文件里面Insert就是这两个参数,所以填TicketDraft类型的draft
        	
        	let draft = TicketDraft {
                title: ticket_title(),
                description: ticket_description(),
            };
            let command = Command::Insert {
                draft: draft.clone(),
                response_sender,
            };
            
            确定response_sender的类型:
            	response_sender来自:let (response_sender, response_receiver) = std::sync::mpsc::channel();
            	只需要查看std::sync::mpsc::channel()函数的返回值即可
            	pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
                    let (tx, rx) = mpmc::channel();
                    (Sender { inner: tx }, Receiver { inner: rx })
                }
                其实使用RustRover也能非常清楚地看出来
        */
        draft:TicketDraft,
        response_sender:Sender<TicketId>,
    },
    Get {
        /*
        	let ticket: Ticket = response_receiver
            .recv()
            .expect("No response received!")
            .unwrap();
            
            test.rs文件里面有这样一段代码,我们调用了expect,又调用unwarp,那么就意味着我们进行了两层“解包”
            所以,用Option把里面的Ticket包裹起来
        */
        id: TicketId,
        response_sender: Sender<Option<Ticket>>,
    },
}

pub fn launch() -> Sender<Command> {
    let (sender, receiver) = std::sync::mpsc::channel();
    std::thread::spawn(move || server(receiver));
    sender
}

// TODO: handle incoming commands as expected.
pub fn server(receiver: Receiver<Command>) {
    let mut store = TicketStore::new();
    loop {
        match receiver.recv() {
            Ok(Command::Insert {
                draft,			//模式匹配,取这个枚举类型里面的参数
                response_sender	//或取到Sender
            }) => {
                /*
                	前面声明了,response_sender的类型是TickedId,也就是send方法的参数就是TickedId,所以
                	我们要传递的是这个draft的参数,现在还是TicketDraft,add_ticket的作用就是把一个TicketDraft
                		传入到store里面。
                	而add_ticket函数的返回值就是TicketId,所以我们可以直接使用这个函数作为send函数的参数:
                		pub fn add_ticket(&mut self, ticket: TicketDraft) -> TicketId {
							...
                            id
                        }
                	另外为什么使用unwarp,其实删掉也没事,因为send函数的返回值是Result类型:
                		pub fn send(&self, t: T) -> Result<(), SendError<T>> {
                            self.inner.send(t)
                        }
                    	题目没有明确要求我们处理panic,那我们就可以完全忽视,不过我的答案,下面的同样的send就没有设置unwarp
                	
                */
                response_sender.send(store.add_ticket(draft)).unwrap();
            }
            Ok(Command::Get {
                   id, //这里按照tests.rs文件里面的内容填充这里的参数
                   response_sender,
            }) => {
               	/*
               		这里是相当于ACK的确认信号,用cloned:
               		pub fn cloned(self) -> Option<T>
                    where
                        T: Clone,
                    {
                        match self {
                            Some(t) => Some(t.clone()),
                            None => None,
                        }
                    }
                    因为get函数:pub fn get(&self, id: TicketId) -> Option<&Ticket> {
                                    self.tickets.get(&id)
                                }
                    返回的是Option<&Ticket>一个引用,所以我们要Clone,对Option Clone就用Cloned了
               	*/
                response_sender.send(store.get(id).cloned());            
            }
            Err(_) => {
                // There are no more senders, so we can safely break
                // and shut down the server.
                break;
            }
        }
    }
}

 

阅读剩余
THE END