Skip to main content

hydro_lang/location/
mod.rs

1//! Type definitions for distributed locations, which specify where pieces of a Hydro
2//! program will be executed.
3//!
4//! Hydro is a **global**, **distributed** programming model. This means that the data
5//! and computation in a Hydro program can be spread across multiple machines, data
6//! centers, and even continents. To achieve this, Hydro uses the concept of
7//! **locations** to keep track of _where_ data is located and computation is executed.
8//!
9//! Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
10//! which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
11//! and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
12//! to allow live collections to be _moved_ between locations via network send/receive.
13//!
14//! See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
15
16use std::fmt::Debug;
17use std::future::Future;
18use std::marker::PhantomData;
19use std::num::ParseIntError;
20use std::time::Duration;
21
22use bytes::{Bytes, BytesMut};
23use futures::stream::Stream as FuturesStream;
24use proc_macro2::Span;
25use quote::quote;
26use serde::de::DeserializeOwned;
27use serde::{Deserialize, Serialize};
28use slotmap::{Key, new_key_type};
29use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
30use stageleft::{QuotedWithContext, q, quote_type};
31use syn::parse_quote;
32use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
33
34use crate::compile::ir::{
35    ClusterMembersState, DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource,
36};
37use crate::forward_handle::ForwardRef;
38#[cfg(stageleft_runtime)]
39use crate::forward_handle::{CycleCollection, ForwardHandle};
40use crate::live_collections::boundedness::{Bounded, Unbounded};
41use crate::live_collections::keyed_stream::KeyedStream;
42use crate::live_collections::singleton::Singleton;
43use crate::live_collections::stream::{
44    ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
45};
46#[cfg(stageleft_runtime)]
47use crate::location::dynamic::DynLocation;
48use crate::location::dynamic::{ClusterConsistency, LocationId};
49use crate::location::external_process::{
50    ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
51};
52use crate::nondet::NonDet;
53use crate::properties::manual_proof;
54#[cfg(feature = "sim")]
55use crate::sim::SimSender;
56use crate::staging_util::get_this_crate;
57
58pub mod dynamic;
59
60pub mod external_process;
61pub use external_process::External;
62
63pub mod process;
64pub use process::Process;
65
66pub mod cluster;
67pub use cluster::Cluster;
68
69pub mod member_id;
70pub use member_id::{MemberId, TaglessMemberId};
71
72pub mod tick;
73pub use tick::{Atomic, Tick};
74
75/// An event indicating a change in membership status of a location in a group
76/// (e.g. a node in a [`Cluster`] or an external client connection).
77#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
78pub enum MembershipEvent {
79    /// The member has joined the group and is now active.
80    Joined,
81    /// The member has left the group and is no longer active.
82    Left,
83}
84
85/// A hint for configuring the network transport used by an external connection.
86///
87/// This controls how the underlying TCP listener is set up when binding
88/// external client connections via methods like [`Location::bind_single_client`]
89/// or [`Location::bidi_external_many_bytes`].
90#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
91pub enum NetworkHint {
92    /// Automatically select the network configuration (e.g. an ephemeral port).
93    Auto,
94    /// Use a TCP port, optionally specifying a fixed port number.
95    ///
96    /// If `None`, an available port will be chosen automatically.
97    /// If `Some(port)`, the given port number will be used.
98    TcpPort(Option<u16>),
99}
100
101pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
102    assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
103}
104
105#[stageleft::export(LocationKey)]
106new_key_type! {
107    /// A unique identifier for a clock tick.
108    pub struct LocationKey;
109}
110
111impl std::fmt::Display for LocationKey {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        write!(f, "loc{:?}", self.data()) // `"loc1v1"``
114    }
115}
116
117/// This is used for the ECS membership stream.
118/// TODO(mingwei): Make this more robust?
119impl std::str::FromStr for LocationKey {
120    type Err = Option<ParseIntError>;
121
122    fn from_str(s: &str) -> Result<Self, Self::Err> {
123        let nvn = s.strip_prefix("loc").ok_or(None)?;
124        let (idx, ver) = nvn.split_once("v").ok_or(None)?;
125        let idx: u64 = idx.parse()?;
126        let ver: u64 = ver.parse()?;
127        Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
128    }
129}
130
131impl LocationKey {
132    /// TODO(minwgei): Remove this and avoid magic key for simulator external.
133    /// The first location key, used by the simulator as the default external location.
134    pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001)); // `1v1`
135
136    /// A key for testing with index 1.
137    #[cfg(test)]
138    pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000FF00000001)); // `1v255`
139
140    /// A key for testing with index 2.
141    #[cfg(test)]
142    pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000FF00000002)); // `2v255`
143}
144
145/// This is used within `q!` code in docker and ECS.
146impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
147    type O = LocationKey;
148
149    fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
150    where
151        Self: Sized,
152    {
153        let root = get_this_crate();
154        let n = Key::data(&self).as_ffi();
155        (
156            QuoteTokens {
157                prelude: None,
158                expr: Some(quote! {
159                    #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
160                }),
161            },
162            (),
163        )
164    }
165}
166
167/// A simple enum for the type of a root location.
168#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
169pub enum LocationType {
170    /// A process (single node).
171    Process,
172    /// A cluster (multiple nodes).
173    Cluster,
174    /// An external client.
175    External,
176}
177
178/// A top-level location (i.e. a [`Process`] or [`Cluster`]) that is outside a tick / atomic region.
179pub trait TopLevel<'a>: Location<'a> {}
180
181/// A location where data can be materialized and computation can be executed.
182///
183/// Hydro is a **global**, **distributed** programming model. This means that the data
184/// and computation in a Hydro program can be spread across multiple machines, data
185/// centers, and even continents. To achieve this, Hydro uses the concept of
186/// **locations** to keep track of _where_ data is located and computation is executed.
187///
188/// Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
189/// which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
190/// and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
191/// to allow live collections to be _moved_ between locations via network send/receive.
192///
193/// See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
194#[expect(
195    private_bounds,
196    reason = "only internal Hydro code can define location types"
197)]
198pub trait Location<'a>: DynLocation {
199    /// The root location type for this location.
200    ///
201    /// For top-level locations like [`Process`] and [`Cluster`], this is `Self`.
202    /// For nested locations like [`Tick`], this is the root location that contains it.
203    type Root: Location<'a>;
204
205    /// Location type with consistency guarantees dropped for the live collection on it.
206    type DropConsistency: Location<'a, DropConsistency = Self::DropConsistency>;
207
208    /// Returns the root location for this location.
209    ///
210    /// For top-level locations like [`Process`] and [`Cluster`], this returns `self`.
211    /// For nested locations like [`Tick`], this returns the root location that contains it.
212    fn root(&self) -> Self::Root;
213
214    /// This location but with consistency guarantees dropped for the live collection
215    fn drop_consistency(&self) -> Self::DropConsistency;
216    /// Gets the runtime enum variant for the current consistency level, if this is a cluster.
217    fn consistency() -> Option<ClusterConsistency>;
218
219    /// Updates the consistency guarantees to match that of the given location.
220    fn with_consistency_of<L2: Location<'a, DropConsistency = Self::DropConsistency>>(&self) -> L2 {
221        L2::from_drop_consistency(self.drop_consistency())
222    }
223
224    #[doc(hidden)]
225    fn from_drop_consistency(l2: Self::DropConsistency) -> Self;
226
227    /// Attempts to create a new [`Tick`] clock domain at this location.
228    ///
229    /// Returns `Some(Tick)` if this is a top-level location (like [`Process`] or [`Cluster`]),
230    /// or `None` if this location is already inside a tick (nested ticks are not supported).
231    ///
232    /// Prefer using [`Location::tick`] when you know the location is top-level.
233    fn try_tick(&self) -> Option<Tick<Self>> {
234        if Self::is_top_level() {
235            let id = self.flow_state().borrow_mut().next_clock_id();
236            Some(Tick {
237                id,
238                l: self.clone(),
239            })
240        } else {
241            None
242        }
243    }
244
245    /// Returns the unique identifier for this location.
246    fn id(&self) -> LocationId {
247        DynLocation::dyn_id(self)
248    }
249
250    /// Creates a new [`Tick`] clock domain at this location.
251    ///
252    /// A tick represents a logical clock that can be used to batch streaming data
253    /// into discrete time steps. This is useful for implementing iterative algorithms
254    /// or for synchronizing data across multiple streams.
255    ///
256    /// # Example
257    /// ```rust
258    /// # #[cfg(feature = "deploy")] {
259    /// # use hydro_lang::prelude::*;
260    /// # use futures::StreamExt;
261    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
262    /// let tick = process.tick();
263    /// let inside_tick = process
264    ///     .source_iter(q!(vec![1, 2, 3, 4]))
265    ///     .batch(&tick, nondet!(/** test */));
266    /// inside_tick.all_ticks()
267    /// # }, |mut stream| async move {
268    /// // 1, 2, 3, 4
269    /// # for w in vec![1, 2, 3, 4] {
270    /// #     assert_eq!(stream.next().await.unwrap(), w);
271    /// # }
272    /// # }));
273    /// # }
274    /// ```
275    fn tick(&self) -> Tick<Self> {
276        if let LocationId::Tick(_, _) = self.id() {
277            panic!("cannot create nested ticks");
278        }
279
280        let id = self.flow_state().borrow_mut().next_clock_id();
281        Tick {
282            id,
283            l: self.clone(),
284        }
285    }
286
287    /// Creates an unbounded stream that continuously emits unit values `()`.
288    ///
289    /// This is useful for driving computations that need to run continuously,
290    /// such as polling or heartbeat mechanisms.
291    ///
292    /// # Example
293    /// ```rust
294    /// # #[cfg(feature = "deploy")] {
295    /// # use hydro_lang::prelude::*;
296    /// # use futures::StreamExt;
297    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
298    /// let tick = process.tick();
299    /// process.spin()
300    ///     .batch(&tick, nondet!(/** test */))
301    ///     .map(q!(|_| 42))
302    ///     .all_ticks()
303    /// # }, |mut stream| async move {
304    /// // 42, 42, 42, ...
305    /// # assert_eq!(stream.next().await.unwrap(), 42);
306    /// # assert_eq!(stream.next().await.unwrap(), 42);
307    /// # assert_eq!(stream.next().await.unwrap(), 42);
308    /// # }));
309    /// # }
310    /// ```
311    fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
312    where
313        Self: TopLevel<'a> + Sized,
314    {
315        Stream::new(
316            self.clone(),
317            HydroNode::Source {
318                source: HydroSource::Spin(),
319                metadata: self.new_node_metadata(Stream::<
320                    (),
321                    Self,
322                    Unbounded,
323                    TotalOrder,
324                    ExactlyOnce,
325                >::collection_kind()),
326            },
327        )
328    }
329
330    /// Creates a stream from an async [`FuturesStream`].
331    ///
332    /// This is useful for integrating with external async data sources,
333    /// such as network connections or file readers.
334    ///
335    /// # Example
336    /// ```rust
337    /// # #[cfg(feature = "deploy")] {
338    /// # use hydro_lang::prelude::*;
339    /// # use futures::StreamExt;
340    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
341    /// process.source_stream(q!(futures::stream::iter(vec![1, 2, 3])))
342    /// # }, |mut stream| async move {
343    /// // 1, 2, 3
344    /// # for w in vec![1, 2, 3] {
345    /// #     assert_eq!(stream.next().await.unwrap(), w);
346    /// # }
347    /// # }));
348    /// # }
349    /// ```
350    fn source_stream<T, E>(
351        &self,
352        e: impl QuotedWithContext<'a, E, Self>,
353    ) -> Stream<T, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>
354    where
355        E: FuturesStream<Item = T> + Unpin,
356        Self: TopLevel<'a> + Sized,
357    {
358        let e = e.splice_untyped_ctx(self);
359
360        let target_location = self.drop_consistency();
361        Stream::new(
362            target_location.clone(),
363            HydroNode::Source {
364                source: HydroSource::Stream(e.into()),
365                metadata: target_location.new_node_metadata(Stream::<
366                    T,
367                    Self::DropConsistency,
368                    Unbounded,
369                    TotalOrder,
370                    ExactlyOnce,
371                >::collection_kind()),
372            },
373        )
374    }
375
376    /// Creates a bounded stream from an iterator.
377    ///
378    /// The iterator is evaluated once at runtime, and all elements are emitted
379    /// in order. This is useful for creating streams from static data or
380    /// for testing.
381    ///
382    /// # Example
383    /// ```rust
384    /// # #[cfg(feature = "deploy")] {
385    /// # use hydro_lang::prelude::*;
386    /// # use futures::StreamExt;
387    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
388    /// process.source_iter(q!(vec![1, 2, 3, 4]))
389    /// # }, |mut stream| async move {
390    /// // 1, 2, 3, 4
391    /// # for w in vec![1, 2, 3, 4] {
392    /// #     assert_eq!(stream.next().await.unwrap(), w);
393    /// # }
394    /// # }));
395    /// # }
396    /// ```
397    fn source_iter<T, E>(
398        &self,
399        e: impl QuotedWithContext<'a, E, Self>,
400    ) -> Stream<T, Self::DropConsistency, Bounded, TotalOrder, ExactlyOnce>
401    where
402        E: IntoIterator<Item = T>,
403        Self: Sized,
404    {
405        let e = e.splice_typed_ctx(self);
406
407        let target_location = self.drop_consistency();
408        Stream::new(
409            target_location.clone(),
410            HydroNode::Source {
411                source: HydroSource::Iter(e.into()),
412                metadata: target_location.new_node_metadata(Stream::<
413                    T,
414                    Self::DropConsistency,
415                    Bounded,
416                    TotalOrder,
417                    ExactlyOnce,
418                >::collection_kind()),
419            },
420        )
421    }
422
423    #[deprecated(note = "use .source_cluster_membership_stream(...) instead")]
424    /// Creates a stream of membership events for a cluster.
425    ///
426    /// This stream emits [`MembershipEvent::Joined`] when a cluster member joins
427    /// and [`MembershipEvent::Left`] when a cluster member leaves. The stream is
428    /// keyed by the [`MemberId`] of the cluster member.
429    ///
430    /// This is useful for implementing protocols that need to track cluster membership,
431    /// such as broadcasting to all members or detecting failures.
432    ///
433    /// # Non-Determinism
434    /// This stream is non-deterministic because the timing of membership events, for example
435    /// if a node leaves, the membership event may not be received if the node left before the
436    /// stream was created.
437    ///
438    /// # Example
439    /// ```rust
440    /// # #[cfg(feature = "deploy")] {
441    /// # use hydro_lang::prelude::*;
442    /// # use futures::StreamExt;
443    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
444    /// let p1 = flow.process::<()>();
445    /// let workers: Cluster<()> = flow.cluster::<()>();
446    /// # // do nothing on each worker
447    /// # workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
448    /// let cluster_members = p1.source_cluster_members(&workers, nondet!(/** late joiners may miss events */));
449    /// # cluster_members.entries().send(&p2, TCP.fail_stop().bincode())
450    /// // if there are 4 members in the cluster, we would see a join event for each
451    /// // { MemberId::<Worker>(0): [MembershipEvent::Join], MemberId::<Worker>(2): [MembershipEvent::Join], ... }
452    /// # }, |mut stream| async move {
453    /// # let mut results = Vec::new();
454    /// # for w in 0..4 {
455    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
456    /// # }
457    /// # results.sort();
458    /// # assert_eq!(results, vec!["(MemberId::<()>(0), Joined)", "(MemberId::<()>(1), Joined)", "(MemberId::<()>(2), Joined)", "(MemberId::<()>(3), Joined)"]);
459    /// # }));
460    /// # }
461    /// ```
462    fn source_cluster_members<C: 'a>(
463        &self,
464        cluster: &Cluster<'a, C>,
465        nondet_start: NonDet,
466    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self::DropConsistency, Unbounded>
467    where
468        Self: TopLevel<'a> + Sized,
469    {
470        self.source_cluster_membership_stream(cluster, nondet_start)
471    }
472
473    /// Creates a stream of membership events for a cluster.
474    ///
475    /// This stream emits [`MembershipEvent::Joined`] when a cluster member joins
476    /// and [`MembershipEvent::Left`] when a cluster member leaves. The stream is
477    /// keyed by the [`MemberId`] of the cluster member.
478    ///
479    /// This is useful for implementing protocols that need to track cluster membership,
480    /// such as broadcasting to all members or detecting failures.
481    ///
482    /// # Non-Determinism
483    /// This stream is non-deterministic because the timing of membership events, for example
484    /// if a node leaves, the membership event may not be received if the node left before the
485    /// stream was created.
486    ///
487    /// # Example
488    /// ```rust
489    /// # #[cfg(feature = "deploy")] {
490    /// # use hydro_lang::prelude::*;
491    /// # use futures::StreamExt;
492    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
493    /// let p1 = flow.process::<()>();
494    /// let workers: Cluster<()> = flow.cluster::<()>();
495    /// # // do nothing on each worker
496    /// # workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
497    /// let cluster_members = p1.source_cluster_membership_stream(&workers, nondet!(/** late joiners may miss events */));
498    /// # cluster_members.entries().send(&p2, TCP.fail_stop().bincode())
499    /// // if there are 4 members in the cluster, we would see a join event for each
500    /// // { MemberId::<Worker>(0): [MembershipEvent::Join], MemberId::<Worker>(2): [MembershipEvent::Join], ... }
501    /// # }, |mut stream| async move {
502    /// # let mut results = Vec::new();
503    /// # for w in 0..4 {
504    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
505    /// # }
506    /// # results.sort();
507    /// # assert_eq!(results, vec!["(MemberId::<()>(0), Joined)", "(MemberId::<()>(1), Joined)", "(MemberId::<()>(2), Joined)", "(MemberId::<()>(3), Joined)"]);
508    /// # }));
509    /// # }
510    /// ```
511    fn source_cluster_membership_stream<C: 'a>(
512        &self,
513        cluster: &Cluster<'a, C>,
514        _nondet_start: NonDet,
515    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self::DropConsistency, Unbounded>
516    where
517        Self: TopLevel<'a> + Sized,
518    {
519        let target_consistency = self.drop_consistency();
520        Stream::new(
521            target_consistency.clone(),
522            HydroNode::Source {
523                source: HydroSource::ClusterMembers(cluster.id(), ClusterMembersState::Uninit),
524                metadata: target_consistency.new_node_metadata(Stream::<
525                    (TaglessMemberId, MembershipEvent),
526                    Self,
527                    Unbounded,
528                    TotalOrder,
529                    ExactlyOnce,
530                >::collection_kind(
531                )),
532            },
533        )
534        .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
535        .into_keyed()
536    }
537
538    /// Creates a one-way connection from an external process to receive raw bytes.
539    ///
540    /// Returns a port handle for the external process to connect to, and a stream
541    /// of received byte buffers.
542    ///
543    /// For bidirectional communication or typed data, see [`Location::bind_single_client`]
544    /// or [`Location::source_external_bincode`].
545    fn source_external_bytes<L>(
546        &self,
547        from: &External<L>,
548    ) -> (
549        ExternalBytesPort,
550        Stream<BytesMut, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
551    )
552    where
553        Self: TopLevel<'a> + Sized,
554    {
555        let (port, stream, sink) =
556            self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
557
558        sink.complete(stream.location().source_iter(q!([])));
559
560        (port, stream)
561    }
562
563    /// Creates a one-way connection from an external process to receive bincode-serialized data.
564    ///
565    /// Returns a sink handle for the external process to send data to, and a stream
566    /// of received values.
567    ///
568    /// For bidirectional communication, see [`Location::bind_single_client_bincode`].
569    fn source_external_bincode<L, T, O: Ordering, R: Retries>(
570        &self,
571        from: &External<L>,
572    ) -> (
573        ExternalBincodeSink<T, NotMany, O, R>,
574        Stream<T, Self::DropConsistency, Unbounded, O, R>,
575    )
576    where
577        Self: TopLevel<'a> + Sized,
578        T: Serialize + DeserializeOwned,
579    {
580        let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
581        sink.complete(stream.location().source_iter(q!([])));
582
583        (
584            ExternalBincodeSink {
585                process_key: from.key,
586                port_id: port.port_id,
587                _phantom: PhantomData,
588            },
589            stream.weaken_ordering().weaken_retries(),
590        )
591    }
592
593    /// Sets up a simulated input port on this location for testing.
594    ///
595    /// Returns a handle to send messages to the location as well as a stream
596    /// of received messages. This is only available when the `sim` feature is enabled.
597    #[cfg(feature = "sim")]
598    fn sim_input<T, O: Ordering, R: Retries>(
599        &self,
600    ) -> (
601        SimSender<T, O, R>,
602        Stream<T, Self::DropConsistency, Unbounded, O, R>,
603    )
604    where
605        Self: TopLevel<'a> + Sized,
606        T: Serialize + DeserializeOwned,
607    {
608        let external_location: External<'a, ()> = External {
609            key: LocationKey::FIRST,
610            flow_state: self.flow_state().clone(),
611            _phantom: PhantomData,
612        };
613
614        let (external, stream) = self.source_external_bincode(&external_location);
615
616        (SimSender(external.port_id, PhantomData), stream)
617    }
618
619    /// Creates an external input stream for embedded deployment mode.
620    ///
621    /// The `name` parameter specifies the name of the generated function parameter
622    /// that will supply data to this stream at runtime. The generated function will
623    /// accept an `impl Stream<Item = T> + Unpin` argument with this name.
624    fn embedded_input<T>(
625        &self,
626        name: impl Into<String>,
627    ) -> Stream<T, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>
628    where
629        Self: TopLevel<'a> + Sized,
630    {
631        let ident = syn::Ident::new(&name.into(), Span::call_site());
632
633        let target_location = self.drop_consistency();
634        Stream::new(
635            target_location.clone(),
636            HydroNode::Source {
637                source: HydroSource::Embedded(ident),
638                metadata: target_location.new_node_metadata(Stream::<
639                    T,
640                    Self,
641                    Unbounded,
642                    TotalOrder,
643                    ExactlyOnce,
644                >::collection_kind()),
645            },
646        )
647    }
648
649    /// Creates an embedded singleton input for embedded deployment mode.
650    ///
651    /// The `name` parameter specifies the name of the generated function parameter
652    /// that will supply data to this singleton at runtime. The generated function will
653    /// accept a plain `T` parameter with this name.
654    fn embedded_singleton_input<T>(
655        &self,
656        name: impl Into<String>,
657    ) -> Singleton<T, Self::DropConsistency, Bounded>
658    where
659        Self: TopLevel<'a> + Sized,
660    {
661        let ident = syn::Ident::new(&name.into(), Span::call_site());
662
663        let target_location = self.drop_consistency();
664        Singleton::new(
665            target_location.clone(),
666            HydroNode::Source {
667                source: HydroSource::EmbeddedSingleton(ident),
668                metadata: target_location
669                    .new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
670            },
671        )
672    }
673
674    /// Establishes a server on this location to receive a bidirectional connection from a single
675    /// client, identified by the given `External` handle. Returns a port handle for the external
676    /// process to connect to, a stream of incoming messages, and a handle to send outgoing
677    /// messages.
678    ///
679    /// # Example
680    /// ```rust
681    /// # #[cfg(feature = "deploy")] {
682    /// # use hydro_lang::prelude::*;
683    /// # use hydro_deploy::Deployment;
684    /// # use futures::{SinkExt, StreamExt};
685    /// # tokio_test::block_on(async {
686    /// # use bytes::Bytes;
687    /// # use hydro_lang::location::NetworkHint;
688    /// # use tokio_util::codec::LengthDelimitedCodec;
689    /// # let mut flow = FlowBuilder::new();
690    /// let node = flow.process::<()>();
691    /// let external = flow.external::<()>();
692    /// let (port, incoming, outgoing) =
693    ///     node.bind_single_client::<_, Bytes, LengthDelimitedCodec>(&external, NetworkHint::Auto);
694    /// outgoing.complete(incoming.map(q!(|data /* : Bytes */| {
695    ///     let mut resp: Vec<u8> = data.into();
696    ///     resp.push(42);
697    ///     resp.into() // : Bytes
698    /// })));
699    ///
700    /// # let mut deployment = Deployment::new();
701    /// let nodes = flow // ... with_process and with_external
702    /// #     .with_process(&node, deployment.Localhost())
703    /// #     .with_external(&external, deployment.Localhost())
704    /// #     .deploy(&mut deployment);
705    ///
706    /// deployment.deploy().await.unwrap();
707    /// deployment.start().await.unwrap();
708    ///
709    /// let (mut external_out, mut external_in) = nodes.connect(port).await;
710    /// external_in.send(vec![1, 2, 3].into()).await.unwrap();
711    /// assert_eq!(
712    ///     external_out.next().await.unwrap().unwrap(),
713    ///     vec![1, 2, 3, 42]
714    /// );
715    /// # });
716    /// # }
717    /// ```
718    #[expect(clippy::type_complexity, reason = "stream markers")]
719    fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
720        &self,
721        from: &External<L>,
722        port_hint: NetworkHint,
723    ) -> (
724        ExternalBytesPort<NotMany>,
725        Stream<<Codec as Decoder>::Item, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
726        ForwardHandle<'a, Stream<T, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>>,
727    )
728    where
729        Self: TopLevel<'a> + Sized,
730    {
731        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
732        let target_consistency = self.drop_consistency();
733
734        let (fwd_ref, to_sink) = target_consistency.forward_ref::<Stream<
735            T,
736            Self::DropConsistency,
737            Unbounded,
738            TotalOrder,
739            ExactlyOnce,
740        >>();
741        let mut flow_state_borrow = self.flow_state().borrow_mut();
742
743        flow_state_borrow.push_root(HydroRoot::SendExternal {
744            to_external_key: from.key,
745            to_port_id: next_external_port_id,
746            to_many: false,
747            unpaired: false,
748            serialize_fn: None,
749            instantiate_fn: DebugInstantiate::Building,
750            input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
751            op_metadata: HydroIrOpMetadata::new(),
752        });
753
754        let raw_stream: Stream<
755            Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
756            Self::DropConsistency,
757            Unbounded,
758            TotalOrder,
759            ExactlyOnce,
760        > = Stream::new(
761            target_consistency.clone(),
762            HydroNode::ExternalInput {
763                from_external_key: from.key,
764                from_port_id: next_external_port_id,
765                from_many: false,
766                codec_type: quote_type::<Codec>().into(),
767                port_hint,
768                instantiate_fn: DebugInstantiate::Building,
769                deserialize_fn: None,
770                metadata: target_consistency.new_node_metadata(Stream::<
771                    Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
772                    Self::DropConsistency,
773                    Unbounded,
774                    TotalOrder,
775                    ExactlyOnce,
776                >::collection_kind(
777                )),
778            },
779        );
780
781        (
782            ExternalBytesPort {
783                process_key: from.key,
784                port_id: next_external_port_id,
785                _phantom: PhantomData,
786            },
787            raw_stream.flatten_ordered(),
788            fwd_ref,
789        )
790    }
791
792    /// Establishes a bidirectional connection from a single external client using bincode serialization.
793    ///
794    /// Returns a port handle for the external process to connect to, a stream of incoming messages,
795    /// and a handle to send outgoing messages. This is a convenience wrapper around
796    /// [`Location::bind_single_client`] that uses bincode for serialization.
797    ///
798    /// # Type Parameters
799    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
800    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
801    #[expect(clippy::type_complexity, reason = "stream markers")]
802    fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
803        &self,
804        from: &External<L>,
805    ) -> (
806        ExternalBincodeBidi<InT, OutT, NotMany>,
807        Stream<InT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
808        ForwardHandle<'a, Stream<OutT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>>,
809    )
810    where
811        Self: TopLevel<'a> + Sized,
812    {
813        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
814
815        let target_consistency = self.drop_consistency();
816        let (fwd_ref, to_sink) = target_consistency.forward_ref::<Stream<
817            OutT,
818            Self::DropConsistency,
819            Unbounded,
820            TotalOrder,
821            ExactlyOnce,
822        >>();
823        let mut flow_state_borrow = self.flow_state().borrow_mut();
824
825        let root = get_this_crate();
826
827        let out_t_type = quote_type::<OutT>();
828        let ser_fn: syn::Expr = syn::parse_quote! {
829            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
830                |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
831            )
832        };
833
834        flow_state_borrow.push_root(HydroRoot::SendExternal {
835            to_external_key: from.key,
836            to_port_id: next_external_port_id,
837            to_many: false,
838            unpaired: false,
839            serialize_fn: Some(ser_fn.into()),
840            instantiate_fn: DebugInstantiate::Building,
841            input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
842            op_metadata: HydroIrOpMetadata::new(),
843        });
844
845        let in_t_type = quote_type::<InT>();
846
847        let deser_fn: syn::Expr = syn::parse_quote! {
848            |res| {
849                let b = res.unwrap();
850                #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
851            }
852        };
853
854        let raw_stream: Stream<InT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce> =
855            Stream::new(
856                target_consistency.clone(),
857                HydroNode::ExternalInput {
858                    from_external_key: from.key,
859                    from_port_id: next_external_port_id,
860                    from_many: false,
861                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
862                    port_hint: NetworkHint::Auto,
863                    instantiate_fn: DebugInstantiate::Building,
864                    deserialize_fn: Some(deser_fn.into()),
865                    metadata: target_consistency.new_node_metadata(Stream::<
866                        InT,
867                        Self::DropConsistency,
868                        Unbounded,
869                        TotalOrder,
870                        ExactlyOnce,
871                    >::collection_kind(
872                    )),
873                },
874            );
875
876        (
877            ExternalBincodeBidi {
878                process_key: from.key,
879                port_id: next_external_port_id,
880                _phantom: PhantomData,
881            },
882            raw_stream,
883            fwd_ref,
884        )
885    }
886
887    /// Establishes a server on this location to receive bidirectional connections from multiple
888    /// external clients using raw bytes.
889    ///
890    /// Unlike [`Location::bind_single_client`], this method supports multiple concurrent client
891    /// connections. Each client is assigned a unique `u64` identifier.
892    ///
893    /// Returns:
894    /// - A port handle for external processes to connect to
895    /// - A keyed stream of incoming messages, keyed by client ID
896    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
897    /// - A handle to send outgoing messages, keyed by client ID
898    #[expect(clippy::type_complexity, reason = "stream markers")]
899    fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
900        &self,
901        from: &External<L>,
902        port_hint: NetworkHint,
903    ) -> (
904        ExternalBytesPort<Many>,
905        KeyedStream<
906            u64,
907            <Codec as Decoder>::Item,
908            Self::DropConsistency,
909            Unbounded,
910            TotalOrder,
911            ExactlyOnce,
912        >,
913        KeyedStream<
914            u64,
915            MembershipEvent,
916            Self::DropConsistency,
917            Unbounded,
918            TotalOrder,
919            ExactlyOnce,
920        >,
921        ForwardHandle<
922            'a,
923            KeyedStream<u64, T, Self::DropConsistency, Unbounded, NoOrder, ExactlyOnce>,
924        >,
925    )
926    where
927        Self: TopLevel<'a> + Sized,
928    {
929        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
930
931        let target_consistency = self.drop_consistency();
932        let (fwd_ref, to_sink) = target_consistency.forward_ref::<KeyedStream<
933            u64,
934            T,
935            Self::DropConsistency,
936            Unbounded,
937            NoOrder,
938            ExactlyOnce,
939        >>();
940        let mut flow_state_borrow = self.flow_state().borrow_mut();
941
942        flow_state_borrow.push_root(HydroRoot::SendExternal {
943            to_external_key: from.key,
944            to_port_id: next_external_port_id,
945            to_many: true,
946            unpaired: false,
947            serialize_fn: None,
948            instantiate_fn: DebugInstantiate::Building,
949            input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
950            op_metadata: HydroIrOpMetadata::new(),
951        });
952
953        let raw_stream: Stream<
954            Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
955            Self::DropConsistency,
956            Unbounded,
957            TotalOrder,
958            ExactlyOnce,
959        > = Stream::new(
960            target_consistency.clone(),
961            HydroNode::ExternalInput {
962                from_external_key: from.key,
963                from_port_id: next_external_port_id,
964                from_many: true,
965                codec_type: quote_type::<Codec>().into(),
966                port_hint,
967                instantiate_fn: DebugInstantiate::Building,
968                deserialize_fn: None,
969                metadata: target_consistency.new_node_metadata(Stream::<
970                    Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
971                    Self::DropConsistency,
972                    Unbounded,
973                    TotalOrder,
974                    ExactlyOnce,
975                >::collection_kind(
976                )),
977            },
978        );
979
980        let membership_stream_ident = syn::Ident::new(
981            &format!(
982                "__hydro_deploy_many_{}_{}_membership",
983                from.key, next_external_port_id
984            ),
985            Span::call_site(),
986        );
987        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
988        let raw_membership_stream: KeyedStream<
989            u64,
990            bool,
991            Self::DropConsistency,
992            Unbounded,
993            TotalOrder,
994            ExactlyOnce,
995        > = KeyedStream::new(
996            target_consistency.clone(),
997            HydroNode::Source {
998                source: HydroSource::Stream(membership_stream_expr.into()),
999                metadata: target_consistency.new_node_metadata(KeyedStream::<
1000                    u64,
1001                    bool,
1002                    Self::DropConsistency,
1003                    Unbounded,
1004                    TotalOrder,
1005                    ExactlyOnce,
1006                >::collection_kind(
1007                )),
1008            },
1009        );
1010
1011        (
1012            ExternalBytesPort {
1013                process_key: from.key,
1014                port_id: next_external_port_id,
1015                _phantom: PhantomData,
1016            },
1017            raw_stream
1018                .flatten_ordered() // TODO(shadaj): this silently drops framing errors, decide on right defaults
1019                .into_keyed(),
1020            raw_membership_stream.map(q!(|join| {
1021                if join {
1022                    MembershipEvent::Joined
1023                } else {
1024                    MembershipEvent::Left
1025                }
1026            })),
1027            fwd_ref,
1028        )
1029    }
1030
1031    /// Establishes a server on this location to receive bidirectional connections from multiple
1032    /// external clients using bincode serialization.
1033    ///
1034    /// Unlike [`Location::bind_single_client_bincode`], this method supports multiple concurrent
1035    /// client connections. Each client is assigned a unique `u64` identifier.
1036    ///
1037    /// Returns:
1038    /// - A port handle for external processes to connect to
1039    /// - A keyed stream of incoming messages, keyed by client ID
1040    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
1041    /// - A handle to send outgoing messages, keyed by client ID
1042    ///
1043    /// # Type Parameters
1044    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
1045    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
1046    #[expect(clippy::type_complexity, reason = "stream markers")]
1047    fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
1048        &self,
1049        from: &External<L>,
1050    ) -> (
1051        ExternalBincodeBidi<InT, OutT, Many>,
1052        KeyedStream<u64, InT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
1053        KeyedStream<
1054            u64,
1055            MembershipEvent,
1056            Self::DropConsistency,
1057            Unbounded,
1058            TotalOrder,
1059            ExactlyOnce,
1060        >,
1061        ForwardHandle<
1062            'a,
1063            KeyedStream<u64, OutT, Self::DropConsistency, Unbounded, NoOrder, ExactlyOnce>,
1064        >,
1065    )
1066    where
1067        Self: TopLevel<'a> + Sized,
1068    {
1069        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
1070
1071        let target_consistency = self.drop_consistency();
1072        let (fwd_ref, to_sink) = target_consistency.forward_ref::<KeyedStream<
1073            u64,
1074            OutT,
1075            Self::DropConsistency,
1076            Unbounded,
1077            NoOrder,
1078            ExactlyOnce,
1079        >>();
1080        let mut flow_state_borrow = self.flow_state().borrow_mut();
1081
1082        let root = get_this_crate();
1083
1084        let out_t_type = quote_type::<OutT>();
1085        let ser_fn: syn::Expr = syn::parse_quote! {
1086            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
1087                |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
1088            )
1089        };
1090
1091        flow_state_borrow.push_root(HydroRoot::SendExternal {
1092            to_external_key: from.key,
1093            to_port_id: next_external_port_id,
1094            to_many: true,
1095            unpaired: false,
1096            serialize_fn: Some(ser_fn.into()),
1097            instantiate_fn: DebugInstantiate::Building,
1098            input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
1099            op_metadata: HydroIrOpMetadata::new(),
1100        });
1101
1102        let in_t_type = quote_type::<InT>();
1103
1104        let deser_fn: syn::Expr = syn::parse_quote! {
1105            |res| {
1106                let (id, b) = res.unwrap();
1107                (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
1108            }
1109        };
1110
1111        let raw_stream: KeyedStream<
1112            u64,
1113            InT,
1114            Self::DropConsistency,
1115            Unbounded,
1116            TotalOrder,
1117            ExactlyOnce,
1118        > = KeyedStream::new(
1119            target_consistency.clone(),
1120            HydroNode::ExternalInput {
1121                from_external_key: from.key,
1122                from_port_id: next_external_port_id,
1123                from_many: true,
1124                codec_type: quote_type::<LengthDelimitedCodec>().into(),
1125                port_hint: NetworkHint::Auto,
1126                instantiate_fn: DebugInstantiate::Building,
1127                deserialize_fn: Some(deser_fn.into()),
1128                metadata: target_consistency.new_node_metadata(KeyedStream::<
1129                    u64,
1130                    InT,
1131                    Self::DropConsistency,
1132                    Unbounded,
1133                    TotalOrder,
1134                    ExactlyOnce,
1135                >::collection_kind(
1136                )),
1137            },
1138        );
1139
1140        let membership_stream_ident = syn::Ident::new(
1141            &format!(
1142                "__hydro_deploy_many_{}_{}_membership",
1143                from.key, next_external_port_id
1144            ),
1145            Span::call_site(),
1146        );
1147        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
1148        let raw_membership_stream: KeyedStream<
1149            u64,
1150            bool,
1151            Self::DropConsistency,
1152            Unbounded,
1153            TotalOrder,
1154            ExactlyOnce,
1155        > = KeyedStream::new(
1156            target_consistency.clone(),
1157            HydroNode::Source {
1158                source: HydroSource::Stream(membership_stream_expr.into()),
1159                metadata: target_consistency.new_node_metadata(KeyedStream::<
1160                    u64,
1161                    bool,
1162                    Self::DropConsistency,
1163                    Unbounded,
1164                    TotalOrder,
1165                    ExactlyOnce,
1166                >::collection_kind(
1167                )),
1168            },
1169        );
1170
1171        (
1172            ExternalBincodeBidi {
1173                process_key: from.key,
1174                port_id: next_external_port_id,
1175                _phantom: PhantomData,
1176            },
1177            raw_stream,
1178            raw_membership_stream.map(q!(|join| {
1179                if join {
1180                    MembershipEvent::Joined
1181                } else {
1182                    MembershipEvent::Left
1183                }
1184            })),
1185            fwd_ref,
1186        )
1187    }
1188
1189    /// Bridges user-owned async code to the dataflow as a **bidirectional sidecar**.
1190    ///
1191    /// The closure is called once at startup and must return a
1192    /// `(Stream<InT>, Sink<OutT>)` pair. The framework reads from the stream
1193    /// (items flowing *into* the dataflow) and writes to the sink (items flowing
1194    /// *out* to the sidecar). The user controls buffering, backpressure, and
1195    /// internal lifecycle — Hydro only sees the stream/sink interface.
1196    ///
1197    /// This will hopefully make it easy to integrate hydro with existing frameworks,
1198    /// for example grpc code generated service endpoints.
1199    ///
1200    /// # Returns
1201    /// - A `Stream<InT>` carrying items from the sidecar into the dataflow.
1202    /// - A [`ForwardHandle`] expecting a `Stream<OutT>` that the user completes
1203    ///   with items destined for the sidecar.
1204    ///
1205    /// # Example
1206    ///
1207    /// ```rust
1208    /// # #[cfg(feature = "deploy")] {
1209    /// # use hydro_lang::prelude::*;
1210    /// # use futures::StreamExt;
1211    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1212    /// // Sidecar that echoes whatever it receives back into the dataflow.
1213    /// let (inbound, response_handle) = process.sidecar_bidi::<String, String, _>(q!(|| {
1214    ///     let (to_df_tx, to_df_rx) = tokio::sync::mpsc::channel::<String>(16);
1215    ///     let (from_df_tx, mut from_df_rx) = tokio::sync::mpsc::channel::<String>(16);
1216    ///
1217    ///     // Spawn the sidecar: echoes items from the dataflow back into it.
1218    ///     tokio::spawn(async move {
1219    ///         while let Some(msg) = from_df_rx.recv().await {
1220    ///             to_df_tx.send(msg).await.ok();
1221    ///         }
1222    ///     });
1223    ///
1224    ///     // Return the framework-facing ends (concrete types, no boxing needed).
1225    ///     let stream = tokio_stream::wrappers::ReceiverStream::new(to_df_rx);
1226    ///     let sink = tokio_util::sync::PollSender::new(from_df_tx);
1227    ///     (stream, sink)
1228    /// }));
1229    ///
1230    /// // Send "hello" into the sidecar via the response channel.
1231    /// let input = process.source_stream(q!(futures::stream::iter(vec!["hello".to_string()])));
1232    /// response_handle.complete(input);
1233    ///
1234    /// // The sidecar echoes it back — assert we get "hello" out.
1235    /// inbound
1236    /// # }, |mut stream| async move {
1237    /// #     assert_eq!(stream.next().await.unwrap(), "hello");
1238    /// # }));
1239    /// # }
1240    /// ```
1241    fn sidecar_bidi<InT: 'static, OutT: 'static, F>(
1242        &self,
1243        sidecar: impl QuotedWithContext<'a, F, Self>,
1244    ) -> (
1245        Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
1246        ForwardHandle<'a, Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
1247    )
1248    where
1249        Self: Sized + TopLevel<'a>,
1250    {
1251        let location_key = Location::id(self).key();
1252
1253        let sidecar_id = self.flow_state().borrow_mut().next_sidecar_id();
1254        let (stream_ident, sink_ident) = sidecar_id.idents();
1255
1256        let sidecar_closure: syn::Expr = sidecar.splice_untyped_ctx(self);
1257        self.flow_state()
1258            .borrow_mut()
1259            .sidecars
1260            .push(crate::compile::builder::Sidecar::Bidi {
1261                location_key,
1262                sidecar_id,
1263                sidecar_closure: Box::new(sidecar_closure),
1264            });
1265
1266        // Inbound stream: reads from the stream returned by the sidecar closure
1267        let source_expr: syn::Expr = parse_quote! {
1268            #stream_ident
1269        };
1270        let inbound: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
1271            self.clone(),
1272            HydroNode::Source {
1273                source: HydroSource::Stream(source_expr.into()),
1274                metadata: self.new_node_metadata(Stream::<
1275                    InT,
1276                    Self,
1277                    Unbounded,  // TODO: maybe bounded sidecars are interesting..?
1278                    TotalOrder, // TODO: NoOrder..?
1279                    ExactlyOnce,
1280                >::collection_kind()),
1281            },
1282        );
1283
1284        // Outbound: forward_ref cycle feeding the sink returned by the sidecar closure
1285        let (fwd_ref, to_sink): (
1286            ForwardHandle<'a, Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
1287            Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>,
1288        ) = self.forward_ref();
1289
1290        let sink_expr: syn::Expr = parse_quote! {
1291            #sink_ident
1292        };
1293
1294        let sink_input_ir = to_sink.ir_node.replace(HydroNode::Placeholder);
1295        self.flow_state()
1296            .borrow_mut()
1297            .try_push_root(HydroRoot::DestSink {
1298                sink: sink_expr.into(),
1299                input: Box::new(sink_input_ir),
1300                op_metadata: HydroIrOpMetadata::new(),
1301            });
1302
1303        (inbound, fwd_ref)
1304    }
1305
1306    /// Constructs a [`Singleton`] materialized at this location with the given static value.
1307    ///
1308    /// See also: [`Tick::singleton`], for creating a singleton _within_ a tick, which requires
1309    /// `T: Clone`.
1310    ///
1311    /// # Example
1312    /// ```rust
1313    /// # #[cfg(feature = "deploy")] {
1314    /// # use hydro_lang::prelude::*;
1315    /// # use futures::StreamExt;
1316    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1317    /// let singleton = process.singleton(q!(5));
1318    /// # singleton.into_stream()
1319    /// # }, |mut stream| async move {
1320    /// // 5
1321    /// # assert_eq!(stream.next().await.unwrap(), 5);
1322    /// # }));
1323    /// # }
1324    /// ```
1325    fn singleton<T>(
1326        &self,
1327        e: impl QuotedWithContext<'a, T, Self>,
1328    ) -> Singleton<T, Self::DropConsistency, Bounded>
1329    where
1330        Self: Sized,
1331    {
1332        let e = e.splice_untyped_ctx(self);
1333
1334        let target_location = self.drop_consistency();
1335        Singleton::new(
1336            target_location.clone(),
1337            HydroNode::SingletonSource {
1338                value: e.into(),
1339                first_tick_only: false,
1340                metadata: target_location.new_node_metadata(Singleton::<
1341                    T,
1342                    Self::DropConsistency,
1343                    Bounded,
1344                >::collection_kind()),
1345            },
1346        )
1347    }
1348
1349    /// Constructs a [`Singleton`] by resolving an async [`Future`] to completion.
1350    ///
1351    /// This is a convenience method equivalent to
1352    /// `self.singleton(future_expr).resolve_future_blocking()`, which is a common
1353    /// pattern when initializing a singleton from an async computation.
1354    ///
1355    /// # Example
1356    /// ```rust
1357    /// # #[cfg(feature = "deploy")] {
1358    /// # use hydro_lang::prelude::*;
1359    /// # use futures::StreamExt;
1360    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1361    /// let singleton = process.singleton_future(q!(async { 42 }));
1362    /// singleton.into_stream()
1363    /// # }, |mut stream| async move {
1364    /// // 42
1365    /// # assert_eq!(stream.next().await.unwrap(), 42);
1366    /// # }));
1367    /// # }
1368    /// ```
1369    ///
1370    /// [`Future`]: std::future::Future
1371    fn singleton_future<F>(
1372        &self,
1373        e: impl QuotedWithContext<'a, F, Self>,
1374    ) -> Singleton<F::Output, Self::DropConsistency, Bounded>
1375    where
1376        F: Future,
1377        Self: Sized,
1378    {
1379        self.singleton(e).resolve_future_blocking()
1380    }
1381
1382    /// Generates a stream that emits `()` at a fixed interval.
1383    ///
1384    /// The first tick completes immediately. Missed ticks will be scheduled
1385    /// as soon as possible.
1386    ///
1387    /// Because this only emits `()`, the non-determinism of *when* events fire
1388    /// is captured by the `AtLeastOnce` retry semantics downstream, so no
1389    /// [`NonDet`] guard is required.
1390    fn source_interval(
1391        &self,
1392        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1393    ) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
1394    where
1395        Self: TopLevel<'a> + Sized,
1396    {
1397        self.source_stream(q!(tokio_stream::StreamExt::map(
1398            tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(interval)),
1399            |_| ()
1400        )))
1401        .assert_has_consistency_of_trusted(
1402            manual_proof!(/** interval does not reveal timestamps */),
1403        )
1404    }
1405
1406    /// Generates a stream that emits `()` at a fixed interval, after an
1407    /// initial delay.
1408    ///
1409    /// Because this only emits `()`, the non-determinism of *when* events fire
1410    /// is captured by the `AtLeastOnce` retry semantics downstream, so no
1411    /// [`NonDet`] guard is required.
1412    fn source_interval_delayed(
1413        &self,
1414        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1415        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1416    ) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
1417    where
1418        Self: TopLevel<'a> + Sized,
1419    {
1420        self.source_stream(q!(tokio_stream::StreamExt::map(
1421            tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at(
1422                tokio::time::Instant::now() + delay,
1423                interval,
1424            )),
1425            |_| ()
1426        )))
1427        .assert_has_consistency_of_trusted(
1428            manual_proof!(/** interval does not reveal timestamps */),
1429        )
1430    }
1431
1432    /// Creates a forward reference, allowing a stream to be used before its source is defined.
1433    ///
1434    /// Returns a `(handle, placeholder)` pair. Use the placeholder in the dataflow graph,
1435    /// then call `handle.complete(actual_stream)` to wire in the real source.
1436    ///
1437    /// This is useful for mutually-dependent dataflows or when the definition order
1438    /// doesn't match the data flow direction. For feedback loops, prefer [`Tick::cycle`]
1439    /// instead, which automatically defers values by one tick.
1440    ///
1441    /// # Panics
1442    /// Panics if the forward reference creates a synchronous cycle (i.e., the completed
1443    /// stream transitively depends on the placeholder without a `defer_tick` or network
1444    /// hop in between).
1445    ///
1446    /// # Example
1447    /// ```rust
1448    /// # #[cfg(feature = "deploy")] {
1449    /// # use hydro_lang::prelude::*;
1450    /// # use hydro_lang::live_collections::stream::NoOrder;
1451    /// # use futures::StreamExt;
1452    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1453    /// // Create a forward reference to define a stream that will be completed later
1454    /// let (complete, forward_stream) = process.forward_ref::<Stream<i32, _, _, NoOrder>>();
1455    ///
1456    /// // Use the forward reference as input to another computation
1457    /// let output: Stream<_, _, _, NoOrder> = forward_stream.map(q!(|x| x * 2));
1458    ///
1459    /// // Complete the forward reference with the actual source
1460    /// let source: Stream<_, _, Unbounded> = process.source_iter(q!([1, 2, 3])).into();
1461    /// complete.complete(source);
1462    /// output
1463    /// # }, |mut stream| async move {
1464    /// // 2, 4, 6
1465    /// # assert_eq!(stream.next().await.unwrap(), 2);
1466    /// # assert_eq!(stream.next().await.unwrap(), 4);
1467    /// # assert_eq!(stream.next().await.unwrap(), 6);
1468    /// # }));
1469    /// # }
1470    /// ```
1471    fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1472    where
1473        S: CycleCollection<'a, ForwardRef, Location = Self>,
1474    {
1475        let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
1476        (
1477            ForwardHandle::new(cycle_id, Location::id(self)),
1478            S::create_source(cycle_id, self.clone()),
1479        )
1480    }
1481}
1482
1483#[cfg(feature = "deploy")]
1484#[cfg(test)]
1485mod tests {
1486    use std::collections::HashSet;
1487
1488    use futures::{SinkExt, StreamExt};
1489    use hydro_deploy::Deployment;
1490    use stageleft::q;
1491    use tokio_util::codec::LengthDelimitedCodec;
1492
1493    use crate::compile::builder::FlowBuilder;
1494    use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
1495    use crate::location::{Location, NetworkHint};
1496    use crate::nondet::nondet;
1497
1498    #[tokio::test]
1499    async fn top_level_singleton_replay_cardinality() {
1500        let mut deployment = Deployment::new();
1501
1502        let mut flow = FlowBuilder::new();
1503        let node = flow.process::<()>();
1504        let external = flow.external::<()>();
1505
1506        let (in_port, input) =
1507            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1508        let singleton = node.singleton(q!(123));
1509        let tick = node.tick();
1510        let out = input
1511            .batch(&tick, nondet!(/** test */))
1512            .cross_singleton(singleton.clone().snapshot(&tick, nondet!(/** test */)))
1513            .cross_singleton(
1514                singleton
1515                    .snapshot(&tick, nondet!(/** test */))
1516                    .into_stream()
1517                    .count(),
1518            )
1519            .all_ticks()
1520            .send_bincode_external(&external);
1521
1522        let nodes = flow
1523            .with_process(&node, deployment.Localhost())
1524            .with_external(&external, deployment.Localhost())
1525            .deploy(&mut deployment);
1526
1527        deployment.deploy().await.unwrap();
1528
1529        let mut external_in = nodes.connect(in_port).await;
1530        let mut external_out = nodes.connect(out).await;
1531
1532        deployment.start().await.unwrap();
1533
1534        external_in.send(1).await.unwrap();
1535        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1536
1537        external_in.send(2).await.unwrap();
1538        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1539    }
1540
1541    #[tokio::test]
1542    async fn tick_singleton_replay_cardinality() {
1543        let mut deployment = Deployment::new();
1544
1545        let mut flow = FlowBuilder::new();
1546        let node = flow.process::<()>();
1547        let external = flow.external::<()>();
1548
1549        let (in_port, input) =
1550            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1551        let tick = node.tick();
1552        let singleton = tick.singleton(q!(123));
1553        let out = input
1554            .batch(&tick, nondet!(/** test */))
1555            .cross_singleton(singleton.clone())
1556            .cross_singleton(singleton.into_stream().count())
1557            .all_ticks()
1558            .send_bincode_external(&external);
1559
1560        let nodes = flow
1561            .with_process(&node, deployment.Localhost())
1562            .with_external(&external, deployment.Localhost())
1563            .deploy(&mut deployment);
1564
1565        deployment.deploy().await.unwrap();
1566
1567        let mut external_in = nodes.connect(in_port).await;
1568        let mut external_out = nodes.connect(out).await;
1569
1570        deployment.start().await.unwrap();
1571
1572        external_in.send(1).await.unwrap();
1573        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1574
1575        external_in.send(2).await.unwrap();
1576        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1577    }
1578
1579    #[tokio::test]
1580    async fn external_bytes() {
1581        let mut deployment = Deployment::new();
1582
1583        let mut flow = FlowBuilder::new();
1584        let first_node = flow.process::<()>();
1585        let external = flow.external::<()>();
1586
1587        let (in_port, input) = first_node.source_external_bytes(&external);
1588        let out = input.send_bincode_external(&external);
1589
1590        let nodes = flow
1591            .with_process(&first_node, deployment.Localhost())
1592            .with_external(&external, deployment.Localhost())
1593            .deploy(&mut deployment);
1594
1595        deployment.deploy().await.unwrap();
1596
1597        let mut external_in = nodes.connect(in_port).await.1;
1598        let mut external_out = nodes.connect(out).await;
1599
1600        deployment.start().await.unwrap();
1601
1602        external_in.send(vec![1, 2, 3].into()).await.unwrap();
1603
1604        assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
1605    }
1606
1607    #[tokio::test]
1608    async fn multi_external_source() {
1609        let mut deployment = Deployment::new();
1610
1611        let mut flow = FlowBuilder::new();
1612        let first_node = flow.process::<()>();
1613        let external = flow.external::<()>();
1614
1615        let (in_port, input, _membership, complete_sink) =
1616            first_node.bidi_external_many_bincode(&external);
1617        let out = input.entries().send_bincode_external(&external);
1618        complete_sink.complete(
1619            first_node
1620                .source_iter::<(u64, ()), _>(q!([]))
1621                .into_keyed()
1622                .weaken_ordering(),
1623        );
1624
1625        let nodes = flow
1626            .with_process(&first_node, deployment.Localhost())
1627            .with_external(&external, deployment.Localhost())
1628            .deploy(&mut deployment);
1629
1630        deployment.deploy().await.unwrap();
1631
1632        let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1633        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1634        let external_out = nodes.connect(out).await;
1635
1636        deployment.start().await.unwrap();
1637
1638        external_in_1.send(123).await.unwrap();
1639        external_in_2.send(456).await.unwrap();
1640
1641        assert_eq!(
1642            external_out.take(2).collect::<HashSet<_>>().await,
1643            vec![(0, 123), (1, 456)].into_iter().collect()
1644        );
1645    }
1646
1647    #[tokio::test]
1648    async fn second_connection_only_multi_source() {
1649        let mut deployment = Deployment::new();
1650
1651        let mut flow = FlowBuilder::new();
1652        let first_node = flow.process::<()>();
1653        let external = flow.external::<()>();
1654
1655        let (in_port, input, _membership, complete_sink) =
1656            first_node.bidi_external_many_bincode(&external);
1657        let out = input.entries().send_bincode_external(&external);
1658        complete_sink.complete(
1659            first_node
1660                .source_iter::<(u64, ()), _>(q!([]))
1661                .into_keyed()
1662                .weaken_ordering(),
1663        );
1664
1665        let nodes = flow
1666            .with_process(&first_node, deployment.Localhost())
1667            .with_external(&external, deployment.Localhost())
1668            .deploy(&mut deployment);
1669
1670        deployment.deploy().await.unwrap();
1671
1672        // intentionally skipped to test stream waking logic
1673        let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1674        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1675        let mut external_out = nodes.connect(out).await;
1676
1677        deployment.start().await.unwrap();
1678
1679        external_in_2.send(456).await.unwrap();
1680
1681        assert_eq!(external_out.next().await.unwrap(), (1, 456));
1682    }
1683
1684    #[tokio::test]
1685    async fn multi_external_bytes() {
1686        let mut deployment = Deployment::new();
1687
1688        let mut flow = FlowBuilder::new();
1689        let first_node = flow.process::<()>();
1690        let external = flow.external::<()>();
1691
1692        let (in_port, input, _membership, complete_sink) = first_node
1693            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1694        let out = input.entries().send_bincode_external(&external);
1695        complete_sink.complete(
1696            first_node
1697                .source_iter(q!([]))
1698                .into_keyed()
1699                .weaken_ordering(),
1700        );
1701
1702        let nodes = flow
1703            .with_process(&first_node, deployment.Localhost())
1704            .with_external(&external, deployment.Localhost())
1705            .deploy(&mut deployment);
1706
1707        deployment.deploy().await.unwrap();
1708
1709        let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1710        let mut external_in_2 = nodes.connect(in_port).await.1;
1711        let external_out = nodes.connect(out).await;
1712
1713        deployment.start().await.unwrap();
1714
1715        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1716        external_in_2.send(vec![4, 5].into()).await.unwrap();
1717
1718        assert_eq!(
1719            external_out.take(2).collect::<HashSet<_>>().await,
1720            vec![
1721                (0, (&[1u8, 2, 3] as &[u8]).into()),
1722                (1, (&[4u8, 5] as &[u8]).into())
1723            ]
1724            .into_iter()
1725            .collect()
1726        );
1727    }
1728
1729    #[tokio::test]
1730    async fn single_client_external_bytes() {
1731        let mut deployment = Deployment::new();
1732        let mut flow = FlowBuilder::new();
1733        let first_node = flow.process::<()>();
1734        let external = flow.external::<()>();
1735        let (port, input, complete_sink) = first_node
1736            .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1737        complete_sink.complete(input.map(q!(|data| {
1738            let mut resp: Vec<u8> = data.into();
1739            resp.push(42);
1740            resp.into() // : Bytes
1741        })));
1742
1743        let nodes = flow
1744            .with_process(&first_node, deployment.Localhost())
1745            .with_external(&external, deployment.Localhost())
1746            .deploy(&mut deployment);
1747
1748        deployment.deploy().await.unwrap();
1749        deployment.start().await.unwrap();
1750
1751        let (mut external_out, mut external_in) = nodes.connect(port).await;
1752
1753        external_in.send(vec![1, 2, 3].into()).await.unwrap();
1754        assert_eq!(
1755            external_out.next().await.unwrap().unwrap(),
1756            vec![1, 2, 3, 42]
1757        );
1758    }
1759
1760    #[tokio::test]
1761    async fn echo_external_bytes() {
1762        let mut deployment = Deployment::new();
1763
1764        let mut flow = FlowBuilder::new();
1765        let first_node = flow.process::<()>();
1766        let external = flow.external::<()>();
1767
1768        let (port, input, _membership, complete_sink) = first_node
1769            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1770        complete_sink
1771            .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1772
1773        let nodes = flow
1774            .with_process(&first_node, deployment.Localhost())
1775            .with_external(&external, deployment.Localhost())
1776            .deploy(&mut deployment);
1777
1778        deployment.deploy().await.unwrap();
1779
1780        let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1781        let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1782
1783        deployment.start().await.unwrap();
1784
1785        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1786        external_in_2.send(vec![4, 5].into()).await.unwrap();
1787
1788        assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1789        assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1790    }
1791
1792    #[tokio::test]
1793    async fn echo_external_bincode() {
1794        let mut deployment = Deployment::new();
1795
1796        let mut flow = FlowBuilder::new();
1797        let first_node = flow.process::<()>();
1798        let external = flow.external::<()>();
1799
1800        let (port, input, _membership, complete_sink) =
1801            first_node.bidi_external_many_bincode(&external);
1802        complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1803
1804        let nodes = flow
1805            .with_process(&first_node, deployment.Localhost())
1806            .with_external(&external, deployment.Localhost())
1807            .deploy(&mut deployment);
1808
1809        deployment.deploy().await.unwrap();
1810
1811        let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1812        let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1813
1814        deployment.start().await.unwrap();
1815
1816        external_in_1.send("hi".to_owned()).await.unwrap();
1817        external_in_2.send("hello".to_owned()).await.unwrap();
1818
1819        assert_eq!(external_out_1.next().await.unwrap(), "HI");
1820        assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1821    }
1822
1823    #[tokio::test]
1824    async fn closure_location_name() {
1825        let mut deployment = Deployment::new();
1826        let mut flow = FlowBuilder::new();
1827
1828        enum ClosureProcess {}
1829
1830        let node = flow.process::<ClosureProcess>();
1831        let external = flow.external::<()>();
1832
1833        let (in_port, input) =
1834            node.source_external_bincode::<_, i32, TotalOrder, ExactlyOnce>(&external);
1835        let out = input.send_bincode_external(&external);
1836
1837        let nodes = flow
1838            .with_process(&node, deployment.Localhost())
1839            .with_external(&external, deployment.Localhost())
1840            .deploy(&mut deployment);
1841
1842        deployment.deploy().await.unwrap();
1843
1844        let mut external_in = nodes.connect(in_port).await;
1845        let mut external_out = nodes.connect(out).await;
1846
1847        deployment.start().await.unwrap();
1848
1849        external_in.send(42).await.unwrap();
1850        assert_eq!(external_out.next().await.unwrap(), 42);
1851    }
1852}