@ -1,225 +1,285 @@
use tokio::{
time::{Instant, Duration, sleep},
mpsc::{Receiver, Sender, channel}
use std::collections::{HashMap};
use anyhow::Result;
use anyhow::{bail, Context};
use tracing::{info, debug, instrument};
use tracing_subscriber::{fmt, EnvFilter};
use tracing_subscriber::prelude::*;
use std::collections::HashMap;
use std::future::Future;
use tokio::{
mpsc::{channel, Receiver, Sender},
time::{sleep, timeout_at, Duration, Instant},
use tracing::{debug, error, warn, info, instrument};
use tracing_subscriber::prelude::*;
use tracing_subscriber::{filter::LevelFilter, fmt, EnvFilter};
struct Service {
t: Transport,
/// Simplified version of a maelstrom event
struct Event {
id: usize,
in_reply_to: Option<usize>,
//type CB<'a> = Box<dyn Fn(Event, &'a mut Transport) -> Box<dyn Future<Output = Result<()>>>>;
//type CB<'a> = dyn Fn(Event, &mut Transport) -> BoxFuture<'a, Result<()>>;
type BoxFuture<'a, T> = Box<dyn Future<Output = T> + 'a>;
type CallbackDyn = dyn for<'a> Fn(Event, &'a mut Transport) -> BoxFuture<'a, Result<()>>;
enum Handler {
/// Very simplified transport that receives from a channel and sends to another channel. In practice,
/// this would be replaced with a transport that reads/writes events from/to files or a connection.
struct Transport {
erx: Receiver<anyhow::Error>,
etx: Sender<anyhow::Error>,
msg_id: usize,
outbox: Sender<Event>,
inbox: Receiver<Option<Event>>,
callbacks: HashMap<usize, (Handler, Instant)>,
callbacks: HashMap<usize, (oneshot::Sender<Option<Event>>, Instant)>,
impl Transport {
fn new() -> (Self, Sender<Option<Event>>, Receiver<Event>) {
let (itx, irx) = channel::<Option<Event>>(1);
let (otx, orx) = channel::<Event>(1);
(Self {
msg_id: 0,
outbox: otx,
inbox: irx,
callbacks: Default::default(),
}, itx, orx)
let (etx, erx) = channel(1);
Self {
msg_id: 0,
outbox: otx,
inbox: irx,
callbacks: Default::default(),
fn close(&mut self) -> Result<()> {
#[instrument(level = "debug", skip(self))]
async fn close(mut self) -> Result<()> {
let i = Instant::now();
self.callbacks.retain(|_, (_, t)| *t >= i);
if self.callbacks.is_empty() {
} else {
bail!("some callbacks failed to launch.")
debug!("Waiting for errors to propagate");
for (c, _t) in self.callbacks.into_values() {
let _ = c.send(None);
if let Some(e) = self.erx.recv().await {
return Err(e);
async fn send(&self, e: Event) -> Result<()> {
#[instrument(level = "debug", skip(self))]
async fn send(&mut self, mut e: Event) -> Result<()> {
e.id = self.msg_id;
self.msg_id += 1;
self.outbox.send(e).await.context("unable to send message")
async fn rpc(&mut self, e: Event) -> Result<Event> {
let rx = self.rpc_chan(e).await?;
let resp = rx.await?;
async fn rpc_chan(&mut self, e: Event) -> Result<oneshot::Receiver<Event>> {
#[instrument(level = "debug", skip(self))]
async fn rpc_chan(
&mut self,
e: Event,
expiration: Instant,
) -> Result<oneshot::Receiver<Option<Event>>> {
let id = e.id;
let (tx, rx) = oneshot::channel();
self.callbacks.insert(id, (Handler::Channel(tx), Instant::now() + Duration::from_millis(1000)));
self.callbacks.insert(id, (tx, expiration));
async fn rpc_callback<C>(&mut self, e: Event, cb: C) -> Result<()>
C: for<'a> Fn(Event, &'a mut Transport) -> BoxFuture<'a, Result<()>> + 'static
async fn rpc_callback<C, F>(&mut self, e: Event, cb: C, expiration: Instant) -> Result<()>
C: 'static + Send + FnOnce(oneshot::Receiver<Option<Event>>, Sender<Event>) -> F,
F: Future<Output = Result<()>> + Send + 'static,
self.callbacks.insert(e.id, (Handler::Callback(Box::new(cb)), (Instant::now() + Duration::from_millis(1000))));
let rec = self.rpc_chan(e, expiration).await?;
let sender = self.outbox.clone();
let tx = self.etx.clone();
task::spawn(async move {
if let Ok(Err(e)) = timeout_at(expiration, cb(rec, sender)).await {
error!("propagating error '{e:?}'");
.expect("could not propagate error to transport.");
/// #Cancellation safety: This method is **not** cancellation safe, because it deals with callbacks
/// #Cancellation safety: This method is cancellation safe
#[instrument(level = "debug", skip(self))]
async fn recv(&mut self) -> Result<Option<Event>> {
loop {
let Some(nxt) = self.inbox.recv().await else {
return Ok(None);
let Some(nxt) = nxt else {
return Ok(None);
let Some(original) = nxt.in_reply_to else {
return Ok(Some(nxt));
let sp = match self.callbacks.remove(&original) {
None => {
return Ok(Some(nxt));
Some((h, t)) => {
if t < Instant::now() {
tokio::select! {
rec = self.inbox.recv() => {
let Some(Some(nxt)) = rec else {
if let Some(e) = self.process_event(nxt).await? {
return Ok(Some(e));
match sp {
Handler::Callback(cb) => {
let f = Box::into_pin(cb(nxt, self));
Handler::Channel(c) => {
if let Err(e) = c.send(nxt) {
bail!("could not send event to callback {e:?}")
e = self.erx.recv() => {
error!("OHNOES, we have received an error");
if let Some(e) = e {
return Err(e);
info!("no more errors");
macro_rules! callback {
(|$e:ident, $svc:ident| $blk:block) => {
|$e, $svc| {
Box::new(async move {
async fn process_event(&mut self, nxt: Event) -> Result<Option<Event>> {
let Some(original) = nxt.in_reply_to else {
return Ok(Some(nxt));
if let Some((c, t)) = self.callbacks.remove(&original) {
if t >= Instant::now() {
if let Err(e) = c.send(Some(nxt)) {
bail!("could not send event to callback {e:?}")
///// Simple macro to generate a dyn-safe closure from an async block.
//macro_rules! callback {
// (|$e:ident, $svc:ident| $blk:block) => {
// move |$e, $svc| {
// async move {
// $blk
// }
// }
// }
struct Service {
t: Transport,
/// All-in-one implementation of a service. In practice, this would be a trait where the `serve`
/// part is kind of generic, and the process varies between implementers.
impl Service {
#[instrument(level="debug", skip(self),ret)]
async fn process(&mut self, event: Event) -> Result<()>{
let resp = self.t.rpc(Event{id: event.id, in_reply_to: None}).await?;
fn new(t: Transport) -> Self {
Self { t }
#[instrument(level = "debug", skip(self))]
async fn process(&mut self, event: Event) -> Result<()> {
let eid = event.id;
info!("Service received event");
let resp = self
Event {
id: event.id,
in_reply_to: None,
move |chan: oneshot::Receiver<Option<Event>>, out: Sender<Event>| {
async move {
let Some(e) = chan.await? else {
warn!("Reply not received for event {eid}");
return Ok(());
info!("Received reply {e:?}");
out.send(Event {
id: 199,
in_reply_to: Some(e.id),
.context("could not send reply.")?;
//bail!("forcing an error here!");
Instant::now() + Duration::from_millis(100000),
debug!("{:?}", &resp);
#[instrument(level="debug", skip(self),ret)]
async fn process_nonblocking(&mut self, event: Event) -> Result<()>{
let resp = self.t.rpc_callback(Event{id: event.id, in_reply_to: None},
callback!(|e, svc| {
svc.send(Event{id: 199, in_reply_to: Some(e.id)}).await
debug!("{:?}", &resp);
#[instrument(level="debug", skip(self),ret)]
async fn serve(mut self) -> Result<()>{
#[instrument(level = "debug", skip(self))]
async fn serve(mut self) -> Result<()> {
while let Some(e) = self.t.recv().await? {
async fn main() -> Result<()> {
let filter = EnvFilter::builder()
info!("Starting example server");
let (t, tx, mut rx) = Transport::new();
let serv = Service{t};
let serv = Service::new(t);
let tx1 = tx.clone();
let s = tokio::spawn(async move {
for i in 0..2 {
debug!("Sending event {i}");
tx1.send(Some(Event{id: i, in_reply_to: None})).await.context("could not send original message")?;
for i in 0..5 {
debug!("Main sending event {i}");
tx1.send(Some(Event {
id: i,
in_reply_to: None,
.context("could not send original message")?;
debug!("Main sent event {i}");
debug!("Main done sending events");
Ok::<(), anyhow::Error>(())
let r = tokio::spawn(async move {
for i in 0..2 {
info!("Spawning receptor");
for _i in 0..3 {
let Some(msg) = rx.recv().await else {
if msg.in_reply_to.is_some() {
info!("Main received response: {msg:?}");
if i < 0 {
let reply = Event{id: 99, in_reply_to: Some(msg.id)};
debug!("Sending reply: {:?}", &reply);
tx.send(Some(reply)).await.context("could not send reply")?;
} else {
println!("DONE WITH REPLIES");
info!("Main received request: {msg:?}");
let reply = Event {
id: 99,
in_reply_to: Some(msg.id),
debug!("Sending reply: {:?}", &reply);
let tx = tx.clone();
tokio::spawn(async move {
tx.send(Some(reply)).await.expect("could not send reply");
tx.send(None).await.context("could not send closing message")?;
info!("Not sending more replies. Logging outgoing messages.");
while let Some(msg) = rx.recv().await {
info!("Main received: {msg:?}");
if let Err(e) = serv.serve().await {
error!("Have received the error: {e:?}");
return Ok(());
s.await.context("error joining")??;
@ -0,0 +1,598 @@
@ -0,0 +1,12 @@
name = "tokioserde"
version = "0.1.0"
edition = "2021"
anyhow = "1.0.95"
serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.137"
tokio = { version = "1.43.0", features = ["full"] }
tracing = { version = "0.1.41", features = ["attributes"] }
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
@ -0,0 +1,204 @@
/// A prototype design for the implementation of Maelstrom services.
/// It consists of the following main elements:
/// * Msg<P> - For messages with generic payload P
/// * Transport Trait - A transport that can send and receive Msg<P>. The basic implementation is
/// NewlineTransport.
/// * MsgSender Trait - capable of sending Msg<P>. Transport should generate new senders through
/// `clone_sender`.
use tokio::{
time::{sleep, Duration},
sync::{oneshot, mpsc::{channel, Sender}}
use serde::{Serialize,Deserialize, de::DeserializeOwned};
use serde_json::Value;
use std::fmt::Debug;
use tokio::io::{self, AsyncRead, BufReader, AsyncWrite, AsyncWriteExt, AsyncBufReadExt, Lines, BufWriter};
use anyhow::{bail, Context, Result};
use std::future::Future;
use std::pin::{pin};
use tracing::{instrument, info, debug, warn};
use tracing_subscriber::{filter::EnvFilter};
struct Message {
body: String,
#[derive(Debug, Deserialize, Serialize)]
struct Msg<P> {
src: u64,
dest: u64,
body: Body<P>,
#[derive(Debug, Deserialize, Serialize)]
struct Body<P>{
msg_id: Option<u64>,
in_reply_to: Option<u64>,
payload: P
impl<P> Msg<P>
where P: Serialize
fn to_generic(&self) -> Result<Msg<Value>> {
Ok(Msg {
src: self.src.clone(),
dest: self.dest.clone(),
body: Body {
msg_id: self.body.msg_id.clone(),
in_reply_to: self.body.in_reply_to.clone(),
payload: serde_json::to_value(&self.body.payload)?
struct NewlineTransport<R> {
input: Lines<BufReader<R>>,
jh: JoinHandle<Result<()>>,
output: AckSender,
impl NewlineTransport<io::Stdin>
fn new() -> Self {
Self::from_rw(io::stdin(), io::stdout())
type AckSender = Sender<(Msg<Value>, oneshot::Sender<Result<usize>>)>;
impl<R> NewlineTransport<R>
R: AsyncRead + Unpin,
fn from_rw<W: AsyncWrite + Send + 'static>(r: R, w: W) -> Self {
let buf = io::BufReader::new(r);
let (output, mut rx) = channel::<(Msg<Value>, oneshot::Sender<Result<usize>>)>(1);
let jh = tokio::spawn(async move {
let mut out = pin!(BufWriter::new(w));
let mut msg_id = 0; // This msg_id could be added to the Transport with an Arc or
// similar.
while let Some((mut gen, back)) = rx.recv().await {
gen.body.msg_id = Some(msg_id);
msg_id += 1;
let s = serde_json::to_string(&gen).context("failed to serialize message")?;
let res = out.write(s.as_bytes()).await.context("failed to write");
if let Err(res) = back.send(res) {
bail!("failed to send result of sending: {res:?}");
Self {
input: buf.lines(),
trait Transport {
fn clone_sender(&self) -> impl MsgSender + 'static;
fn close(self) -> impl Future<Output=Result<()>>;
fn recv<P: Debug + DeserializeOwned>(&mut self) -> impl Future<Output=Result<Option<Msg<P>>>>;
impl<R> Transport for NewlineTransport<R>
R: AsyncRead + Unpin,
fn clone_sender(&self) -> impl MsgSender + 'static {
async fn close(self) -> Result<()>{
async fn recv<P: Debug + DeserializeOwned>(&mut self) -> Result<Option<Msg<P>>> {
let Some(line) = self.input.next_line().await? else {
return Ok(None);
trait MsgSender: Send {
fn send_msg<P: Serialize + Debug + Sync>(&mut self, m: &Msg<P>) -> impl Future<Output=Result<usize>> + Send;
impl MsgSender for AckSender {
async fn send_msg<P: Serialize + Debug + Sync>(&mut self, m: &Msg<P>) -> Result<usize> {
let (tx, rx) = oneshot::channel();
let generic = m.to_generic()?;
Sender::send(self, (generic, tx)).await.context("could not send message to sender")?;
rx.await.context("could not send ack")?
trait Service {
type P: Debug + DeserializeOwned + Serialize;
fn process(&mut self, m: Msg<Self::P>, s: impl MsgSender + 'static) -> impl Future<Output=Result<()>>;
fn serve<T: Transport>(mut self, mut t: T) -> impl Future<Output=Result<()>>
where Self: Sized{
async move {
while let Some(m) = t.recv::<Self::P>().await? {
let mut s = t.clone_sender();
self.process(m, s).await?;
info!("Closing transport");
t.close().await.expect("should not panic");
struct EchoService {}
#[serde(tag = "type")]
#[serde(rename_all = "snake_case")]
enum EchoPayload {
Echo {
echo: String
impl Service for EchoService {
type P = EchoPayload;
async fn process(&mut self, m: Msg<Self::P>, mut s: impl MsgSender + 'static) -> Result<()> {
tracing::info!("Received: {m:?}");
tokio::spawn(async move {
s.send_msg(&Msg{src: 1, dest: 1, body: { Body { msg_id: Some(1), in_reply_to: None, payload: EchoPayload::Echo{ echo: String::from("Hello") }}}}).await.expect("sending failed");
async fn main() -> Result<()>{
let mut t = NewlineTransport::new();
let s = EchoService{};
