hydro_lang/deploy/maelstrom/
mod.rs1use serde::Serialize;
4use serde::de::DeserializeOwned;
5
6use crate::forward_handle::ForwardHandle;
7use crate::live_collections::KeyedStream;
8use crate::live_collections::stream::TotalOrder;
9use crate::location::Cluster;
10use crate::nondet::nondet;
11
12#[cfg(stageleft_runtime)]
13#[cfg(feature = "maelstrom")]
14#[cfg_attr(docsrs, doc(cfg(feature = "maelstrom")))]
15pub mod deploy_maelstrom;
16
17pub mod deploy_runtime_maelstrom;
18
19pub fn maelstrom_bidi_clients<'a, C, In: DeserializeOwned, Out: Serialize>(
37 cluster: &Cluster<'a, C>,
38) -> (
39 KeyedStream<String, In, Cluster<'a, C>>,
40 ForwardHandle<'a, KeyedStream<String, Out, Cluster<'a, C>>>,
41) {
42 use stageleft::q;
43
44 use crate::location::Location;
45
46 let meta: stageleft::RuntimeData<&deploy_runtime_maelstrom::MaelstromMeta> =
47 stageleft::RuntimeData::new("__hydro_lang_maelstrom_meta");
48
49 let input: KeyedStream<String, In, Cluster<'a, C>> = cluster
51 .source_stream(q!(deploy_runtime_maelstrom::maelstrom_client_source(meta)))
52 .into_keyed()
53 .map(q!(|b| serde_json::from_value(b).unwrap()));
54
55 let (fwd_handle, output_stream) =
57 cluster.forward_ref::<KeyedStream<String, Out, Cluster<'a, C>>>();
58
59 output_stream
61 .entries()
62 .assume_ordering::<TotalOrder>(nondet!())
63 .for_each(q!(|(client_id, body)| {
64 deploy_runtime_maelstrom::maelstrom_send_response(
65 &meta.node_id,
66 &client_id,
67 serde_json::to_value(body).unwrap(),
68 );
69 }));
70
71 (input, fwd_handle)
72}