Blog
网站首页
将std::net::TcpStream包装成异步接口
将std::net::TcpStream包装成异步接口
2023-02-27 15:42
2023-02-27 15:51
作者:
xmh0511
提交
````rust use std::future::Future; use std::io::{self, BufReader, BufWriter}; use std::io::{Read, Write}; use std::net::TcpStream; use std::sync::{Arc, Mutex}; use std::thread; pub struct AsyncThread
{ f: Option
io::Result
+ Send + 'static>>, ret: Arc
>>>, } impl
AsyncThread
{ pub fn new
(f: F) -> Self where F: FnOnce() -> io::Result
, F: Send + 'static, { Self { f: Some(Box::new(f)), ret: Arc::new(Mutex::new(None)), } } } impl
Future for AsyncThread
{ type Output = io::Result
; fn poll( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll
{ let ret = self.ret.lock().unwrap().take(); match ret { Some(t) => std::task::Poll::Ready(t), None => { let that = self.get_mut(); let f = that.f.take(); let setter = that.ret.clone(); let waker = cx.waker().clone(); let _r = thread::spawn(move || { let r = f.unwrap()(); *setter.lock().unwrap() = Some(r); //println!("prepare to wake"); waker.wake(); }); std::task::Poll::Pending } } } } pub struct AsyncTcp(Arc
); impl AsyncTcp { pub fn new(addr: &str) -> Self { AsyncTcp(Arc::new(TcpStream::connect(addr).unwrap())) } pub async fn write(&self, buff: &[u8]) -> io::Result
{ let conn = self.0.clone(); let buff = buff.to_owned(); let task = AsyncThread::new(move || { let mut writer = BufWriter::new(conn.as_ref()); //println!("evaluate write"); let r = writer.write(&buff); //println!("write ok {r:?}"); r }); task.await } pub async fn read(&self) -> io::Result
> { let conn = self.0.clone(); let task = AsyncThread::new(move || { let mut reader = BufReader::new(conn.as_ref()); let mut vec = Vec::new(); vec.resize(1024 * 1024, b'\0'); match reader.read(&mut vec) { Ok(size) => { println!("read completely"); vec.resize(size, b'\0'); Ok(vec) } Err(e) => Err(e), } }); task.await } } #[tokio::main] async fn main() { // let mut conn = TcpStream::connect("127.0.0.1:8080").unwrap(); // let write_content = b"GET / HTTP/1.1\r\n\r\n"; // let mut vec = Vec::new(); // vec.resize(1024 * 1024, b'\0'); // conn.write(write_content).unwrap(); // let read_size = conn.read(&mut vec[..]).unwrap(); // println!("{read_size}"); let conn = AsyncTcp::new("127.0.0.1:8080"); let r = conn.write(b"GET / HTTP/1.1\r\n\r\n").await; println!("result: {r:?}"); let r1 = conn.read().await; println!("{r1:?}"); } ````