mirror of
				https://github.com/balkian/experiments.git
				synced 2025-10-30 23:28:24 +00:00 
			
		
		
		
	tokiorace: improvements
This commit is contained in:
		| @@ -1,19 +1,18 @@ | |||||||
| use tokio::{ |  | ||||||
|     task, |  | ||||||
|     time::{Instant, Duration, sleep}, |  | ||||||
|     sync::{oneshot, |  | ||||||
|            mpsc::{Receiver, Sender, channel} |  | ||||||
|     } |  | ||||||
| }; |  | ||||||
| use std::collections::{HashMap}; |  | ||||||
| use anyhow::Result; | use anyhow::Result; | ||||||
| use anyhow::{bail, Context}; | use anyhow::{bail, Context}; | ||||||
| use tracing::{info, debug, instrument}; | use std::collections::HashMap; | ||||||
| use tracing_subscriber::{fmt, EnvFilter}; |  | ||||||
| use tracing_subscriber::prelude::*; |  | ||||||
| use std::future::Future; | use std::future::Future; | ||||||
|  | use tokio::{ | ||||||
|  |     sync::{ | ||||||
|  |         mpsc::{channel, Receiver, Sender}, | ||||||
|  |         oneshot, | ||||||
|  |     }, | ||||||
|  |     task, | ||||||
|  |     time::{sleep, timeout_at, Duration, Instant}, | ||||||
|  | }; | ||||||
|  | use tracing::{debug, error, warn, info, instrument}; | ||||||
|  | use tracing_subscriber::prelude::*; | ||||||
|  | use tracing_subscriber::{filter::LevelFilter, fmt, EnvFilter}; | ||||||
|  |  | ||||||
| /// Simplified version of a maelstrom event | /// Simplified version of a maelstrom event | ||||||
| #[derive(Debug)] | #[derive(Debug)] | ||||||
| @@ -22,130 +21,144 @@ struct Event { | |||||||
|     in_reply_to: Option<usize>, |     in_reply_to: Option<usize>, | ||||||
| } | } | ||||||
|  |  | ||||||
|  |  | ||||||
| type BoxFuture<'a, T> = Box<dyn Future<Output = T> + 'a>; |  | ||||||
| type CallbackDyn = dyn for<'a> Fn(Event, &'a mut Transport) -> BoxFuture<'a, Result<()>>; |  | ||||||
|  |  | ||||||
| enum Handler { |  | ||||||
|     Callback(Box<CallbackDyn>), |  | ||||||
|     Channel(oneshot::Sender<Event>) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| /// Very simplified transport that receives from a channel and sends to another channel. In practice, | /// Very simplified transport that receives from a channel and sends to another channel. In practice, | ||||||
| /// this would be replaced with a transport that reads/writes events from/to files or a connection. | /// this would be replaced with a transport that reads/writes events from/to files or a connection. | ||||||
| struct Transport { | struct Transport { | ||||||
|  |     erx: Receiver<anyhow::Error>, | ||||||
|  |     etx: Sender<anyhow::Error>, | ||||||
|     msg_id: usize, |     msg_id: usize, | ||||||
|     outbox: Sender<Event>, |     outbox: Sender<Event>, | ||||||
|     inbox: Receiver<Option<Event>>, |     inbox: Receiver<Option<Event>>, | ||||||
|     callbacks: HashMap<usize, (Handler, Instant)>, |     callbacks: HashMap<usize, (oneshot::Sender<Option<Event>>, Instant)>, | ||||||
| } | } | ||||||
|  |  | ||||||
|  |  | ||||||
| impl Transport { | impl Transport { | ||||||
|     fn new() -> (Self, Sender<Option<Event>>, Receiver<Event>) { |     fn new() -> (Self, Sender<Option<Event>>, Receiver<Event>) { | ||||||
|         let (itx, irx) = channel::<Option<Event>>(1); |         let (itx, irx) = channel::<Option<Event>>(1); | ||||||
|         let (otx, orx) = channel::<Event>(1); |         let (otx, orx) = channel::<Event>(1); | ||||||
|         (Self { |         let (etx, erx) = channel(1); | ||||||
|             msg_id: 0, |         ( | ||||||
|             outbox: otx, |             Self { | ||||||
|             inbox: irx, |                 erx, | ||||||
|             callbacks: Default::default(), |                 etx, | ||||||
|         }, itx, orx) |                 msg_id: 0, | ||||||
|  |                 outbox: otx, | ||||||
|  |                 inbox: irx, | ||||||
|  |                 callbacks: Default::default(), | ||||||
|  |             }, | ||||||
|  |             itx, | ||||||
|  |             orx, | ||||||
|  |         ) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     #[instrument(level="debug",skip(self),ret)] |     #[instrument(level = "debug", skip(self))] | ||||||
|     fn close(&mut self) -> Result<()> { |     async fn close(mut self) -> Result<()> { | ||||||
|         self.inbox.close(); |         self.inbox.close(); | ||||||
|         let i = Instant::now(); |         let i = Instant::now(); | ||||||
|         self.callbacks.retain(|_, (_, t)| *t >= i); |         self.callbacks.retain(|_, (_, t)| *t >= i); | ||||||
|         if self.callbacks.is_empty() { |         drop(self.etx); | ||||||
|             Ok(()) |         debug!("Waiting for errors to propagate"); | ||||||
|         } else { |         for (c, _t) in self.callbacks.into_values() { | ||||||
|             bail!("some callbacks failed to launch.") |             let _ = c.send(None); | ||||||
|         } |         } | ||||||
|  |         if let Some(e) = self.erx.recv().await { | ||||||
|  |             return Err(e); | ||||||
|  |         } | ||||||
|  |         Ok(()) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     #[instrument(level="debug",skip(self),ret)] |     #[instrument(level = "debug", skip(self))] | ||||||
|     async fn send(&self, e: Event) -> Result<()> { |     async fn send(&mut self, mut e: Event) -> Result<()> { | ||||||
|  |         e.id = self.msg_id; | ||||||
|  |         self.msg_id += 1; | ||||||
|         self.outbox.send(e).await.context("unable to send message") |         self.outbox.send(e).await.context("unable to send message") | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     #[instrument(level="debug",skip(self),ret)] |     #[instrument(level = "debug", skip(self))] | ||||||
|     async fn rpc(&mut self, e: Event) -> Result<Event> { |     async fn rpc_chan( | ||||||
|         let rx = self.rpc_chan(e).await?; |         &mut self, | ||||||
|         let resp = rx.await?; |         e: Event, | ||||||
|         Ok(resp) |         expiration: Instant, | ||||||
|     } |     ) -> Result<oneshot::Receiver<Option<Event>>> { | ||||||
|  |  | ||||||
|     #[instrument(level="debug",skip(self),ret)] |  | ||||||
|     async fn rpc_chan(&mut self, e: Event) -> Result<oneshot::Receiver<Event>> { |  | ||||||
|         let id = e.id; |         let id = e.id; | ||||||
|         self.send(e).await?; |         self.send(e).await?; | ||||||
|         let (tx, rx) = oneshot::channel(); |         let (tx, rx) = oneshot::channel(); | ||||||
|         self.callbacks.insert(id, (Handler::Channel(tx), Instant::now() + Duration::from_millis(1000))); |         self.callbacks.insert(id, (tx, expiration)); | ||||||
|         Ok(rx) |         Ok(rx) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     async fn rpc_callback<C>(&mut self, e: Event, cb: C) -> Result<()> |     async fn rpc_callback<C, F>(&mut self, e: Event, cb: C, expiration: Instant) -> Result<()> | ||||||
|         where |     where | ||||||
|         C: for<'a> Fn(Event, &'a mut Transport) -> BoxFuture<'a, Result<()>> + 'static |         C: 'static + Send + FnOnce(oneshot::Receiver<Option<Event>>, Sender<Event>) -> F, | ||||||
|  |         F: Future<Output = Result<()>> + Send + 'static, | ||||||
|     { |     { | ||||||
|         self.callbacks.insert(e.id, (Handler::Callback(Box::new(cb)), (Instant::now() + Duration::from_millis(1000)))); |         let rec = self.rpc_chan(e, expiration).await?; | ||||||
|         self.send(e).await |         let sender = self.outbox.clone(); | ||||||
|  |         let tx = self.etx.clone(); | ||||||
|  |         task::spawn(async move { | ||||||
|  |             if let Ok(Err(e)) = timeout_at(expiration, cb(rec, sender)).await { | ||||||
|  |                 error!("propagating error '{e:?}'"); | ||||||
|  |                 tx.send(e) | ||||||
|  |                     .await | ||||||
|  |                     .expect("could not propagate error to transport."); | ||||||
|  |             } | ||||||
|  |         }); | ||||||
|  |         Ok(()) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     /// #Cancellation safety: This method is **not** cancellation safe, because it deals with callbacks |     /// #Cancellation safety: This method is cancellation safe | ||||||
|     #[instrument(level="debug",skip(self),ret)] |     #[instrument(level = "debug", skip(self))] | ||||||
|     async fn recv(&mut self) -> Result<Option<Event>> { |     async fn recv(&mut self) -> Result<Option<Event>> { | ||||||
|         loop { |         loop { | ||||||
|             let Some(nxt) = self.inbox.recv().await else { |             tokio::select! { | ||||||
|                 return Ok(None); |                 rec = self.inbox.recv() => { | ||||||
|             }; |                     let Some(Some(nxt)) = rec else { | ||||||
|             let Some(nxt) = nxt else { |                         break; | ||||||
|                 return Ok(None); |                     }; | ||||||
|             }; |                     if let Some(e) = self.process_event(nxt).await? { | ||||||
|             let Some(original) = nxt.in_reply_to else { |                         return Ok(Some(e)); | ||||||
|                 return Ok(Some(nxt)); |  | ||||||
|             }; |  | ||||||
|             let sp = match self.callbacks.remove(&original) { |  | ||||||
|                 None => { |  | ||||||
|                     return Ok(Some(nxt)); |  | ||||||
|                 }, |  | ||||||
|                 Some((h, t)) => { |  | ||||||
|                     if t < Instant::now() { |  | ||||||
|                         continue; |  | ||||||
|                     } |                     } | ||||||
|                     h |  | ||||||
|                 } |  | ||||||
|             }; |  | ||||||
|             match sp { |  | ||||||
|                 Handler::Callback(cb) => { |  | ||||||
|                     let f = Box::into_pin(cb(nxt, self)); |  | ||||||
|                     f.await?; |  | ||||||
|                 }, |                 }, | ||||||
|                 Handler::Channel(c) => { |                 e = self.erx.recv() => { | ||||||
|                     if let Err(e) = c.send(nxt) { |                     error!("OHNOES, we have received an error"); | ||||||
|                         bail!("could not send event to callback {e:?}") |                     if let Some(e) = e { | ||||||
|  |                         return Err(e); | ||||||
|                     } |                     } | ||||||
|  |                     info!("no more errors"); | ||||||
|  |                     break; | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|  |         Ok(None) | ||||||
|     } |     } | ||||||
| } |  | ||||||
|  |  | ||||||
| /// Simple macro to generate a dyn-safe closure from an async block. |     async fn process_event(&mut self, nxt: Event) -> Result<Option<Event>> { | ||||||
| macro_rules! callback { |         let Some(original) = nxt.in_reply_to else { | ||||||
|     (|$e:ident, $svc:ident| $blk:block) => { |             return Ok(Some(nxt)); | ||||||
|         |$e, $svc| { |         }; | ||||||
|             Box::new(async move { |         if let Some((c, t)) = self.callbacks.remove(&original) { | ||||||
|                 $blk |             if t >= Instant::now() { | ||||||
|             }) |                 if let Err(e) = c.send(Some(nxt)) { | ||||||
|  |                     bail!("could not send event to callback {e:?}") | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|         } |         } | ||||||
|  |         Ok(None) | ||||||
|     } |     } | ||||||
|  |  | ||||||
| } | } | ||||||
|  |  | ||||||
|  | ///// Simple macro to generate a dyn-safe closure from an async block. | ||||||
|  | //macro_rules! callback { | ||||||
|  | //    (|$e:ident, $svc:ident| $blk:block) => { | ||||||
|  | //        move |$e, $svc| { | ||||||
|  | //            async move { | ||||||
|  | //                $blk | ||||||
|  | //            } | ||||||
|  | //        } | ||||||
|  | //    } | ||||||
|  | // | ||||||
|  | //} | ||||||
|  |  | ||||||
| struct Service { | struct Service { | ||||||
|     t: Transport, |     t: Transport, | ||||||
| } | } | ||||||
| @@ -153,77 +166,120 @@ struct Service { | |||||||
| /// All-in-one implementation of a service. In practice, this would be a trait where the `serve` | /// All-in-one implementation of a service. In practice, this would be a trait where the `serve` | ||||||
| /// part is kind of generic, and the process varies between implementers. | /// part is kind of generic, and the process varies between implementers. | ||||||
| impl Service { | impl Service { | ||||||
|     #[instrument(level="debug", skip(self),ret)] |     fn new(t: Transport) -> Self { | ||||||
|     async fn process(&mut self, event: Event) -> Result<()>{ |         Self { t } | ||||||
|         let resp = self.t.rpc(Event{id: event.id, in_reply_to: None}).await?; |     } | ||||||
|  |  | ||||||
|  |     #[instrument(level = "debug", skip(self))] | ||||||
|  |     async fn process(&mut self, event: Event) -> Result<()> { | ||||||
|  |         let eid = event.id; | ||||||
|  |         info!("Service received event"); | ||||||
|  |         let resp = self | ||||||
|  |             .t | ||||||
|  |             .rpc_callback( | ||||||
|  |                 Event { | ||||||
|  |                     id: event.id, | ||||||
|  |                     in_reply_to: None, | ||||||
|  |                 }, | ||||||
|  |                 move |chan: oneshot::Receiver<Option<Event>>, out: Sender<Event>| { | ||||||
|  |                     async move { | ||||||
|  |                         let Some(e) = chan.await? else { | ||||||
|  |                             warn!("Reply not received for event {eid}"); | ||||||
|  |                             return Ok(()); | ||||||
|  |                         }; | ||||||
|  |                         info!("Received reply {e:?}"); | ||||||
|  |                         out.send(Event { | ||||||
|  |                             id: 199, | ||||||
|  |                             in_reply_to: Some(e.id), | ||||||
|  |                         }) | ||||||
|  |                         .await | ||||||
|  |                         .context("could not send reply.")?; | ||||||
|  |                         //bail!("forcing an error here!"); | ||||||
|  |                         Ok(()) | ||||||
|  |                     } | ||||||
|  |                 }, | ||||||
|  |                 Instant::now() + Duration::from_millis(100000), | ||||||
|  |             ) | ||||||
|  |             .await?; | ||||||
|         debug!("{:?}", &resp); |         debug!("{:?}", &resp); | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     #[instrument(level="debug", skip(self),ret)] |     #[instrument(level = "debug", skip(self))] | ||||||
|     async fn process_nonblocking(&mut self, event: Event) -> Result<()>{ |     async fn serve(mut self) -> Result<()> { | ||||||
|         let resp = self.t.rpc_callback(Event{id: event.id, in_reply_to: None}, |  | ||||||
|                                        callback!(|e, svc| { |  | ||||||
|                                                svc.send(Event{id: 199, in_reply_to: Some(e.id)}).await |  | ||||||
|                                            })).await?; |  | ||||||
|         debug!("{:?}", &resp); |  | ||||||
|         Ok(()) |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     #[instrument(level="debug", skip(self),ret)] |  | ||||||
|     async fn serve(mut self) -> Result<()>{ |  | ||||||
|         while let Some(e) = self.t.recv().await? { |         while let Some(e) = self.t.recv().await? { | ||||||
|             self.process_nonblocking(e).await?; |             self.process(e).await?; | ||||||
|         } |         } | ||||||
|         self.t.close() |         self.t.close().await | ||||||
|     } |     } | ||||||
|  |  | ||||||
| } | } | ||||||
|  |  | ||||||
| #[tokio::main] | #[tokio::main] | ||||||
| async fn main() -> Result<()> { | async fn main() -> Result<()> { | ||||||
|  |     let filter = EnvFilter::builder() | ||||||
|  |         .with_default_directive(LevelFilter::INFO.into()) | ||||||
|  |         .from_env_lossy(); | ||||||
|     tracing_subscriber::registry() |     tracing_subscriber::registry() | ||||||
|     .with(fmt::layer()) |         .with(fmt::layer()) | ||||||
|     .with(EnvFilter::from_default_env()) |         .with(filter) | ||||||
|     .init(); |         .init(); | ||||||
|  |  | ||||||
|     info!("Starting example server"); |     info!("Starting example server"); | ||||||
|  |  | ||||||
|     let (t, tx, mut rx) = Transport::new(); |     let (t, tx, mut rx) = Transport::new(); | ||||||
|  |  | ||||||
|     let serv = Service{t}; |     let serv = Service::new(t); | ||||||
|  |  | ||||||
|     let tx1 = tx.clone(); |     let tx1 = tx.clone(); | ||||||
|     let s = tokio::spawn(async move { |     let s = tokio::spawn(async move { | ||||||
|         for i in 0..2 { |         for i in 0..5 { | ||||||
|             debug!("Sending event {i}"); |             debug!("Main sending event {i}"); | ||||||
|             tx1.send(Some(Event{id: i, in_reply_to: None})).await.context("could not send original message")?; |             tx1.send(Some(Event { | ||||||
|  |                 id: i, | ||||||
|  |                 in_reply_to: None, | ||||||
|  |             })) | ||||||
|  |             .await | ||||||
|  |             .context("could not send original message")?; | ||||||
|  |             debug!("Main sent event {i}"); | ||||||
|         } |         } | ||||||
|  |         debug!("Main done sending events"); | ||||||
|         Ok::<(), anyhow::Error>(()) |         Ok::<(), anyhow::Error>(()) | ||||||
|     }); |     }); | ||||||
|     debug!("Serving"); |     debug!("Serving"); | ||||||
|     let r = tokio::spawn(async move { |     let r = tokio::spawn(async move { | ||||||
|         for i in 0..2 { |         info!("Spawning receptor"); | ||||||
|  |         for _i in 0..3 { | ||||||
|             let Some(msg) = rx.recv().await else { |             let Some(msg) = rx.recv().await else { | ||||||
|                 break; |                 break; | ||||||
|             }; |             }; | ||||||
|             if msg.in_reply_to.is_some() { |             if msg.in_reply_to.is_some() { | ||||||
|  |                 info!("Main received response: {msg:?}"); | ||||||
|                 continue; |                 continue; | ||||||
|             } |             } | ||||||
|             sleep(Duration::from_millis(200)).await; |             info!("Main received request: {msg:?}"); | ||||||
|             if i < 0 { |             sleep(Duration::from_millis(10)).await; | ||||||
|                 let reply = Event{id: 99, in_reply_to: Some(msg.id)}; |             let reply = Event { | ||||||
|                 debug!("Sending reply: {:?}", &reply); |                 id: 99, | ||||||
|                 tx.send(Some(reply)).await.context("could not send reply")?; |                 in_reply_to: Some(msg.id), | ||||||
|             } else { |             }; | ||||||
|                 println!("DONE WITH REPLIES"); |             debug!("Sending reply: {:?}", &reply); | ||||||
|             } |             let tx = tx.clone(); | ||||||
|  |             tokio::spawn(async move { | ||||||
|  |                 tx.send(Some(reply)).await.expect("could not send reply"); | ||||||
|  |             }); | ||||||
|         } |         } | ||||||
|         sleep(Duration::from_millis(100)).await; |         info!("Not sending more replies. Logging outgoing messages."); | ||||||
|         tx.send(None).await.context("could not send closing message")?; |         drop(tx); | ||||||
|  |         sleep(Duration::from_millis(1000)).await; | ||||||
|  |         while let Some(msg) = rx.recv().await { | ||||||
|  |             info!("Main received: {msg:?}"); | ||||||
|  |         }; | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     }); |     }); | ||||||
|     serv.serve().await?; |     if let Err(e) = serv.serve().await { | ||||||
|  |         error!("Have received the error: {e:?}"); | ||||||
|  |         return Ok(()); | ||||||
|  |     }; | ||||||
|  |  | ||||||
|     info!("Served"); |     info!("Served"); | ||||||
|     s.await.context("error joining")??; |     s.await.context("error joining")??; | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user