mirror of
https://github.com/balkian/experiments.git
synced 2025-02-12 13:08:17 +00:00
Compare commits
3 Commits
8e7ed006e7
...
b2bc62e83d
Author | SHA1 | Date | |
---|---|---|---|
|
b2bc62e83d | ||
|
6b68928eab | ||
|
e227e387f4 |
rust
@ -1,225 +1,285 @@
|
||||
use tokio::{
|
||||
task,
|
||||
time::{Instant, Duration, sleep},
|
||||
sync::{oneshot,
|
||||
mpsc::{Receiver, Sender, channel}
|
||||
}
|
||||
};
|
||||
use std::collections::{HashMap};
|
||||
use anyhow::Result;
|
||||
use anyhow::{bail, Context};
|
||||
use tracing::{info, debug, instrument};
|
||||
use tracing_subscriber::{fmt, EnvFilter};
|
||||
use tracing_subscriber::prelude::*;
|
||||
use std::collections::HashMap;
|
||||
use std::future::Future;
|
||||
use tokio::{
|
||||
sync::{
|
||||
mpsc::{channel, Receiver, Sender},
|
||||
oneshot,
|
||||
},
|
||||
task,
|
||||
time::{sleep, timeout_at, Duration, Instant},
|
||||
};
|
||||
use tracing::{debug, error, warn, info, instrument};
|
||||
use tracing_subscriber::prelude::*;
|
||||
use tracing_subscriber::{filter::LevelFilter, fmt, EnvFilter};
|
||||
|
||||
|
||||
struct Service {
|
||||
t: Transport,
|
||||
}
|
||||
|
||||
/// Simplified version of a maelstrom event
|
||||
#[derive(Debug)]
|
||||
struct Event {
|
||||
id: usize,
|
||||
in_reply_to: Option<usize>,
|
||||
}
|
||||
|
||||
//type CB<'a> = Box<dyn Fn(Event, &'a mut Transport) -> Box<dyn Future<Output = Result<()>>>>;
|
||||
|
||||
//type CB<'a> = dyn Fn(Event, &mut Transport) -> BoxFuture<'a, Result<()>>;
|
||||
|
||||
type BoxFuture<'a, T> = Box<dyn Future<Output = T> + 'a>;
|
||||
type CallbackDyn = dyn for<'a> Fn(Event, &'a mut Transport) -> BoxFuture<'a, Result<()>>;
|
||||
|
||||
enum Handler {
|
||||
Callback(Box<CallbackDyn>),
|
||||
Channel(oneshot::Sender<Event>)
|
||||
}
|
||||
|
||||
/// Very simplified transport that receives from a channel and sends to another channel. In practice,
|
||||
/// this would be replaced with a transport that reads/writes events from/to files or a connection.
|
||||
struct Transport {
|
||||
erx: Receiver<anyhow::Error>,
|
||||
etx: Sender<anyhow::Error>,
|
||||
msg_id: usize,
|
||||
outbox: Sender<Event>,
|
||||
inbox: Receiver<Option<Event>>,
|
||||
callbacks: HashMap<usize, (Handler, Instant)>,
|
||||
callbacks: HashMap<usize, (oneshot::Sender<Option<Event>>, Instant)>,
|
||||
}
|
||||
|
||||
|
||||
impl Transport {
|
||||
fn new() -> (Self, Sender<Option<Event>>, Receiver<Event>) {
|
||||
let (itx, irx) = channel::<Option<Event>>(1);
|
||||
let (otx, orx) = channel::<Event>(1);
|
||||
(Self {
|
||||
msg_id: 0,
|
||||
outbox: otx,
|
||||
inbox: irx,
|
||||
callbacks: Default::default(),
|
||||
}, itx, orx)
|
||||
|
||||
let (etx, erx) = channel(1);
|
||||
(
|
||||
Self {
|
||||
erx,
|
||||
etx,
|
||||
msg_id: 0,
|
||||
outbox: otx,
|
||||
inbox: irx,
|
||||
callbacks: Default::default(),
|
||||
},
|
||||
itx,
|
||||
orx,
|
||||
)
|
||||
}
|
||||
|
||||
#[instrument(level="debug",skip(self),ret)]
|
||||
fn close(&mut self) -> Result<()> {
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn close(mut self) -> Result<()> {
|
||||
self.inbox.close();
|
||||
let i = Instant::now();
|
||||
self.callbacks.retain(|_, (_, t)| *t >= i);
|
||||
if self.callbacks.is_empty() {
|
||||
Ok(())
|
||||
} else {
|
||||
bail!("some callbacks failed to launch.")
|
||||
drop(self.etx);
|
||||
debug!("Waiting for errors to propagate");
|
||||
for (c, _t) in self.callbacks.into_values() {
|
||||
let _ = c.send(None);
|
||||
}
|
||||
if let Some(e) = self.erx.recv().await {
|
||||
return Err(e);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(level="debug",skip(self),ret)]
|
||||
async fn send(&self, e: Event) -> Result<()> {
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn send(&mut self, mut e: Event) -> Result<()> {
|
||||
e.id = self.msg_id;
|
||||
self.msg_id += 1;
|
||||
self.outbox.send(e).await.context("unable to send message")
|
||||
}
|
||||
|
||||
#[instrument(level="debug",skip(self),ret)]
|
||||
async fn rpc(&mut self, e: Event) -> Result<Event> {
|
||||
let rx = self.rpc_chan(e).await?;
|
||||
let resp = rx.await?;
|
||||
Ok(resp)
|
||||
}
|
||||
|
||||
#[instrument(level="debug",skip(self),ret)]
|
||||
async fn rpc_chan(&mut self, e: Event) -> Result<oneshot::Receiver<Event>> {
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn rpc_chan(
|
||||
&mut self,
|
||||
e: Event,
|
||||
expiration: Instant,
|
||||
) -> Result<oneshot::Receiver<Option<Event>>> {
|
||||
let id = e.id;
|
||||
self.send(e).await?;
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.callbacks.insert(id, (Handler::Channel(tx), Instant::now() + Duration::from_millis(1000)));
|
||||
self.callbacks.insert(id, (tx, expiration));
|
||||
Ok(rx)
|
||||
}
|
||||
|
||||
async fn rpc_callback<C>(&mut self, e: Event, cb: C) -> Result<()>
|
||||
where
|
||||
C: for<'a> Fn(Event, &'a mut Transport) -> BoxFuture<'a, Result<()>> + 'static
|
||||
async fn rpc_callback<C, F>(&mut self, e: Event, cb: C, expiration: Instant) -> Result<()>
|
||||
where
|
||||
C: 'static + Send + FnOnce(oneshot::Receiver<Option<Event>>, Sender<Event>) -> F,
|
||||
F: Future<Output = Result<()>> + Send + 'static,
|
||||
{
|
||||
self.callbacks.insert(e.id, (Handler::Callback(Box::new(cb)), (Instant::now() + Duration::from_millis(1000))));
|
||||
self.send(e).await
|
||||
let rec = self.rpc_chan(e, expiration).await?;
|
||||
let sender = self.outbox.clone();
|
||||
let tx = self.etx.clone();
|
||||
task::spawn(async move {
|
||||
if let Ok(Err(e)) = timeout_at(expiration, cb(rec, sender)).await {
|
||||
error!("propagating error '{e:?}'");
|
||||
tx.send(e)
|
||||
.await
|
||||
.expect("could not propagate error to transport.");
|
||||
}
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// #Cancellation safety: This method is **not** cancellation safe, because it deals with callbacks
|
||||
#[instrument(level="debug",skip(self),ret)]
|
||||
/// #Cancellation safety: This method is cancellation safe
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn recv(&mut self) -> Result<Option<Event>> {
|
||||
loop {
|
||||
let Some(nxt) = self.inbox.recv().await else {
|
||||
return Ok(None);
|
||||
};
|
||||
let Some(nxt) = nxt else {
|
||||
return Ok(None);
|
||||
};
|
||||
let Some(original) = nxt.in_reply_to else {
|
||||
return Ok(Some(nxt));
|
||||
};
|
||||
let sp = match self.callbacks.remove(&original) {
|
||||
None => {
|
||||
return Ok(Some(nxt));
|
||||
},
|
||||
Some((h, t)) => {
|
||||
if t < Instant::now() {
|
||||
continue;
|
||||
tokio::select! {
|
||||
rec = self.inbox.recv() => {
|
||||
let Some(Some(nxt)) = rec else {
|
||||
break;
|
||||
};
|
||||
if let Some(e) = self.process_event(nxt).await? {
|
||||
return Ok(Some(e));
|
||||
}
|
||||
h
|
||||
}
|
||||
};
|
||||
match sp {
|
||||
Handler::Callback(cb) => {
|
||||
let f = Box::into_pin(cb(nxt, self));
|
||||
f.await?;
|
||||
},
|
||||
Handler::Channel(c) => {
|
||||
if let Err(e) = c.send(nxt) {
|
||||
bail!("could not send event to callback {e:?}")
|
||||
e = self.erx.recv() => {
|
||||
error!("OHNOES, we have received an error");
|
||||
if let Some(e) = e {
|
||||
return Err(e);
|
||||
}
|
||||
info!("no more errors");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! callback {
|
||||
(|$e:ident, $svc:ident| $blk:block) => {
|
||||
|$e, $svc| {
|
||||
Box::new(async move {
|
||||
$blk
|
||||
})
|
||||
async fn process_event(&mut self, nxt: Event) -> Result<Option<Event>> {
|
||||
let Some(original) = nxt.in_reply_to else {
|
||||
return Ok(Some(nxt));
|
||||
};
|
||||
if let Some((c, t)) = self.callbacks.remove(&original) {
|
||||
if t >= Instant::now() {
|
||||
if let Err(e) = c.send(Some(nxt)) {
|
||||
bail!("could not send event to callback {e:?}")
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
///// Simple macro to generate a dyn-safe closure from an async block.
|
||||
//macro_rules! callback {
|
||||
// (|$e:ident, $svc:ident| $blk:block) => {
|
||||
// move |$e, $svc| {
|
||||
// async move {
|
||||
// $blk
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
//}
|
||||
|
||||
struct Service {
|
||||
t: Transport,
|
||||
}
|
||||
|
||||
/// All-in-one implementation of a service. In practice, this would be a trait where the `serve`
|
||||
/// part is kind of generic, and the process varies between implementers.
|
||||
impl Service {
|
||||
#[instrument(level="debug", skip(self),ret)]
|
||||
async fn process(&mut self, event: Event) -> Result<()>{
|
||||
let resp = self.t.rpc(Event{id: event.id, in_reply_to: None}).await?;
|
||||
fn new(t: Transport) -> Self {
|
||||
Self { t }
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn process(&mut self, event: Event) -> Result<()> {
|
||||
let eid = event.id;
|
||||
info!("Service received event");
|
||||
let resp = self
|
||||
.t
|
||||
.rpc_callback(
|
||||
Event {
|
||||
id: event.id,
|
||||
in_reply_to: None,
|
||||
},
|
||||
move |chan: oneshot::Receiver<Option<Event>>, out: Sender<Event>| {
|
||||
async move {
|
||||
let Some(e) = chan.await? else {
|
||||
warn!("Reply not received for event {eid}");
|
||||
return Ok(());
|
||||
};
|
||||
info!("Received reply {e:?}");
|
||||
out.send(Event {
|
||||
id: 199,
|
||||
in_reply_to: Some(e.id),
|
||||
})
|
||||
.await
|
||||
.context("could not send reply.")?;
|
||||
//bail!("forcing an error here!");
|
||||
Ok(())
|
||||
}
|
||||
},
|
||||
Instant::now() + Duration::from_millis(100000),
|
||||
)
|
||||
.await?;
|
||||
debug!("{:?}", &resp);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(level="debug", skip(self),ret)]
|
||||
async fn process_nonblocking(&mut self, event: Event) -> Result<()>{
|
||||
let resp = self.t.rpc_callback(Event{id: event.id, in_reply_to: None},
|
||||
callback!(|e, svc| {
|
||||
svc.send(Event{id: 199, in_reply_to: Some(e.id)}).await
|
||||
})).await?;
|
||||
debug!("{:?}", &resp);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(level="debug", skip(self),ret)]
|
||||
async fn serve(mut self) -> Result<()>{
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn serve(mut self) -> Result<()> {
|
||||
while let Some(e) = self.t.recv().await? {
|
||||
self.process_nonblocking(e).await?;
|
||||
self.process(e).await?;
|
||||
}
|
||||
self.t.close()
|
||||
self.t.close().await
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let filter = EnvFilter::builder()
|
||||
.with_default_directive(LevelFilter::INFO.into())
|
||||
.from_env_lossy();
|
||||
tracing_subscriber::registry()
|
||||
.with(fmt::layer())
|
||||
.with(EnvFilter::from_default_env())
|
||||
.init();
|
||||
.with(fmt::layer())
|
||||
.with(filter)
|
||||
.init();
|
||||
|
||||
info!("Starting example server");
|
||||
|
||||
let (t, tx, mut rx) = Transport::new();
|
||||
|
||||
let serv = Service{t};
|
||||
let serv = Service::new(t);
|
||||
|
||||
let tx1 = tx.clone();
|
||||
let s = tokio::spawn(async move {
|
||||
for i in 0..2 {
|
||||
debug!("Sending event {i}");
|
||||
tx1.send(Some(Event{id: i, in_reply_to: None})).await.context("could not send original message")?;
|
||||
for i in 0..5 {
|
||||
debug!("Main sending event {i}");
|
||||
tx1.send(Some(Event {
|
||||
id: i,
|
||||
in_reply_to: None,
|
||||
}))
|
||||
.await
|
||||
.context("could not send original message")?;
|
||||
debug!("Main sent event {i}");
|
||||
}
|
||||
debug!("Main done sending events");
|
||||
Ok::<(), anyhow::Error>(())
|
||||
});
|
||||
debug!("Serving");
|
||||
let r = tokio::spawn(async move {
|
||||
for i in 0..2 {
|
||||
info!("Spawning receptor");
|
||||
for _i in 0..3 {
|
||||
let Some(msg) = rx.recv().await else {
|
||||
break;
|
||||
};
|
||||
if msg.in_reply_to.is_some() {
|
||||
info!("Main received response: {msg:?}");
|
||||
continue;
|
||||
}
|
||||
sleep(Duration::from_millis(200)).await;
|
||||
if i < 0 {
|
||||
let reply = Event{id: 99, in_reply_to: Some(msg.id)};
|
||||
debug!("Sending reply: {:?}", &reply);
|
||||
tx.send(Some(reply)).await.context("could not send reply")?;
|
||||
} else {
|
||||
println!("DONE WITH REPLIES");
|
||||
}
|
||||
info!("Main received request: {msg:?}");
|
||||
sleep(Duration::from_millis(10)).await;
|
||||
let reply = Event {
|
||||
id: 99,
|
||||
in_reply_to: Some(msg.id),
|
||||
};
|
||||
debug!("Sending reply: {:?}", &reply);
|
||||
let tx = tx.clone();
|
||||
tokio::spawn(async move {
|
||||
tx.send(Some(reply)).await.expect("could not send reply");
|
||||
});
|
||||
}
|
||||
sleep(Duration::from_millis(100)).await;
|
||||
tx.send(None).await.context("could not send closing message")?;
|
||||
info!("Not sending more replies. Logging outgoing messages.");
|
||||
drop(tx);
|
||||
sleep(Duration::from_millis(1000)).await;
|
||||
while let Some(msg) = rx.recv().await {
|
||||
info!("Main received: {msg:?}");
|
||||
};
|
||||
Ok(())
|
||||
});
|
||||
serv.serve().await?;
|
||||
if let Err(e) = serv.serve().await {
|
||||
error!("Have received the error: {e:?}");
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
info!("Served");
|
||||
s.await.context("error joining")??;
|
||||
|
598
rust/tokioserde/Cargo.lock
generated
Normal file
598
rust/tokioserde/Cargo.lock
generated
Normal file
@ -0,0 +1,598 @@
|
||||
# This file is automatically @generated by Cargo.
|
||||
# It is not intended for manual editing.
|
||||
version = 4
|
||||
|
||||
[[package]]
|
||||
name = "addr2line"
|
||||
version = "0.24.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1"
|
||||
dependencies = [
|
||||
"gimli",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "adler2"
|
||||
version = "2.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
|
||||
|
||||
[[package]]
|
||||
name = "aho-corasick"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "anyhow"
|
||||
version = "1.0.95"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04"
|
||||
|
||||
[[package]]
|
||||
name = "autocfg"
|
||||
version = "1.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
|
||||
|
||||
[[package]]
|
||||
name = "backtrace"
|
||||
version = "0.3.74"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a"
|
||||
dependencies = [
|
||||
"addr2line",
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"miniz_oxide",
|
||||
"object",
|
||||
"rustc-demangle",
|
||||
"windows-targets",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bitflags"
|
||||
version = "2.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8f68f53c83ab957f72c32642f3868eec03eb974d1fb82e453128456482613d36"
|
||||
|
||||
[[package]]
|
||||
name = "bytes"
|
||||
version = "1.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b"
|
||||
|
||||
[[package]]
|
||||
name = "cfg-if"
|
||||
version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
||||
|
||||
[[package]]
|
||||
name = "gimli"
|
||||
version = "0.31.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
|
||||
|
||||
[[package]]
|
||||
name = "itoa"
|
||||
version = "1.0.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674"
|
||||
|
||||
[[package]]
|
||||
name = "lazy_static"
|
||||
version = "1.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.169"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a"
|
||||
|
||||
[[package]]
|
||||
name = "lock_api"
|
||||
version = "0.4.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"scopeguard",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "log"
|
||||
version = "0.4.25"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f"
|
||||
|
||||
[[package]]
|
||||
name = "matchers"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
|
||||
dependencies = [
|
||||
"regex-automata 0.1.10",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "memchr"
|
||||
version = "2.7.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
|
||||
|
||||
[[package]]
|
||||
name = "miniz_oxide"
|
||||
version = "0.8.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b8402cab7aefae129c6977bb0ff1b8fd9a04eb5b51efc50a70bea51cda0c7924"
|
||||
dependencies = [
|
||||
"adler2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mio"
|
||||
version = "1.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"wasi",
|
||||
"windows-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nu-ansi-term"
|
||||
version = "0.46.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
|
||||
dependencies = [
|
||||
"overload",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "object"
|
||||
version = "0.36.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "once_cell"
|
||||
version = "1.20.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775"
|
||||
|
||||
[[package]]
|
||||
name = "overload"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot"
|
||||
version = "0.12.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
|
||||
dependencies = [
|
||||
"lock_api",
|
||||
"parking_lot_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot_core"
|
||||
version = "0.9.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"redox_syscall",
|
||||
"smallvec",
|
||||
"windows-targets",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pin-project-lite"
|
||||
version = "0.2.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro2"
|
||||
version = "1.0.93"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "60946a68e5f9d28b0dc1c21bb8a97ee7d018a8b322fa57838ba31cc878e22d99"
|
||||
dependencies = [
|
||||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quote"
|
||||
version = "1.0.38"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0e4dccaaaf89514f546c693ddc140f729f958c247918a13380cccc6078391acc"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redox_syscall"
|
||||
version = "0.5.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "regex"
|
||||
version = "1.11.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
|
||||
dependencies = [
|
||||
"aho-corasick",
|
||||
"memchr",
|
||||
"regex-automata 0.4.9",
|
||||
"regex-syntax 0.8.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "regex-automata"
|
||||
version = "0.1.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
|
||||
dependencies = [
|
||||
"regex-syntax 0.6.29",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "regex-automata"
|
||||
version = "0.4.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908"
|
||||
dependencies = [
|
||||
"aho-corasick",
|
||||
"memchr",
|
||||
"regex-syntax 0.8.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "regex-syntax"
|
||||
version = "0.6.29"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
|
||||
|
||||
[[package]]
|
||||
name = "regex-syntax"
|
||||
version = "0.8.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
|
||||
|
||||
[[package]]
|
||||
name = "rustc-demangle"
|
||||
version = "0.1.24"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
|
||||
|
||||
[[package]]
|
||||
name = "ryu"
|
||||
version = "1.0.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f"
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.217"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70"
|
||||
dependencies = [
|
||||
"serde_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_derive"
|
||||
version = "1.0.217"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_json"
|
||||
version = "1.0.137"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "930cfb6e6abf99298aaad7d29abbef7a9999a9a8806a40088f55f0dcec03146b"
|
||||
dependencies = [
|
||||
"itoa",
|
||||
"memchr",
|
||||
"ryu",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sharded-slab"
|
||||
version = "0.1.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "signal-hook-registry"
|
||||
version = "1.4.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "smallvec"
|
||||
version = "1.13.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
|
||||
|
||||
[[package]]
|
||||
name = "socket2"
|
||||
version = "0.5.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"windows-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "syn"
|
||||
version = "2.0.96"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d5d0adab1ae378d7f53bdebc67a39f1f151407ef230f0ce2883572f5d8985c80"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thread_local"
|
||||
version = "1.1.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"once_cell",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "1.43.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e"
|
||||
dependencies = [
|
||||
"backtrace",
|
||||
"bytes",
|
||||
"libc",
|
||||
"mio",
|
||||
"parking_lot",
|
||||
"pin-project-lite",
|
||||
"signal-hook-registry",
|
||||
"socket2",
|
||||
"tokio-macros",
|
||||
"windows-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-macros"
|
||||
version = "2.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokioserde"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing"
|
||||
version = "0.1.41"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
|
||||
dependencies = [
|
||||
"pin-project-lite",
|
||||
"tracing-attributes",
|
||||
"tracing-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-attributes"
|
||||
version = "0.1.28"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-core"
|
||||
version = "0.1.33"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c"
|
||||
dependencies = [
|
||||
"once_cell",
|
||||
"valuable",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-log"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
|
||||
dependencies = [
|
||||
"log",
|
||||
"once_cell",
|
||||
"tracing-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-subscriber"
|
||||
version = "0.3.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
|
||||
dependencies = [
|
||||
"matchers",
|
||||
"nu-ansi-term",
|
||||
"once_cell",
|
||||
"regex",
|
||||
"sharded-slab",
|
||||
"smallvec",
|
||||
"thread_local",
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
"tracing-log",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "unicode-ident"
|
||||
version = "1.0.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83"
|
||||
|
||||
[[package]]
|
||||
name = "valuable"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
|
||||
|
||||
[[package]]
|
||||
name = "wasi"
|
||||
version = "0.11.0+wasi-snapshot-preview1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
|
||||
|
||||
[[package]]
|
||||
name = "winapi"
|
||||
version = "0.3.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
|
||||
dependencies = [
|
||||
"winapi-i686-pc-windows-gnu",
|
||||
"winapi-x86_64-pc-windows-gnu",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "winapi-i686-pc-windows-gnu"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
|
||||
|
||||
[[package]]
|
||||
name = "winapi-x86_64-pc-windows-gnu"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
|
||||
|
||||
[[package]]
|
||||
name = "windows-sys"
|
||||
version = "0.52.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
|
||||
dependencies = [
|
||||
"windows-targets",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-targets"
|
||||
version = "0.52.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973"
|
||||
dependencies = [
|
||||
"windows_aarch64_gnullvm",
|
||||
"windows_aarch64_msvc",
|
||||
"windows_i686_gnu",
|
||||
"windows_i686_gnullvm",
|
||||
"windows_i686_msvc",
|
||||
"windows_x86_64_gnu",
|
||||
"windows_x86_64_gnullvm",
|
||||
"windows_x86_64_msvc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows_aarch64_gnullvm"
|
||||
version = "0.52.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
|
||||
|
||||
[[package]]
|
||||
name = "windows_aarch64_msvc"
|
||||
version = "0.52.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_gnu"
|
||||
version = "0.52.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_gnullvm"
|
||||
version = "0.52.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_msvc"
|
||||
version = "0.52.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_gnu"
|
||||
version = "0.52.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_gnullvm"
|
||||
version = "0.52.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_msvc"
|
||||
version = "0.52.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
|
12
rust/tokioserde/Cargo.toml
Normal file
12
rust/tokioserde/Cargo.toml
Normal file
@ -0,0 +1,12 @@
|
||||
[package]
|
||||
name = "tokioserde"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.95"
|
||||
serde = { version = "1.0.217", features = ["derive"] }
|
||||
serde_json = "1.0.137"
|
||||
tokio = { version = "1.43.0", features = ["full"] }
|
||||
tracing = { version = "0.1.41", features = ["attributes"] }
|
||||
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
|
204
rust/tokioserde/src/main.rs
Normal file
204
rust/tokioserde/src/main.rs
Normal file
@ -0,0 +1,204 @@
|
||||
/// A prototype design for the implementation of Maelstrom services.
|
||||
/// It consists of the following main elements:
|
||||
/// * Msg<P> - For messages with generic payload P
|
||||
/// * Transport Trait - A transport that can send and receive Msg<P>. The basic implementation is
|
||||
/// NewlineTransport.
|
||||
/// * MsgSender Trait - capable of sending Msg<P>. Transport should generate new senders through
|
||||
/// `clone_sender`.
|
||||
///
|
||||
use tokio::{
|
||||
time::{sleep, Duration},
|
||||
task::JoinHandle,
|
||||
sync::{oneshot, mpsc::{channel, Sender}}
|
||||
};
|
||||
use serde::{Serialize,Deserialize, de::DeserializeOwned};
|
||||
use serde_json::Value;
|
||||
use std::fmt::Debug;
|
||||
use tokio::io::{self, AsyncRead, BufReader, AsyncWrite, AsyncWriteExt, AsyncBufReadExt, Lines, BufWriter};
|
||||
use anyhow::{bail, Context, Result};
|
||||
use std::future::Future;
|
||||
use std::pin::{pin};
|
||||
use tracing::{instrument, info, debug, warn};
|
||||
use tracing_subscriber::{filter::EnvFilter};
|
||||
|
||||
|
||||
#[derive(Debug,Deserialize,Serialize)]
|
||||
struct Message {
|
||||
body: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
struct Msg<P> {
|
||||
src: u64,
|
||||
dest: u64,
|
||||
body: Body<P>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
struct Body<P>{
|
||||
msg_id: Option<u64>,
|
||||
in_reply_to: Option<u64>,
|
||||
#[serde(flatten)]
|
||||
payload: P
|
||||
}
|
||||
|
||||
impl<P> Msg<P>
|
||||
where P: Serialize
|
||||
{
|
||||
fn to_generic(&self) -> Result<Msg<Value>> {
|
||||
Ok(Msg {
|
||||
src: self.src.clone(),
|
||||
dest: self.dest.clone(),
|
||||
body: Body {
|
||||
msg_id: self.body.msg_id.clone(),
|
||||
in_reply_to: self.body.in_reply_to.clone(),
|
||||
payload: serde_json::to_value(&self.body.payload)?
|
||||
}
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
struct NewlineTransport<R> {
|
||||
input: Lines<BufReader<R>>,
|
||||
jh: JoinHandle<Result<()>>,
|
||||
output: AckSender,
|
||||
}
|
||||
|
||||
impl NewlineTransport<io::Stdin>
|
||||
{
|
||||
fn new() -> Self {
|
||||
Self::from_rw(io::stdin(), io::stdout())
|
||||
}
|
||||
}
|
||||
|
||||
type AckSender = Sender<(Msg<Value>, oneshot::Sender<Result<usize>>)>;
|
||||
|
||||
impl<R> NewlineTransport<R>
|
||||
where
|
||||
R: AsyncRead + Unpin,
|
||||
{
|
||||
fn from_rw<W: AsyncWrite + Send + 'static>(r: R, w: W) -> Self {
|
||||
let buf = io::BufReader::new(r);
|
||||
let (output, mut rx) = channel::<(Msg<Value>, oneshot::Sender<Result<usize>>)>(1);
|
||||
let jh = tokio::spawn(async move {
|
||||
let mut out = pin!(BufWriter::new(w));
|
||||
let mut msg_id = 0; // This msg_id could be added to the Transport with an Arc or
|
||||
// similar.
|
||||
while let Some((mut gen, back)) = rx.recv().await {
|
||||
gen.body.msg_id = Some(msg_id);
|
||||
msg_id += 1;
|
||||
let s = serde_json::to_string(&gen).context("failed to serialize message")?;
|
||||
let res = out.write(s.as_bytes()).await.context("failed to write");
|
||||
out.flush().await?;
|
||||
if let Err(res) = back.send(res) {
|
||||
bail!("failed to send result of sending: {res:?}");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
});
|
||||
Self {
|
||||
input: buf.lines(),
|
||||
jh,
|
||||
output,
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
trait Transport {
|
||||
fn clone_sender(&self) -> impl MsgSender + 'static;
|
||||
|
||||
fn close(self) -> impl Future<Output=Result<()>>;
|
||||
|
||||
fn recv<P: Debug + DeserializeOwned>(&mut self) -> impl Future<Output=Result<Option<Msg<P>>>>;
|
||||
}
|
||||
|
||||
impl<R> Transport for NewlineTransport<R>
|
||||
where
|
||||
R: AsyncRead + Unpin,
|
||||
{
|
||||
fn clone_sender(&self) -> impl MsgSender + 'static {
|
||||
self.output.clone()
|
||||
}
|
||||
|
||||
async fn close(self) -> Result<()>{
|
||||
drop(self.output);
|
||||
self.jh.await?
|
||||
}
|
||||
|
||||
#[instrument(skip(self),ret)]
|
||||
async fn recv<P: Debug + DeserializeOwned>(&mut self) -> Result<Option<Msg<P>>> {
|
||||
let Some(line) = self.input.next_line().await? else {
|
||||
return Ok(None);
|
||||
};
|
||||
Ok(Some(serde_json::from_str::<Msg<P>>(&line)?))
|
||||
}
|
||||
}
|
||||
|
||||
trait MsgSender: Send {
|
||||
fn send_msg<P: Serialize + Debug + Sync>(&mut self, m: &Msg<P>) -> impl Future<Output=Result<usize>> + Send;
|
||||
}
|
||||
|
||||
|
||||
impl MsgSender for AckSender {
|
||||
#[instrument(skip(self),ret)]
|
||||
async fn send_msg<P: Serialize + Debug + Sync>(&mut self, m: &Msg<P>) -> Result<usize> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let generic = m.to_generic()?;
|
||||
Sender::send(self, (generic, tx)).await.context("could not send message to sender")?;
|
||||
rx.await.context("could not send ack")?
|
||||
}
|
||||
}
|
||||
|
||||
trait Service {
|
||||
type P: Debug + DeserializeOwned + Serialize;
|
||||
fn process(&mut self, m: Msg<Self::P>, s: impl MsgSender + 'static) -> impl Future<Output=Result<()>>;
|
||||
fn serve<T: Transport>(mut self, mut t: T) -> impl Future<Output=Result<()>>
|
||||
where Self: Sized{
|
||||
async move {
|
||||
while let Some(m) = t.recv::<Self::P>().await? {
|
||||
let mut s = t.clone_sender();
|
||||
self.process(m, s).await?;
|
||||
}
|
||||
info!("Closing transport");
|
||||
t.close().await.expect("should not panic");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
struct EchoService {}
|
||||
|
||||
#[derive(Debug,Deserialize,Serialize)]
|
||||
#[serde(tag = "type")]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
enum EchoPayload {
|
||||
Echo {
|
||||
echo: String
|
||||
},
|
||||
}
|
||||
|
||||
impl Service for EchoService {
|
||||
type P = EchoPayload;
|
||||
async fn process(&mut self, m: Msg<Self::P>, mut s: impl MsgSender + 'static) -> Result<()> {
|
||||
tracing::info!("Received: {m:?}");
|
||||
tokio::spawn(async move {
|
||||
sleep(Duration::from_millis(5000)).await;
|
||||
s.send_msg(&Msg{src: 1, dest: 1, body: { Body { msg_id: Some(1), in_reply_to: None, payload: EchoPayload::Echo{ echo: String::from("Hello") }}}}).await.expect("sending failed");
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()>{
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(EnvFilter::from_default_env())
|
||||
.with_writer(std::io::stderr)
|
||||
.init();
|
||||
let mut t = NewlineTransport::new();
|
||||
let s = EchoService{};
|
||||
s.serve(t).await
|
||||
}
|
Loading…
Reference in New Issue
Block a user