diff --git a/rust/asyncfnstore/Cargo.lock b/rust/asyncfnstore/Cargo.lock new file mode 100644 index 0000000..487d9dd --- /dev/null +++ b/rust/asyncfnstore/Cargo.lock @@ -0,0 +1,339 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "addr2line" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" + +[[package]] +name = "asyncfnstore" +version = "0.1.0" +dependencies = [ + "tokio", +] + +[[package]] +name = "autocfg" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" + +[[package]] +name = "backtrace" +version = "0.3.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", + "windows-targets", +] + +[[package]] +name = "bitflags" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f68f53c83ab957f72c32642f3868eec03eb974d1fb82e453128456482613d36" + +[[package]] +name = "bytes" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "gimli" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" + +[[package]] +name = "libc" +version = "0.2.169" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" + +[[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "memchr" +version = "2.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" + +[[package]] +name = "miniz_oxide" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8402cab7aefae129c6977bb0ff1b8fd9a04eb5b51efc50a70bea51cda0c7924" +dependencies = [ + "adler2", +] + +[[package]] +name = "mio" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" +dependencies = [ + "libc", + "wasi", + "windows-sys", +] + +[[package]] +name = "object" +version = "0.36.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" +dependencies = [ + "memchr", +] + +[[package]] +name = "parking_lot" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" + +[[package]] +name = "proc-macro2" +version = "1.0.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60946a68e5f9d28b0dc1c21bb8a97ee7d018a8b322fa57838ba31cc878e22d99" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e4dccaaaf89514f546c693ddc140f729f958c247918a13380cccc6078391acc" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "redox_syscall" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834" +dependencies = [ + "bitflags", +] + +[[package]] +name = "rustc-demangle" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + +[[package]] +name = "smallvec" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" + +[[package]] +name = "socket2" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "syn" +version = "2.0.96" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5d0adab1ae378d7f53bdebc67a39f1f151407ef230f0ce2883572f5d8985c80" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "tokio" +version = "1.43.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" +dependencies = [ + "backtrace", + "bytes", + "libc", + "mio", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys", +] + +[[package]] +name = "tokio-macros" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "unicode-ident" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" diff --git a/rust/asyncfnstore/Cargo.toml b/rust/asyncfnstore/Cargo.toml new file mode 100644 index 0000000..d318017 --- /dev/null +++ b/rust/asyncfnstore/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "asyncfnstore" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio = { version = "1.43.0", features = ["full"] } diff --git a/rust/asyncfnstore/src/main.rs b/rust/asyncfnstore/src/main.rs new file mode 100644 index 0000000..e16ade3 --- /dev/null +++ b/rust/asyncfnstore/src/main.rs @@ -0,0 +1,55 @@ +//use futures::future::BoxFuture; +use std::future::Future; +use std::pin::Pin; + + +struct Data { + num: usize +} + +async fn bar(data: &Data) -> usize { + data.num + 666 +} + +async fn lorem(data: &Data) -> usize { + data.num + 777 +} + +type BoxFuture<'a, T> = Box + 'a>; +type CallbackDyn = dyn for<'a> Fn(&'a Data) -> BoxFuture<'a, usize>; + +struct Foo { + one: Box, + data: Data, +} + +impl Foo { + fn set(&mut self, f: F) + where F: for<'a> Fn(&'a Data) -> BoxFuture<'a, usize> + 'static { + self.one = Box::new(f); + } + + async fn process(mut self) { + let func = &self.one; + println!("Result: {}", Box::into_pin(func(&self.data)).await); + } +} + +#[tokio::main] +async fn main() { + let mut foo = Foo { + one: Box::new(|i| Box::new(bar(i))), + data: Data{num: 3}, + }; + //let mut foo = Foo::new(bar); + let offset = 5; + foo.set(move |d| { + Box::new(async move { + let i = lorem(d).await; + i + offset + }) + }); + + foo.process().await; + +} diff --git a/rust/tokiorace/src/main.rs b/rust/tokiorace/src/main.rs index f6ac397..7e78e32 100644 --- a/rust/tokiorace/src/main.rs +++ b/rust/tokiorace/src/main.rs @@ -1,18 +1,18 @@ use tokio::{ task, - time::{Duration, sleep}, + time::{Instant, Duration, sleep}, sync::{oneshot, mpsc::{Receiver, Sender, channel} } }; -use std::collections::{VecDeque, HashMap}; +use std::collections::{HashMap}; use anyhow::Result; use anyhow::{bail, Context}; -use tracing::{info, debug, instrument, Level}; -use tracing_subscriber::{fmt, Registry, EnvFilter}; +use tracing::{info, debug, instrument}; +use tracing_subscriber::{fmt, EnvFilter}; use tracing_subscriber::prelude::*; -use tracing_subscriber::fmt::format::FmtSpan; -use tokio::runtime::Handle; +use std::future::Future; + struct Service { t: Transport, @@ -24,10 +24,15 @@ struct Event { in_reply_to: Option, } -type CB = dyn (FnMut(Event, &mut Transport) -> Result<()>) + Send; +//type CB<'a> = Box Box>>>; + +//type CB<'a> = dyn Fn(Event, &mut Transport) -> BoxFuture<'a, Result<()>>; + +type BoxFuture<'a, T> = Box + 'a>; +type CallbackDyn = dyn for<'a> Fn(Event, &'a mut Transport) -> BoxFuture<'a, Result<()>>; enum Handler { - Callback(Box), + Callback(Box), Channel(oneshot::Sender) } @@ -35,7 +40,7 @@ struct Transport { msg_id: usize, outbox: Sender, inbox: Receiver>, - callbacks: HashMap, + callbacks: HashMap, } @@ -55,28 +60,45 @@ impl Transport { #[instrument(level="debug",skip(self),ret)] fn close(&mut self) -> Result<()> { self.inbox.close(); - Ok(()) + let i = Instant::now(); + self.callbacks.retain(|_, (_, t)| *t >= i); + if self.callbacks.is_empty() { + Ok(()) + } else { + bail!("some callbacks failed to launch.") + } } #[instrument(level="debug",skip(self),ret)] - async fn send(&mut self, e: Event) -> Result<()> { + async fn send(&self, e: Event) -> Result<()> { self.outbox.send(e).await.context("unable to send message") } + #[instrument(level="debug",skip(self),ret)] async fn rpc(&mut self, e: Event) -> Result { - let id = e.id; - self.send(e).await?; - let (tx, rx) = oneshot::channel(); - self.callbacks.insert(id, Handler::Channel(tx)); + let rx = self.rpc_chan(e).await?; let resp = rx.await?; Ok(resp) } - async fn rpc_callback Result<()> + Send + Sized + 'static>(&mut self, e: Event, cb: C) -> Result<()> { - self.callbacks.insert(e.id, Handler::Callback(Box::new(cb))); + #[instrument(level="debug",skip(self),ret)] + async fn rpc_chan(&mut self, e: Event) -> Result> { + let id = e.id; + self.send(e).await?; + let (tx, rx) = oneshot::channel(); + self.callbacks.insert(id, (Handler::Channel(tx), Instant::now() + Duration::from_millis(1000))); + Ok(rx) + } + + async fn rpc_callback(&mut self, e: Event, cb: C) -> Result<()> + where + C: for<'a> Fn(Event, &'a mut Transport) -> BoxFuture<'a, Result<()>> + 'static + { + self.callbacks.insert(e.id, (Handler::Callback(Box::new(cb)), (Instant::now() + Duration::from_millis(1000)))); self.send(e).await } + /// #Cancellation safety: This method is **not** cancellation safe, because it deals with callbacks #[instrument(level="debug",skip(self),ret)] async fn recv(&mut self) -> Result> { loop { @@ -89,16 +111,25 @@ impl Transport { let Some(original) = nxt.in_reply_to else { return Ok(Some(nxt)); }; - match self.callbacks.remove(&nxt.id) { + let sp = match self.callbacks.remove(&original) { None => { return Ok(Some(nxt)); }, - Some(Handler::Callback(mut cb)) => { - cb(nxt, self)?; + Some((h, t)) => { + if t < Instant::now() { + continue; + } + h + } + }; + match sp { + Handler::Callback(cb) => { + let f = Box::into_pin(cb(nxt, self)); + f.await?; }, - Some(Handler::Channel(c)) => { + Handler::Channel(c) => { if let Err(e) = c.send(nxt) { - bail!("could not send event to callback") + bail!("could not send event to callback {e:?}") } } } @@ -106,6 +137,17 @@ impl Transport { } } +macro_rules! callback { + (|$e:ident, $svc:ident| $blk:block) => { + |$e, $svc| { + Box::new(async move { + $blk + }) + } + } + +} + impl Service { #[instrument(level="debug", skip(self),ret)] async fn process(&mut self, event: Event) -> Result<()>{ @@ -117,16 +159,9 @@ impl Service { #[instrument(level="debug", skip(self),ret)] async fn process_nonblocking(&mut self, event: Event) -> Result<()>{ let resp = self.t.rpc_callback(Event{id: event.id, in_reply_to: None}, - |e, svc| { - let handle = Handle::current(); - task::block_in_place(move || { - handle.block_on( - async move { - svc.send(Event{id: 199, in_reply_to: Some(e.id)}).await - }) - }) - } - ).await?; + callback!(|e, svc| { + svc.send(Event{id: 199, in_reply_to: Some(e.id)}).await + })).await?; debug!("{:?}", &resp); Ok(()) } @@ -136,7 +171,7 @@ impl Service { while let Some(e) = self.t.recv().await? { self.process_nonblocking(e).await?; } - Ok::<(), anyhow::Error>(()) + self.t.close() } } @@ -148,9 +183,11 @@ async fn main() -> Result<()> { .with(EnvFilter::from_default_env()) .init(); + info!("Starting example server"); + let (t, tx, mut rx) = Transport::new(); - let mut serv = Service{t}; + let serv = Service{t}; let tx1 = tx.clone(); let s = tokio::spawn(async move { @@ -162,28 +199,29 @@ async fn main() -> Result<()> { }); debug!("Serving"); let r = tokio::spawn(async move { - for i in 0..5 { + for i in 0..2 { let Some(msg) = rx.recv().await else { break; }; if msg.in_reply_to.is_some() { continue; } - sleep(Duration::from_millis(1000)).await; - let reply = Event{id: 99, in_reply_to: Some(msg.id)}; - debug!("Sending reply: {:?}", &reply); - tx.send(Some(reply)).await.context("could not send reply")?; + sleep(Duration::from_millis(200)).await; + if i < 0 { + let reply = Event{id: 99, in_reply_to: Some(msg.id)}; + debug!("Sending reply: {:?}", &reply); + tx.send(Some(reply)).await.context("could not send reply")?; + } else { + println!("DONE WITH REPLIES"); + } } - println!("DONE WITH REPLIES"); - sleep(Duration::from_millis(5000)).await; - println!("REALLY DONE WITH REPLIES"); + sleep(Duration::from_millis(100)).await; tx.send(None).await.context("could not send closing message")?; Ok(()) }); serv.serve().await?; - debug!("Served"); - //serv.close(); + info!("Served"); s.await.context("error joining")??; r.await.context("error joining send")? }