Skip to main content

hydro_lang/deploy/maelstrom/
mod.rs

1//! Deployment backend for running correctness tests against Jepsen Maelstrom (<https://github.com/jepsen-io/maelstrom>)
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5
6use crate::forward_handle::ForwardHandle;
7use crate::live_collections::KeyedStream;
8use crate::live_collections::stream::TotalOrder;
9use crate::location::Cluster;
10use crate::nondet::nondet;
11
12#[cfg(stageleft_runtime)]
13#[cfg(feature = "maelstrom")]
14#[cfg_attr(docsrs, doc(cfg(feature = "maelstrom")))]
15pub mod deploy_maelstrom;
16
17pub mod deploy_runtime_maelstrom;
18
19/// Sets up bidirectional communication with Maelstrom clients on a cluster.
20///
21/// This function provides a similar API to `bidi_external_many_bytes` but for Maelstrom
22/// client communication. It returns a keyed input stream of client messages and accepts
23/// a keyed output stream of responses.
24///
25/// The key type is `String` (the client ID like "c1", "c2").
26/// The value type is `serde_json::Value` (the message body).
27///
28/// # Example
29/// ```ignore
30/// let (input, output_handle) = maelstrom_bidi_clients(&cluster);
31/// output_handle.complete(input.map(q!(|(client_id, body)| {
32///     // Process and return response
33///     (client_id, response_body)
34/// })));
35/// ```
36pub fn maelstrom_bidi_clients<'a, C, In: DeserializeOwned, Out: Serialize>(
37    cluster: &Cluster<'a, C>,
38) -> (
39    KeyedStream<String, In, Cluster<'a, C>>,
40    ForwardHandle<'a, KeyedStream<String, Out, Cluster<'a, C>>>,
41) {
42    use stageleft::q;
43
44    use crate::location::Location;
45
46    let meta: stageleft::RuntimeData<&deploy_runtime_maelstrom::MaelstromMeta> =
47        stageleft::RuntimeData::new("__hydro_lang_maelstrom_meta");
48
49    // Create the input stream from Maelstrom clients
50    let input: KeyedStream<String, In, Cluster<'a, C>> = cluster
51        .source_stream(q!(deploy_runtime_maelstrom::maelstrom_client_source(meta)))
52        .into_keyed()
53        .map(q!(|b| serde_json::from_value(b).unwrap()));
54
55    // Create a forward reference for the output stream
56    let (fwd_handle, output_stream) =
57        cluster.forward_ref::<KeyedStream<String, Out, Cluster<'a, C>>>();
58
59    // Set up the output sink to send responses back to clients
60    output_stream
61        .entries()
62        .assume_ordering::<TotalOrder>(nondet!(/** maelstrom responses can be sent in any order */))
63        .for_each(q!(|(client_id, body)| {
64            deploy_runtime_maelstrom::maelstrom_send_response(
65                &meta.node_id,
66                &client_id,
67                serde_json::to_value(body).unwrap(),
68            );
69        }));
70
71    (input, fwd_handle)
72}