Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::{Cell, RefCell};
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23
24#[cfg(feature = "build")]
25use crate::compile::builder::ClockId;
26#[cfg(feature = "build")]
27use crate::compile::builder::StmtId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31#[cfg(feature = "build")]
32use crate::handoff_ref::handoff_ref_ident;
33use crate::location::dynamic::{ClusterConsistency, LocationId};
34use crate::location::{LocationKey, NetworkHint};
35
36pub mod backtrace;
37use backtrace::Backtrace;
38
39/// A closure expression bundled with any singleton references it captures.
40///
41/// When a `q!()` closure captures a `SingletonRef`, the reference is recorded here
42/// alongside the closure's expression. This allows per-closure tracking of singleton
43/// captures, which is important for nodes with multiple closures (e.g. Fold has `init` and `acc`).
44pub struct ClosureExpr {
45    pub(crate) expr: DebugExpr,
46    /// Each entry is `(HydroNode::Reference, is_mut: bool)`.
47    /// The index in the Vec determines the ident name via [`handoff_ref_ident`].
48    /// The `access_counter` was assigned at staging time in code order.
49    pub(crate) singleton_refs: Vec<(HydroNode, bool)>,
50}
51
52impl Clone for ClosureExpr {
53    fn clone(&self) -> Self {
54        Self {
55            expr: self.expr.clone(),
56            singleton_refs: self
57                .singleton_refs
58                .iter()
59                .map(|(node, is_mut)| {
60                    let HydroNode::Reference {
61                        inner,
62                        kind,
63                        access_counter,
64                        metadata,
65                    } = node
66                    else {
67                        panic!("singleton_refs should only contain HydroNode::Reference");
68                    };
69                    (
70                        HydroNode::Reference {
71                            inner: SharedNode(Rc::clone(&inner.0)),
72                            kind: *kind,
73                            access_counter: access_counter.freeze(),
74                            metadata: metadata.clone(),
75                        },
76                        *is_mut,
77                    )
78                })
79                .collect(),
80        }
81    }
82}
83
84impl Hash for ClosureExpr {
85    fn hash<H: Hasher>(&self, state: &mut H) {
86        self.expr.hash(state);
87        // singleton_refs are structural children (like HydroIrMetadata), not
88        // identity-defining. Two closures with the same expr but different
89        // captured refs are the same closure text — the refs only affect codegen.
90    }
91}
92
93impl serde::Serialize for ClosureExpr {
94    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
95        use serde::ser::SerializeStruct;
96        let mut s = serializer.serialize_struct("ClosureExpr", 2)?;
97        s.serialize_field("expr", &self.expr)?;
98        s.serialize_field(
99            "singleton_refs",
100            &SerializableSingletonRefs(&self.singleton_refs),
101        )?;
102        s.end()
103    }
104}
105
106struct SerializableSingletonRefs<'a>(&'a [(HydroNode, bool)]);
107
108impl serde::Serialize for SerializableSingletonRefs<'_> {
109    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
110        use serde::ser::SerializeSeq;
111        let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
112        for (node, is_mut) in self.0.iter() {
113            seq.serialize_element(&(node, is_mut))?;
114        }
115        seq.end()
116    }
117}
118
119impl Debug for ClosureExpr {
120    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121        Debug::fmt(&self.expr, f)
122    }
123}
124
125impl Display for ClosureExpr {
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        Display::fmt(&self.expr, f)
128    }
129}
130
131impl From<syn::Expr> for ClosureExpr {
132    fn from(expr: syn::Expr) -> Self {
133        Self {
134            expr: DebugExpr(Box::new(expr)),
135            singleton_refs: Vec::new(),
136        }
137    }
138}
139
140impl From<DebugExpr> for ClosureExpr {
141    fn from(expr: DebugExpr) -> Self {
142        Self {
143            expr,
144            singleton_refs: Vec::new(),
145        }
146    }
147}
148
149impl ClosureExpr {
150    pub fn new(expr: DebugExpr, singleton_refs: Vec<(HydroNode, bool)>) -> Self {
151        Self {
152            expr,
153            singleton_refs,
154        }
155    }
156
157    pub fn has_mut_ref(&self) -> bool {
158        self.singleton_refs.iter().any(|(_, is_mut)| *is_mut)
159    }
160
161    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> Self {
162        Self {
163            expr: self.expr.clone(),
164            singleton_refs: self
165                .singleton_refs
166                .iter()
167                .map(|(node, is_mut)| (node.deep_clone(seen_tees), *is_mut))
168                .collect(),
169        }
170    }
171
172    pub fn transform_children(
173        &mut self,
174        transform: &mut impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
175        seen_tees: &mut SeenSharedNodes,
176    ) {
177        for (ref_node, _is_mut) in self.singleton_refs.iter_mut() {
178            transform(ref_node, seen_tees);
179        }
180    }
181
182    /// Pop singleton ref idents from the stack and rewrite the closure's token stream,
183    /// replacing local singleton ref idents with `#{N} dfir_ident` or `#{N} mut dfir_ident` references.
184    #[cfg(feature = "build")]
185    pub fn emit_tokens(&self, ident_stack: &mut Vec<syn::Ident>) -> TokenStream {
186        if self.singleton_refs.is_empty() {
187            self.expr.0.to_token_stream()
188        } else {
189            assert!(
190                ident_stack.len() >= self.singleton_refs.len(),
191                "ident_stack has {} entries but expected at least {} for singleton_refs",
192                ident_stack.len(),
193                self.singleton_refs.len()
194            );
195            let ref_idents = ident_stack.drain(ident_stack.len() - self.singleton_refs.len()..);
196
197            let mut let_bindings = Vec::new();
198            for ((i, (ref_node, is_mut)), ref_ident) in
199                self.singleton_refs.iter().enumerate().zip(ref_idents)
200            {
201                let HydroNode::Reference { access_counter, .. } = ref_node else {
202                    panic!("ClosureExpression expected references to `HydroNode::Reference`");
203                };
204                let group = access_counter.frozen_group();
205                // TODO(mingwei): proper spanning?
206                let local_ident = handoff_ref_ident(i);
207                let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
208                let group_lit = proc_macro2::Literal::u32_unsuffixed(group);
209                let mut_token = is_mut.then(|| quote!(mut));
210                let binding = quote! {
211                    let #local_ident = #hash {#group_lit} #mut_token #ref_ident;
212                };
213                let_bindings.push(binding);
214            }
215
216            let expr = &self.expr.0;
217            quote! {
218                {
219                    #( #let_bindings )*
220                    #expr
221                }
222            }
223        }
224    }
225}
226
227/// Wrapper that displays only the tokens of a parsed expr.
228///
229/// Boxes `syn::Type` which is ~240 bytes.
230#[derive(Clone, Hash)]
231pub struct DebugExpr(pub Box<syn::Expr>);
232
233impl serde::Serialize for DebugExpr {
234    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
235        serializer.serialize_str(&self.to_string())
236    }
237}
238
239impl From<syn::Expr> for DebugExpr {
240    fn from(expr: syn::Expr) -> Self {
241        Self(Box::new(expr))
242    }
243}
244
245impl Deref for DebugExpr {
246    type Target = syn::Expr;
247
248    fn deref(&self) -> &Self::Target {
249        &self.0
250    }
251}
252
253impl ToTokens for DebugExpr {
254    fn to_tokens(&self, tokens: &mut TokenStream) {
255        self.0.to_tokens(tokens);
256    }
257}
258
259impl Debug for DebugExpr {
260    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261        write!(f, "{}", self.0.to_token_stream())
262    }
263}
264
265impl Display for DebugExpr {
266    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267        let original = self.0.as_ref().clone();
268        let simplified = simplify_q_macro(original);
269
270        // For now, just use quote formatting without trying to parse as a statement
271        // This avoids the syn::parse_quote! issues entirely
272        write!(f, "q!({})", quote::quote!(#simplified))
273    }
274}
275
276/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
277fn simplify_q_macro(expr: syn::Expr) -> syn::Expr {
278    if let syn::Expr::Call(ref call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
279        // Look for calls to stageleft::runtime_support::fn*
280        && is_stageleft_runtime_support_call(&path_expr.path)
281        && let syn::Expr::Block(b) = &call.args[0]
282        && b.block.stmts.len() == 3
283        && let Some(syn::Stmt::Expr(e, _)) = b.block.stmts.get(2)
284    // skip the first two, which are imports
285    {
286        let mut e = e.clone();
287        while let syn::Expr::Block(ref mut block) = e
288            && block.block.stmts.len() == 1
289            && let syn::Stmt::Expr(inner_e, _) = block.block.stmts.remove(0)
290        {
291            e = inner_e;
292        }
293
294        e
295    } else {
296        expr
297    }
298}
299
300fn is_stageleft_runtime_support_call(path: &syn::Path) -> bool {
301    // Check if this is a call to stageleft::runtime_support::fn*
302    if let Some(last_segment) = path.segments.last() {
303        let fn_name = last_segment.ident.to_string();
304        path.segments.len() > 2
305            && path.segments[0].ident == "stageleft"
306            && path.segments[1].ident == "runtime_support"
307            && fn_name.contains("_type_hint")
308    } else {
309        false
310    }
311}
312
313/// Debug displays the type's tokens.
314///
315/// Boxes `syn::Type` which is ~320 bytes.
316#[derive(Clone, PartialEq, Eq, Hash)]
317pub struct DebugType(pub Box<syn::Type>);
318
319impl From<syn::Type> for DebugType {
320    fn from(t: syn::Type) -> Self {
321        Self(Box::new(t))
322    }
323}
324
325impl Deref for DebugType {
326    type Target = syn::Type;
327
328    fn deref(&self) -> &Self::Target {
329        &self.0
330    }
331}
332
333impl ToTokens for DebugType {
334    fn to_tokens(&self, tokens: &mut TokenStream) {
335        self.0.to_tokens(tokens);
336    }
337}
338
339impl Debug for DebugType {
340    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341        write!(f, "{}", self.0.to_token_stream())
342    }
343}
344
345impl serde::Serialize for DebugType {
346    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
347        serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
348    }
349}
350
351fn serialize_backtrace_as_span<S: serde::Serializer>(
352    backtrace: &Backtrace,
353    serializer: S,
354) -> Result<S::Ok, S::Error> {
355    match backtrace.format_span() {
356        Some(span) => serializer.serialize_some(&span),
357        None => serializer.serialize_none(),
358    }
359}
360
361fn serialize_ident<S: serde::Serializer>(
362    ident: &syn::Ident,
363    serializer: S,
364) -> Result<S::Ok, S::Error> {
365    serializer.serialize_str(&ident.to_string())
366}
367
368pub enum DebugInstantiate {
369    Building,
370    Finalized(Box<DebugInstantiateFinalized>),
371}
372
373impl serde::Serialize for DebugInstantiate {
374    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
375        match self {
376            DebugInstantiate::Building => {
377                serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
378            }
379            DebugInstantiate::Finalized(_) => {
380                panic!(
381                    "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
382                )
383            }
384        }
385    }
386}
387
388#[cfg_attr(
389    not(feature = "build"),
390    expect(
391        dead_code,
392        reason = "sink, source unused without `feature = \"build\"`."
393    )
394)]
395pub struct DebugInstantiateFinalized {
396    sink: syn::Expr,
397    source: syn::Expr,
398    connect_fn: Option<Box<dyn FnOnce()>>,
399}
400
401impl From<DebugInstantiateFinalized> for DebugInstantiate {
402    fn from(f: DebugInstantiateFinalized) -> Self {
403        Self::Finalized(Box::new(f))
404    }
405}
406
407impl Debug for DebugInstantiate {
408    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
409        write!(f, "<network instantiate>")
410    }
411}
412
413impl Hash for DebugInstantiate {
414    fn hash<H: Hasher>(&self, _state: &mut H) {
415        // Do nothing
416    }
417}
418
419impl Clone for DebugInstantiate {
420    fn clone(&self) -> Self {
421        match self {
422            DebugInstantiate::Building => DebugInstantiate::Building,
423            DebugInstantiate::Finalized(_) => {
424                panic!("DebugInstantiate::Finalized should not be cloned")
425            }
426        }
427    }
428}
429
430/// Tracks the instantiation state of a `ClusterMembers` source.
431///
432/// During `compile_network`, the first `ClusterMembers` node for a given
433/// `(at_location, target_cluster)` pair is promoted to [`Self::Stream`] and
434/// receives the expression returned by `Deploy::cluster_membership_stream`.
435/// All subsequent nodes for the same pair are set to [`Self::Tee`] so that
436/// during code-gen they simply reference the tee output of the first node
437/// instead of creating a redundant `source_stream`.
438#[derive(Debug, Hash, Clone, serde::Serialize)]
439pub enum ClusterMembersState {
440    /// Not yet instantiated.
441    Uninit,
442    /// The primary instance: holds the stream expression and will emit
443    /// `source_stream(expr) -> tee()` during code-gen.
444    Stream(DebugExpr),
445    /// A secondary instance that references the tee output of the primary.
446    /// Stores `(at_location_root, target_cluster_location)` so that `emit_core`
447    /// can derive the deterministic tee ident without extra state.
448    Tee(LocationId, LocationId),
449}
450
451/// A source in a Hydro graph, where data enters the graph.
452#[derive(Debug, Hash, Clone, serde::Serialize)]
453pub enum HydroSource {
454    Stream(DebugExpr),
455    ExternalNetwork(),
456    Iter(DebugExpr),
457    Spin(),
458    ClusterMembers(LocationId, ClusterMembersState),
459    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
460    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
461}
462
#[cfg(feature = "build")]
/// Abstracts the parts of DFIR code generation that differ between production deployment and
/// simulation.
///
/// In particular, it lets the simulator:
/// - fuse every location into a single DFIR graph,
/// - emit a separate graph per tick,
/// - insert hooks that control non-deterministic operators.
pub trait DfirBuilder {
    /// Whether singletons are represented with their intermediate states included.
    fn singleton_intermediates(&self) -> bool;

    /// Returns the DFIR graph builder for `location`, creating it first if necessary.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits DFIR that batches `in_ident` (kind `in_kind`, at `in_location`) into `out_ident`
    /// at `out_location`.
    ///
    /// `fold_hooked_idents` names idents with special fold handling. Presumably these are the
    /// ones produced by [`Self::emit_fold_hook`]; confirm against callers.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
        fold_hooked_idents: &HashSet<String>,
    );

    /// Emits DFIR that yields `in_ident` (at `in_location`) out of its tick as `out_ident` at
    /// `out_location`.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits DFIR that enters an atomic section: `in_ident` at `in_location` becomes
    /// `out_ident` at `out_location`.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits DFIR that leaves an atomic section: `in_ident` at `in_location` becomes
    /// `out_ident`.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Marks where non-determinism at `location` becomes observable. `in_ident` (kind
    /// `in_kind`) becomes `out_ident` (kind `out_kind`). The meaning of `trusted` is
    /// implementation-defined; the production implementation ignores it.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Merges `first_ident` and `second_ident` into `out_ident` at `location`, tagging the
    /// emitted DFIR with `operator_tag` when given.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    );

    /// Emits both halves of a network channel from `from` to `to`:
    /// - The sender writes `input_ident`, optionally mapped through `serialize`, into `sink`.
    /// - The receiver reads `source`, optionally mapped through `deserialize`, into `out_ident`.
    ///
    /// `tag_id` disambiguates the generated send/receive statements.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Emits a source on `on` that reads `source_expr` into `out_ident`, optionally mapped
    /// through `deserialize`. `tag_id` disambiguates the generated statement.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// Emits an output on `on` that writes `input_ident` into `sink_expr`, optionally mapped
    /// through `serialize`. `tag_id` disambiguates the generated statement.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// Optionally emits a fold hook that buffers and permutes inputs before the fold.
    /// Returns the new input ident the fold should consume if a hook was emitted.
    fn emit_fold_hook(
        &mut self,
        location: &LocationId,
        in_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident>;

    /// Inserts the code that validates a manual assertion that the input live collection is
    /// consistent at this point. Production emits a no-op. Simulation is meant to inject
    /// validating assertions (not yet implemented).
    fn assert_is_consistent(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    );

    /// Observes non-determinism introduced by a mut closure running on a non-strict
    /// (unordered / at-least-once) input. Production emits the identity. Simulation delegates
    /// to `observe_nondet` with the strict output kind.
    fn observe_for_mut(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        op_meta: &HydroIrOpMetadata,
    );
}
603
/// Production [`DfirBuilder`]: one [`FlatGraphBuilder`] per root location, with no simulation
/// hooks. Most boundary operators lower to a plain `out = in;` pass-through.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    fn singleton_intermediates(&self) -> bool {
        false
    }

    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        // Builders are keyed by the root location, so nested locations share their root's graph.
        self.entry(location.root().key())
            .expect("location was removed")
            .or_default()
    }

    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
        _fold_hooked_idents: &HashSet<String>,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        if in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            )
        {
            // Bounded singleton-like inputs are persisted with `'static` lifetime
            // (presumably so each batch still sees their value).
            assert!(in_location.is_top_level());
            builder.add_dfir(
                parse_quote! {
                    #out_ident = #in_ident -> persist::<'static>();
                },
                None,
                None,
            );
        } else {
            add_passthrough_dfir(builder, &in_ident, out_ident);
        }
    }

    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        add_passthrough_dfir(self.get_dfir_mut(in_location.root()), &in_ident, out_ident);
    }

    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        add_passthrough_dfir(self.get_dfir_mut(in_location.root()), &in_ident, out_ident);
    }

    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        add_passthrough_dfir(self.get_dfir_mut(in_location.root()), &in_ident, out_ident);
    }

    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        add_passthrough_dfir(self.get_dfir_mut(location), &in_ident, out_ident);
    }

    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    ) {
        let builder = self.get_dfir_mut(location);
        builder.add_dfir(
            parse_quote! {
                #out_ident = union();
                #first_ident -> [0]#out_ident;
                #second_ident -> [1]#out_ident;
            },
            None,
            operator_tag,
        );
    }

    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        // The operator tag separates send and receive, which otherwise share the same
        // next_stmt_id.
        add_send_dfir(
            self.get_dfir_mut(from),
            &input_ident,
            serialize,
            &sink,
            &format!("send{}", tag_id),
        );
        add_recv_dfir(
            self.get_dfir_mut(to),
            out_ident,
            &source,
            deserialize,
            &format!("recv{}", tag_id),
        );
    }

    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        add_recv_dfir(
            self.get_dfir_mut(on),
            out_ident,
            &source_expr,
            deserialize,
            &format!("recv{}", tag_id),
        );
    }

    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        // The operator tag separates send and receive, which otherwise share the same
        // next_stmt_id.
        add_send_dfir(
            self.get_dfir_mut(on),
            input_ident,
            serialize,
            &sink_expr,
            &format!("send{}", tag_id),
        );
    }

    fn emit_fold_hook(
        &mut self,
        _location: &LocationId,
        _in_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident> {
        // Production never inserts fold hooks.
        None
    }

    fn assert_is_consistent(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    ) {
        add_passthrough_dfir(self.get_dfir_mut(location), &in_ident, out_ident);
    }

    fn observe_for_mut(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _op_meta: &HydroIrOpMetadata,
    ) {
        add_passthrough_dfir(self.get_dfir_mut(location), &in_ident, out_ident);
    }
}

/// Appends the pass-through statement `out_ident = in_ident;` (no operator tag) to `builder`.
#[cfg(feature = "build")]
fn add_passthrough_dfir(
    builder: &mut FlatGraphBuilder,
    in_ident: &syn::Ident,
    out_ident: &syn::Ident,
) {
    builder.add_dfir(
        parse_quote! {
            #out_ident = #in_ident;
        },
        None,
        None,
    );
}

/// Appends a sending pipeline tagged with `operator_tag` to `builder`:
/// `input_ident -> map(serialize) -> dest_sink(sink);`, or
/// `input_ident -> dest_sink(sink);` when there is no `serialize`.
#[cfg(feature = "build")]
fn add_send_dfir(
    builder: &mut FlatGraphBuilder,
    input_ident: &syn::Ident,
    serialize: Option<&DebugExpr>,
    sink: &syn::Expr,
    operator_tag: &str,
) {
    if let Some(serialize_fn) = serialize {
        builder.add_dfir(
            parse_quote! {
                #input_ident -> map(#serialize_fn) -> dest_sink(#sink);
            },
            None,
            Some(operator_tag),
        );
    } else {
        builder.add_dfir(
            parse_quote! {
                #input_ident -> dest_sink(#sink);
            },
            None,
            Some(operator_tag),
        );
    }
}

/// Appends a receiving pipeline tagged with `operator_tag` to `builder`:
/// `out_ident = source_stream(source) -> map(deserialize);`, or
/// `out_ident = source_stream(source);` when there is no `deserialize`.
#[cfg(feature = "build")]
fn add_recv_dfir(
    builder: &mut FlatGraphBuilder,
    out_ident: &syn::Ident,
    source: &syn::Expr,
    deserialize: Option<&DebugExpr>,
    operator_tag: &str,
) {
    if let Some(deserialize_fn) = deserialize {
        builder.add_dfir(
            parse_quote! {
                #out_ident = source_stream(#source) -> map(#deserialize_fn);
            },
            None,
            Some(operator_tag),
        );
    } else {
        builder.add_dfir(
            parse_quote! {
                #out_ident = source_stream(#source);
            },
            None,
            Some(operator_tag),
        );
    }
}
905
#[cfg(feature = "build")]
/// Selects how an IR traversal emits code.
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
    N: FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
{
    /// Emit DFIR through the given [`DfirBuilder`].
    Builders(&'a mut dyn DfirBuilder),
    /// Instead of building DFIR, invoke `L` on each root and `N` on each node, both with the
    /// statement-id counter.
    Callback(L, N),
}
915
916/// An root in a Hydro graph, which is an pipeline that doesn't emit
917/// any downstream values. Traversals over the dataflow graph and
918/// generating DFIR IR start from roots.
919#[derive(Debug, Hash, serde::Serialize)]
920pub enum HydroRoot {
921    ForEach {
922        f: ClosureExpr,
923        input: Box<HydroNode>,
924        op_metadata: HydroIrOpMetadata,
925    },
926    SendExternal {
927        to_external_key: LocationKey,
928        to_port_id: ExternalPortId,
929        to_many: bool,
930        unpaired: bool,
931        serialize_fn: Option<DebugExpr>,
932        instantiate_fn: DebugInstantiate,
933        input: Box<HydroNode>,
934        op_metadata: HydroIrOpMetadata,
935    },
936    DestSink {
937        sink: DebugExpr,
938        input: Box<HydroNode>,
939        op_metadata: HydroIrOpMetadata,
940    },
941    CycleSink {
942        cycle_id: CycleId,
943        input: Box<HydroNode>,
944        op_metadata: HydroIrOpMetadata,
945    },
946    EmbeddedOutput {
947        #[serde(serialize_with = "serialize_ident")]
948        ident: syn::Ident,
949        input: Box<HydroNode>,
950        op_metadata: HydroIrOpMetadata,
951    },
952    Null {
953        input: Box<HydroNode>,
954        op_metadata: HydroIrOpMetadata,
955    },
956}
957
958impl HydroRoot {
959    #[cfg(feature = "build")]
960    #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
961    pub fn compile_network<'a, D>(
962        &mut self,
963        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
964        seen_tees: &mut SeenSharedNodes,
965        seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
966        processes: &SparseSecondaryMap<LocationKey, D::Process>,
967        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
968        externals: &SparseSecondaryMap<LocationKey, D::External>,
969        env: &mut D::InstantiateEnv,
970    ) where
971        D: Deploy<'a>,
972    {
973        let refcell_extra_stmts = RefCell::new(extra_stmts);
974        let refcell_env = RefCell::new(env);
975        let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
976        self.transform_bottom_up(
977            &mut |l| {
978                if let HydroRoot::SendExternal {
979                    input,
980                    to_external_key,
981                    to_port_id,
982                    to_many,
983                    unpaired,
984                    instantiate_fn,
985                    ..
986                } = l
987                {
988                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
989                        DebugInstantiate::Building => {
990                            let to_node = externals
991                                .get(*to_external_key)
992                                .unwrap_or_else(|| {
993                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
994                                })
995                                .clone();
996
997                            match input.metadata().location_id.root() {
998                                &LocationId::Process(process_key) => {
999                                    if *to_many {
1000                                        (
1001                                            (
1002                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
1003                                                parse_quote!(DUMMY),
1004                                            ),
1005                                            Box::new(|| {}) as Box<dyn FnOnce()>,
1006                                        )
1007                                    } else {
1008                                        let from_node = processes
1009                                            .get(process_key)
1010                                            .unwrap_or_else(|| {
1011                                                panic!("A process used in the graph was not instantiated: {}", process_key)
1012                                            })
1013                                            .clone();
1014
1015                                        let sink_port = from_node.next_port();
1016                                        let source_port = to_node.next_port();
1017
1018                                        if *unpaired {
1019                                            use stageleft::quote_type;
1020                                            use tokio_util::codec::LengthDelimitedCodec;
1021
1022                                            to_node.register(*to_port_id, source_port.clone());
1023
1024                                            let _ = D::e2o_source(
1025                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1026                                                &to_node, &source_port,
1027                                                &from_node, &sink_port,
1028                                                &quote_type::<LengthDelimitedCodec>(),
1029                                                format!("{}_{}", *to_external_key, *to_port_id)
1030                                            );
1031                                        }
1032
1033                                        (
1034                                            (
1035                                                D::o2e_sink(
1036                                                    &from_node,
1037                                                    &sink_port,
1038                                                    &to_node,
1039                                                    &source_port,
1040                                                    format!("{}_{}", *to_external_key, *to_port_id)
1041                                                ),
1042                                                parse_quote!(DUMMY),
1043                                            ),
1044                                            if *unpaired {
1045                                                D::e2o_connect(
1046                                                    &to_node,
1047                                                    &source_port,
1048                                                    &from_node,
1049                                                    &sink_port,
1050                                                    *to_many,
1051                                                    NetworkHint::Auto,
1052                                                )
1053                                            } else {
1054                                                Box::new(|| {}) as Box<dyn FnOnce()>
1055                                            },
1056                                        )
1057                                    }
1058                                }
1059                                LocationId::Cluster(cluster_key) => {
1060                                    let from_node = clusters
1061                                        .get(*cluster_key)
1062                                        .unwrap_or_else(|| {
1063                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1064                                        })
1065                                        .clone();
1066
1067                                    let sink_port = from_node.next_port();
1068                                    let source_port = to_node.next_port();
1069
1070                                    if *unpaired {
1071                                        to_node.register(*to_port_id, source_port.clone());
1072                                    }
1073
1074                                    (
1075                                        (
1076                                            D::m2e_sink(
1077                                                &from_node,
1078                                                &sink_port,
1079                                                &to_node,
1080                                                &source_port,
1081                                                format!("{}_{}", *to_external_key, *to_port_id)
1082                                            ),
1083                                            parse_quote!(DUMMY),
1084                                        ),
1085                                        Box::new(|| {}) as Box<dyn FnOnce()>,
1086                                    )
1087                                }
1088                                _ => panic!()
1089                            }
1090                        },
1091
1092                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1093                    };
1094
1095                    *instantiate_fn = DebugInstantiateFinalized {
1096                        sink: sink_expr,
1097                        source: source_expr,
1098                        connect_fn: Some(connect_fn),
1099                    }
1100                    .into();
1101                } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
1102                    let element_type = match &input.metadata().collection_kind {
1103                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1104                        _ => panic!("Embedded output must have Stream collection kind"),
1105                    };
1106                    let location_key = match input.metadata().location_id.root() {
1107                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1108                        _ => panic!("Embedded output must be on a process or cluster"),
1109                    };
1110                    D::register_embedded_output(
1111                        &mut refcell_env.borrow_mut(),
1112                        location_key,
1113                        ident,
1114                        &element_type,
1115                    );
1116                }
1117            },
1118            &mut |n| {
1119                if let HydroNode::Network {
1120                    name,
1121                    networking_info,
1122                    input,
1123                    instantiate_fn,
1124                    metadata,
1125                    ..
1126                } = n
1127                {
1128                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
1129                        DebugInstantiate::Building => instantiate_network::<D>(
1130                            &mut refcell_env.borrow_mut(),
1131                            input.metadata().location_id.root(),
1132                            metadata.location_id.root(),
1133                            processes,
1134                            clusters,
1135                            name.as_deref(),
1136                            networking_info,
1137                        ),
1138
1139                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1140                    };
1141
1142                    *instantiate_fn = DebugInstantiateFinalized {
1143                        sink: sink_expr,
1144                        source: source_expr,
1145                        connect_fn: Some(connect_fn),
1146                    }
1147                    .into();
1148                } else if let HydroNode::ExternalInput {
1149                    from_external_key,
1150                    from_port_id,
1151                    from_many,
1152                    codec_type,
1153                    port_hint,
1154                    instantiate_fn,
1155                    metadata,
1156                    ..
1157                } = n
1158                {
1159                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1160                        DebugInstantiate::Building => {
1161                            let from_node = externals
1162                                .get(*from_external_key)
1163                                .unwrap_or_else(|| {
1164                                    panic!(
1165                                        "A external used in the graph was not instantiated: {}",
1166                                        from_external_key,
1167                                    )
1168                                })
1169                                .clone();
1170
1171                            match metadata.location_id.root() {
1172                                &LocationId::Process(process_key) => {
1173                                    let to_node = processes
1174                                        .get(process_key)
1175                                        .unwrap_or_else(|| {
1176                                            panic!("A process used in the graph was not instantiated: {}", process_key)
1177                                        })
1178                                        .clone();
1179
1180                                    let sink_port = from_node.next_port();
1181                                    let source_port = to_node.next_port();
1182
1183                                    from_node.register(*from_port_id, sink_port.clone());
1184
1185                                    (
1186                                        (
1187                                            parse_quote!(DUMMY),
1188                                            if *from_many {
1189                                                D::e2o_many_source(
1190                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1191                                                    &to_node, &source_port,
1192                                                    codec_type.0.as_ref(),
1193                                                    format!("{}_{}", *from_external_key, *from_port_id)
1194                                                )
1195                                            } else {
1196                                                D::e2o_source(
1197                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1198                                                    &from_node, &sink_port,
1199                                                    &to_node, &source_port,
1200                                                    codec_type.0.as_ref(),
1201                                                    format!("{}_{}", *from_external_key, *from_port_id)
1202                                                )
1203                                            },
1204                                        ),
1205                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1206                                    )
1207                                }
1208                                LocationId::Cluster(cluster_key) => {
1209                                    let to_node = clusters
1210                                        .get(*cluster_key)
1211                                        .unwrap_or_else(|| {
1212                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1213                                        })
1214                                        .clone();
1215
1216                                    let sink_port = from_node.next_port();
1217                                    let source_port = to_node.next_port();
1218
1219                                    from_node.register(*from_port_id, sink_port.clone());
1220
1221                                    (
1222                                        (
1223                                            parse_quote!(DUMMY),
1224                                            D::e2m_source(
1225                                                refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1226                                                &from_node, &sink_port,
1227                                                &to_node, &source_port,
1228                                                codec_type.0.as_ref(),
1229                                                format!("{}_{}", *from_external_key, *from_port_id)
1230                                            ),
1231                                        ),
1232                                        D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1233                                    )
1234                                }
1235                                _ => panic!()
1236                            }
1237                        },
1238
1239                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1240                    };
1241
1242                    *instantiate_fn = DebugInstantiateFinalized {
1243                        sink: sink_expr,
1244                        source: source_expr,
1245                        connect_fn: Some(connect_fn),
1246                    }
1247                    .into();
1248                } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1249                    let element_type = match &metadata.collection_kind {
1250                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1251                        _ => panic!("Embedded source must have Stream collection kind"),
1252                    };
1253                    let location_key = match metadata.location_id.root() {
1254                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1255                        _ => panic!("Embedded source must be on a process or cluster"),
1256                    };
1257                    D::register_embedded_stream_input(
1258                        &mut refcell_env.borrow_mut(),
1259                        location_key,
1260                        ident,
1261                        &element_type,
1262                    );
1263                } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1264                    let element_type = match &metadata.collection_kind {
1265                        CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1266                        _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1267                    };
1268                    let location_key = match metadata.location_id.root() {
1269                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1270                        _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1271                    };
1272                    D::register_embedded_singleton_input(
1273                        &mut refcell_env.borrow_mut(),
1274                        location_key,
1275                        ident,
1276                        &element_type,
1277                    );
1278                } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1279                    match state {
1280                        ClusterMembersState::Uninit => {
1281                            let at_location = metadata.location_id.root().clone();
1282                            let key = (at_location.clone(), location_id.key());
1283                            if refcell_seen_cluster_members.borrow_mut().insert(key) {
1284                                // First occurrence: call cluster_membership_stream and mark as Stream.
1285                                let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1286                                    D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1287                                    &(),
1288                                );
1289                                *state = ClusterMembersState::Stream(expr.into());
1290                            } else {
1291                                // Already instantiated for this (at, target) pair: just tee.
1292                                *state = ClusterMembersState::Tee(at_location, location_id.clone());
1293                            }
1294                        }
1295                        ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1296                            panic!("cluster members already finalized");
1297                        }
1298                    }
1299                }
1300            },
1301            seen_tees,
1302            false,
1303        );
1304    }
1305
1306    pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1307        self.transform_bottom_up(
1308            &mut |l| {
1309                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1310                    match instantiate_fn {
1311                        DebugInstantiate::Building => panic!("network not built"),
1312
1313                        DebugInstantiate::Finalized(finalized) => {
1314                            (finalized.connect_fn.take().unwrap())();
1315                        }
1316                    }
1317                }
1318            },
1319            &mut |n| {
1320                if let HydroNode::Network { instantiate_fn, .. }
1321                | HydroNode::ExternalInput { instantiate_fn, .. } = n
1322                {
1323                    match instantiate_fn {
1324                        DebugInstantiate::Building => panic!("network not built"),
1325
1326                        DebugInstantiate::Finalized(finalized) => {
1327                            (finalized.connect_fn.take().unwrap())();
1328                        }
1329                    }
1330                }
1331            },
1332            seen_tees,
1333            false,
1334        );
1335    }
1336
1337    pub fn transform_bottom_up(
1338        &mut self,
1339        transform_root: &mut impl FnMut(&mut HydroRoot),
1340        transform_node: &mut impl FnMut(&mut HydroNode),
1341        seen_tees: &mut SeenSharedNodes,
1342        check_well_formed: bool,
1343    ) {
1344        self.transform_children(
1345            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1346            seen_tees,
1347        );
1348
1349        transform_root(self);
1350    }
1351
1352    pub fn transform_children(
1353        &mut self,
1354        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1355        seen_tees: &mut SeenSharedNodes,
1356    ) {
1357        match self {
1358            HydroRoot::ForEach { f, input, .. } => {
1359                f.transform_children(&mut transform, seen_tees);
1360                transform(input, seen_tees);
1361            }
1362            HydroRoot::SendExternal { input, .. }
1363            | HydroRoot::DestSink { input, .. }
1364            | HydroRoot::CycleSink { input, .. }
1365            | HydroRoot::EmbeddedOutput { input, .. }
1366            | HydroRoot::Null { input, .. } => {
1367                transform(input, seen_tees);
1368            }
1369        }
1370    }
1371
1372    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1373        match self {
1374            HydroRoot::ForEach {
1375                f,
1376                input,
1377                op_metadata,
1378            } => HydroRoot::ForEach {
1379                f: f.deep_clone(seen_tees),
1380                input: Box::new(input.deep_clone(seen_tees)),
1381                op_metadata: op_metadata.clone(),
1382            },
1383            HydroRoot::SendExternal {
1384                to_external_key,
1385                to_port_id,
1386                to_many,
1387                unpaired,
1388                serialize_fn,
1389                instantiate_fn,
1390                input,
1391                op_metadata,
1392            } => HydroRoot::SendExternal {
1393                to_external_key: *to_external_key,
1394                to_port_id: *to_port_id,
1395                to_many: *to_many,
1396                unpaired: *unpaired,
1397                serialize_fn: serialize_fn.clone(),
1398                instantiate_fn: instantiate_fn.clone(),
1399                input: Box::new(input.deep_clone(seen_tees)),
1400                op_metadata: op_metadata.clone(),
1401            },
1402            HydroRoot::DestSink {
1403                sink,
1404                input,
1405                op_metadata,
1406            } => HydroRoot::DestSink {
1407                sink: sink.clone(),
1408                input: Box::new(input.deep_clone(seen_tees)),
1409                op_metadata: op_metadata.clone(),
1410            },
1411            HydroRoot::CycleSink {
1412                cycle_id,
1413                input,
1414                op_metadata,
1415            } => HydroRoot::CycleSink {
1416                cycle_id: *cycle_id,
1417                input: Box::new(input.deep_clone(seen_tees)),
1418                op_metadata: op_metadata.clone(),
1419            },
1420            HydroRoot::EmbeddedOutput {
1421                ident,
1422                input,
1423                op_metadata,
1424            } => HydroRoot::EmbeddedOutput {
1425                ident: ident.clone(),
1426                input: Box::new(input.deep_clone(seen_tees)),
1427                op_metadata: op_metadata.clone(),
1428            },
1429            HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1430                input: Box::new(input.deep_clone(seen_tees)),
1431                op_metadata: op_metadata.clone(),
1432            },
1433        }
1434    }
1435
1436    #[cfg(feature = "build")]
1437    pub fn emit(
1438        &mut self,
1439        graph_builders: &mut dyn DfirBuilder,
1440        seen_tees: &mut SeenSharedNodes,
1441        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1442        next_stmt_id: &mut crate::Counter<StmtId>,
1443        fold_hooked_idents: &mut HashSet<String>,
1444    ) {
1445        self.emit_core(
1446            &mut BuildersOrCallback::<
1447                fn(&mut HydroRoot, &mut crate::Counter<StmtId>),
1448                fn(&mut HydroNode, &mut crate::Counter<StmtId>),
1449            >::Builders(graph_builders),
1450            seen_tees,
1451            built_tees,
1452            next_stmt_id,
1453            fold_hooked_idents,
1454        );
1455    }
1456
1457    #[cfg(feature = "build")]
1458    pub fn emit_core(
1459        &mut self,
1460        builders_or_callback: &mut BuildersOrCallback<
1461            impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
1462            impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
1463        >,
1464        seen_tees: &mut SeenSharedNodes,
1465        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1466        next_stmt_id: &mut crate::Counter<StmtId>,
1467        fold_hooked_idents: &mut HashSet<String>,
1468    ) {
1469        match self {
1470            HydroRoot::ForEach { f, input, .. } => {
1471                let input_ident = input.emit_core(
1472                    builders_or_callback,
1473                    seen_tees,
1474                    built_tees,
1475                    next_stmt_id,
1476                    fold_hooked_idents,
1477                );
1478
1479                let stmt_id = next_stmt_id.get_and_increment();
1480
1481                match builders_or_callback {
1482                    BuildersOrCallback::Builders(graph_builders) => {
1483                        let mut ident_stack: Vec<syn::Ident> = Vec::new();
1484
1485                        // Look up each captured ref's ident from built_tees
1486                        for (ref_node, _is_mut) in f.singleton_refs.iter() {
1487                            let HydroNode::Reference { inner, .. } = ref_node else {
1488                                panic!("singleton_refs should only contain HydroNode::Reference");
1489                            };
1490                            let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
1491                            let idents = built_tees.get(&ptr).expect(
1492                                "ForEach singleton ref not found in built_tees — ref node was not emitted",
1493                            );
1494                            ident_stack.push(idents[0].clone());
1495                        }
1496
1497                        let f_tokens = f.emit_tokens(&mut ident_stack);
1498
1499                        graph_builders
1500                            .get_dfir_mut(&input.metadata().location_id)
1501                            .add_dfir(
1502                                parse_quote! {
1503                                    #input_ident -> for_each(#f_tokens);
1504                                },
1505                                None,
1506                                Some(&stmt_id.to_string()),
1507                            );
1508                    }
1509                    BuildersOrCallback::Callback(leaf_callback, _) => {
1510                        leaf_callback(self, next_stmt_id);
1511                    }
1512                }
1513            }
1514
1515            HydroRoot::SendExternal {
1516                serialize_fn,
1517                instantiate_fn,
1518                input,
1519                ..
1520            } => {
1521                let input_ident = input.emit_core(
1522                    builders_or_callback,
1523                    seen_tees,
1524                    built_tees,
1525                    next_stmt_id,
1526                    fold_hooked_idents,
1527                );
1528
1529                let stmt_id = next_stmt_id.get_and_increment();
1530
1531                match builders_or_callback {
1532                    BuildersOrCallback::Builders(graph_builders) => {
1533                        let (sink_expr, _) = match instantiate_fn {
1534                            DebugInstantiate::Building => (
1535                                syn::parse_quote!(DUMMY_SINK),
1536                                syn::parse_quote!(DUMMY_SOURCE),
1537                            ),
1538
1539                            DebugInstantiate::Finalized(finalized) => {
1540                                (finalized.sink.clone(), finalized.source.clone())
1541                            }
1542                        };
1543
1544                        graph_builders.create_external_output(
1545                            &input.metadata().location_id,
1546                            sink_expr,
1547                            &input_ident,
1548                            serialize_fn.as_ref(),
1549                            stmt_id,
1550                        );
1551                    }
1552                    BuildersOrCallback::Callback(leaf_callback, _) => {
1553                        leaf_callback(self, next_stmt_id);
1554                    }
1555                }
1556            }
1557
1558            HydroRoot::DestSink { sink, input, .. } => {
1559                let input_ident = input.emit_core(
1560                    builders_or_callback,
1561                    seen_tees,
1562                    built_tees,
1563                    next_stmt_id,
1564                    fold_hooked_idents,
1565                );
1566
1567                let stmt_id = next_stmt_id.get_and_increment();
1568
1569                match builders_or_callback {
1570                    BuildersOrCallback::Builders(graph_builders) => {
1571                        graph_builders
1572                            .get_dfir_mut(&input.metadata().location_id)
1573                            .add_dfir(
1574                                parse_quote! {
1575                                    #input_ident -> dest_sink(#sink);
1576                                },
1577                                None,
1578                                Some(&stmt_id.to_string()),
1579                            );
1580                    }
1581                    BuildersOrCallback::Callback(leaf_callback, _) => {
1582                        leaf_callback(self, next_stmt_id);
1583                    }
1584                }
1585            }
1586
1587            HydroRoot::CycleSink {
1588                cycle_id, input, ..
1589            } => {
1590                let input_ident = input.emit_core(
1591                    builders_or_callback,
1592                    seen_tees,
1593                    built_tees,
1594                    next_stmt_id,
1595                    fold_hooked_idents,
1596                );
1597
1598                match builders_or_callback {
1599                    BuildersOrCallback::Builders(graph_builders) => {
1600                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1601                            CollectionKind::KeyedSingleton {
1602                                key_type,
1603                                value_type,
1604                                ..
1605                            }
1606                            | CollectionKind::KeyedStream {
1607                                key_type,
1608                                value_type,
1609                                ..
1610                            } => {
1611                                parse_quote!((#key_type, #value_type))
1612                            }
1613                            CollectionKind::Stream { element_type, .. }
1614                            | CollectionKind::Singleton { element_type, .. }
1615                            | CollectionKind::Optional { element_type, .. } => {
1616                                parse_quote!(#element_type)
1617                            }
1618                        };
1619
1620                        let cycle_id_ident = cycle_id.as_ident();
1621                        graph_builders
1622                            .get_dfir_mut(&input.metadata().location_id)
1623                            .add_dfir(
1624                                parse_quote! {
1625                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1626                                },
1627                                None,
1628                                None,
1629                            );
1630                    }
1631                    // No ID, no callback
1632                    BuildersOrCallback::Callback(_, _) => {}
1633                }
1634            }
1635
1636            HydroRoot::EmbeddedOutput { ident, input, .. } => {
1637                let input_ident = input.emit_core(
1638                    builders_or_callback,
1639                    seen_tees,
1640                    built_tees,
1641                    next_stmt_id,
1642                    fold_hooked_idents,
1643                );
1644
1645                let stmt_id = next_stmt_id.get_and_increment();
1646
1647                match builders_or_callback {
1648                    BuildersOrCallback::Builders(graph_builders) => {
1649                        graph_builders
1650                            .get_dfir_mut(&input.metadata().location_id)
1651                            .add_dfir(
1652                                parse_quote! {
1653                                    #input_ident -> for_each(&mut #ident);
1654                                },
1655                                None,
1656                                Some(&stmt_id.to_string()),
1657                            );
1658                    }
1659                    BuildersOrCallback::Callback(leaf_callback, _) => {
1660                        leaf_callback(self, next_stmt_id);
1661                    }
1662                }
1663            }
1664
1665            HydroRoot::Null { input, .. } => {
1666                let input_ident = input.emit_core(
1667                    builders_or_callback,
1668                    seen_tees,
1669                    built_tees,
1670                    next_stmt_id,
1671                    fold_hooked_idents,
1672                );
1673
1674                let stmt_id = next_stmt_id.get_and_increment();
1675
1676                match builders_or_callback {
1677                    BuildersOrCallback::Builders(graph_builders) => {
1678                        graph_builders
1679                            .get_dfir_mut(&input.metadata().location_id)
1680                            .add_dfir(
1681                                parse_quote! {
1682                                    #input_ident -> for_each(|_| {});
1683                                },
1684                                None,
1685                                Some(&stmt_id.to_string()),
1686                            );
1687                    }
1688                    BuildersOrCallback::Callback(leaf_callback, _) => {
1689                        leaf_callback(self, next_stmt_id);
1690                    }
1691                }
1692            }
1693        }
1694    }
1695
1696    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1697        match self {
1698            HydroRoot::ForEach { op_metadata, .. }
1699            | HydroRoot::SendExternal { op_metadata, .. }
1700            | HydroRoot::DestSink { op_metadata, .. }
1701            | HydroRoot::CycleSink { op_metadata, .. }
1702            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1703            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1704        }
1705    }
1706
1707    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1708        match self {
1709            HydroRoot::ForEach { op_metadata, .. }
1710            | HydroRoot::SendExternal { op_metadata, .. }
1711            | HydroRoot::DestSink { op_metadata, .. }
1712            | HydroRoot::CycleSink { op_metadata, .. }
1713            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1714            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1715        }
1716    }
1717
1718    pub fn input(&self) -> &HydroNode {
1719        match self {
1720            HydroRoot::ForEach { input, .. }
1721            | HydroRoot::SendExternal { input, .. }
1722            | HydroRoot::DestSink { input, .. }
1723            | HydroRoot::CycleSink { input, .. }
1724            | HydroRoot::EmbeddedOutput { input, .. }
1725            | HydroRoot::Null { input, .. } => input,
1726        }
1727    }
1728
1729    pub fn input_metadata(&self) -> &HydroIrMetadata {
1730        self.input().metadata()
1731    }
1732
1733    pub fn print_root(&self) -> String {
1734        match self {
1735            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1736            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1737            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1738            HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1739            HydroRoot::EmbeddedOutput { ident, .. } => {
1740                format!("EmbeddedOutput({})", ident)
1741            }
1742            HydroRoot::Null { .. } => "Null".to_owned(),
1743        }
1744    }
1745
1746    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1747        match self {
1748            HydroRoot::ForEach { f, .. } => {
1749                transform(&mut f.expr);
1750            }
1751            HydroRoot::DestSink { sink, .. } => {
1752                transform(sink);
1753            }
1754            HydroRoot::SendExternal { .. }
1755            | HydroRoot::CycleSink { .. }
1756            | HydroRoot::EmbeddedOutput { .. }
1757            | HydroRoot::Null { .. } => {}
1758        }
1759    }
1760}
1761
/// Returns the clock id of `loc` when it is a `Tick`, looking through any `Atomic`
/// wrappers; bare `Process` / `Cluster` locations have no tick.
#[cfg(feature = "build")]
fn tick_of(loc: &LocationId) -> Option<ClockId> {
    match loc {
        LocationId::Tick(clock, _) => Some(*clock),
        LocationId::Atomic(wrapped) => tick_of(wrapped),
        LocationId::Process(_) | LocationId::Cluster(_) => None,
    }
}
1770
/// Rewrites every tick id inside `loc` (at any nesting depth under `Tick` / `Atomic`)
/// to its representative in the union-find `uf`. Process and cluster leaves are unchanged.
#[cfg(feature = "build")]
fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
    match loc {
        LocationId::Process(_) | LocationId::Cluster(_) => {}
        LocationId::Atomic(wrapped) => remap_location(wrapped, uf),
        LocationId::Tick(clock, wrapped) => {
            let representative = uf_find(uf, *clock);
            *clock = representative;
            remap_location(wrapped, uf);
        }
    }
}
1784
/// Returns the representative of `x` in the union-find forest `parent`, compressing
/// the path so every id visited on the way points directly at the representative.
///
/// An id with no entry in `parent` (or mapped to itself) is its own representative.
#[cfg(feature = "build")]
fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
    // First walk: follow parent links until reaching a self-rooted id.
    let mut root = x;
    while let Some(&up) = parent.get(&root) {
        if up == root {
            break;
        }
        root = up;
    }

    // Second walk: point every non-root id on the path straight at `root`.
    let mut node = x;
    while node != root {
        match parent.insert(node, root) {
            Some(up) => node = up,
            None => break,
        }
    }
    root
}
1795
/// Merges the sets containing `a` and `b`; the representative of `b`'s set becomes the
/// representative of the merged set. No-op if they are already in the same set.
#[cfg(feature = "build")]
fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
    let (root_a, root_b) = (uf_find(parent, a), uf_find(parent, b));
    if root_a == root_b {
        return;
    }
    parent.insert(root_a, root_b);
}
1804
/// Unifies tick ids that the IR connects, then rewrites every node's `LocationId` to use
/// the representative tick id.
///
/// Pass 1 builds a union-find over tick ids:
/// - for `Batch` and `YieldConcat`, the input's tick is unioned with the output's tick;
/// - for `Chain`, `ChainFirst`, and `MergeOrdered`, each input's tick is unioned with the
///   output's tick.
///
/// Locations without a tick (after looking through `Atomic`) are ignored.
/// Pass 2 rewrites every node's location through the resulting union-find.
#[cfg(feature = "build")]
pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
    /// Unions the ticks of `from` and `to` when both locations have one.
    fn link(uf: &mut HashMap<ClockId, ClockId>, from: &LocationId, to: &LocationId) {
        if let (Some(a), Some(b)) = (tick_of(from), tick_of(to)) {
            uf_union(uf, a, b);
        }
    }

    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();

    // Pass 1: record which ticks must end up identical.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| match node {
            HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } => {
                link(&mut uf, &inner.metadata().location_id, &metadata.location_id);
            }
            HydroNode::Chain {
                first,
                second,
                metadata,
            }
            | HydroNode::ChainFirst {
                first,
                second,
                metadata,
            }
            | HydroNode::MergeOrdered {
                first,
                second,
                metadata,
            } => {
                link(&mut uf, &first.metadata().location_id, &metadata.location_id);
                link(&mut uf, &second.metadata().location_id, &metadata.location_id);
            }
            _ => {}
        },
        false,
    );

    // Pass 2: replace each tick id with its representative.
    let mut rewrite = |node: &mut HydroNode| {
        remap_location(&mut node.metadata_mut().location_id, &mut uf);
    };
    transform_bottom_up(ir, &mut |_| {}, &mut rewrite, false);
}
1868
/// Emits DFIR for every root in `ir` and returns the resulting graph builders, keyed by
/// location.
///
/// The tee bookkeeping (`seen_tees`, `built_tees`), the statement-id counter, and
/// `fold_hooked_idents` are created once and shared by all roots, so state recorded
/// while emitting one root is visible when emitting the next.
///
/// Takes a mutable slice, matching `traverse_dfir` and `transform_bottom_up`; callers
/// passing `&mut Vec<HydroRoot>` still work through deref coercion.
#[cfg(feature = "build")]
pub fn emit(ir: &mut [HydroRoot]) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders = SecondaryMap::new();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = crate::Counter::<StmtId>::default();
    let mut fold_hooked_idents = HashSet::new();
    for leaf in ir.iter_mut() {
        leaf.emit(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    }
    builders
}
1887
/// Drives `HydroRoot::emit_core` over every root in callback mode instead of building
/// DFIR graphs.
///
/// In this mode `emit_core` hands roots to `transform_root` together with the
/// statement-id counter (presumably nodes likewise go to `transform_node` — see
/// `HydroNode::emit_core`). Tee tracking, the statement-id counter, and
/// `fold_hooked_idents` are shared across all roots.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
    transform_node: impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
) {
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = crate::Counter::<StmtId>::default();
    let mut fold_hooked_idents = HashSet::new();
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
    for root in ir.iter_mut() {
        root.emit_core(
            &mut callback,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    }
}
1909
1910pub fn transform_bottom_up(
1911    ir: &mut [HydroRoot],
1912    transform_root: &mut impl FnMut(&mut HydroRoot),
1913    transform_node: &mut impl FnMut(&mut HydroNode),
1914    check_well_formed: bool,
1915) {
1916    let mut seen_tees = HashMap::new();
1917    ir.iter_mut().for_each(|leaf| {
1918        leaf.transform_bottom_up(
1919            transform_root,
1920            transform_node,
1921            &mut seen_tees,
1922            check_well_formed,
1923        );
1924    });
1925}
1926
1927pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1928    let mut seen_tees = HashMap::new();
1929    ir.iter()
1930        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1931        .collect()
1932}
1933
1934type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1935thread_local! {
1936    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1937    /// Tracks shared nodes already serialized so that `SharedNode::serialize`
1938    /// emits the full subtree only once and uses a `"<shared N>"` back-reference
1939    /// on subsequent encounters, preventing infinite loops.
1940    static SERIALIZED_SHARED: PrintedTees
1941        = const { RefCell::new(None) };
1942}
1943
1944pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1945    PRINTED_TEES.with(|printed_tees| {
1946        let mut printed_tees_mut = printed_tees.borrow_mut();
1947        *printed_tees_mut = Some((0, HashMap::new()));
1948        drop(printed_tees_mut);
1949
1950        let ret = f();
1951
1952        let mut printed_tees_mut = printed_tees.borrow_mut();
1953        *printed_tees_mut = None;
1954
1955        ret
1956    })
1957}
1958
1959/// Runs `f` with a fresh shared-node deduplication scope for serialization.
1960/// Any `SharedNode` serialized inside `f` will be tracked; the first occurrence
1961/// emits the full subtree while later occurrences emit a `{"$shared_ref": id}`
1962/// back-reference.  The tracking state is restored when `f` returns or panics.
1963pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1964    let _guard = SerializedSharedGuard::enter();
1965    f()
1966}
1967
/// RAII guard that saves/restores the `SERIALIZED_SHARED` thread-local,
/// making `serialize_dedup_shared` re-entrant and panic-safe.
struct SerializedSharedGuard {
    /// The dedup state that was installed before this guard was entered (`None` when no
    /// scope was active); written back to `SERIALIZED_SHARED` when the guard is dropped.
    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
}
1973
1974impl SerializedSharedGuard {
1975    fn enter() -> Self {
1976        let previous = SERIALIZED_SHARED.with(|cell| {
1977            let mut guard = cell.borrow_mut();
1978            guard.replace((0, HashMap::new()))
1979        });
1980        Self { previous }
1981    }
1982}
1983
1984impl Drop for SerializedSharedGuard {
1985    fn drop(&mut self) {
1986        SERIALIZED_SHARED.with(|cell| {
1987            *cell.borrow_mut() = self.previous.take();
1988        });
1989    }
1990}
1991
/// A `HydroNode` that may be referenced from several places in the IR through a shared
/// `Rc<RefCell<_>>` (`Tee`, `Reference`, and `Partition` nodes each hold one).
///
/// Identity is the address of the `Rc` allocation (see `SharedNode::as_ptr`).
pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1993
1994impl serde::Serialize for SharedNode {
1995    /// Multiple `SharedNode`s can point to the same underlying `HydroNode` (via
1996    /// `Tee` / `Partition`).  A naïve recursive serialization would revisit the
1997    /// same subtree every time and, if the graph ever contains a cycle, loop
1998    /// forever.
1999    ///
2000    /// We keep a thread-local map (`SERIALIZED_SHARED`) from raw `Rc` pointer →
2001    /// integer id.  The first time we see a pointer we assign it the next id and
2002    /// emit the full subtree as `{"$shared": <id>, "node": …}`.  Every later
2003    /// encounter of the same pointer emits `{"$shared_ref": <id>}`, cutting the
2004    /// recursion.  Requires an active `serialize_dedup_shared` scope.
2005    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2006        SERIALIZED_SHARED.with(|cell| {
2007            let mut guard = cell.borrow_mut();
2008            // (next_id, pointer → assigned_id)
2009            let state = guard.as_mut().ok_or_else(|| {
2010                serde::ser::Error::custom(
2011                    "SharedNode serialization requires an active serialize_dedup_shared scope",
2012                )
2013            })?;
2014            let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
2015
2016            if let Some(&id) = state.1.get(&ptr) {
2017                drop(guard);
2018                use serde::ser::SerializeMap;
2019                let mut map = serializer.serialize_map(Some(1))?;
2020                map.serialize_entry("$shared_ref", &id)?;
2021                map.end()
2022            } else {
2023                let id = state.0;
2024                state.0 += 1;
2025                state.1.insert(ptr, id);
2026                drop(guard);
2027
2028                use serde::ser::SerializeMap;
2029                let mut map = serializer.serialize_map(Some(2))?;
2030                map.serialize_entry("$shared", &id)?;
2031                map.serialize_entry("node", &*self.0.borrow())?;
2032                map.end()
2033            }
2034        })
2035    }
2036}
2037
2038impl SharedNode {
2039    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
2040        Rc::as_ptr(&self.0)
2041    }
2042}
2043
2044impl Debug for SharedNode {
2045    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2046        PRINTED_TEES.with(|printed_tees| {
2047            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
2048            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
2049
2050            if let Some(printed_tees_mut) = printed_tees_mut {
2051                if let Some(existing) = printed_tees_mut
2052                    .1
2053                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
2054                {
2055                    write!(f, "<shared {}>", existing)
2056                } else {
2057                    let next_id = printed_tees_mut.0;
2058                    printed_tees_mut.0 += 1;
2059                    printed_tees_mut
2060                        .1
2061                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
2062                    drop(printed_tees_mut_borrow);
2063                    write!(f, "<shared {}>: ", next_id)?;
2064                    Debug::fmt(&self.0.borrow(), f)
2065                }
2066            } else {
2067                drop(printed_tees_mut_borrow);
2068                write!(f, "<shared>: ")?;
2069                Debug::fmt(&self.0.borrow(), f)
2070            }
2071        })
2072    }
2073}
2074
2075impl Hash for SharedNode {
2076    fn hash<H: Hasher>(&self, state: &mut H) {
2077        self.0.borrow_mut().hash(state);
2078    }
2079}
2080
/// Hands out access-group numbers for a singleton `HydroNode::Reference`.
///
/// While `Counting`, every [`AccessCounter::next_group`] call yields a group number:
/// - an immutable access joins the current group;
/// - a mutable access gets a group of its own, by bumping the counter once before and
///   once after it.
///
/// `Frozen` holds a single, final group number and cannot hand out new ones.
#[derive(Debug)]
pub enum AccessCounter {
    Counting(Cell<u32>),
    Frozen(u32),
}

impl AccessCounter {
    /// Creates a counter in the `Counting` state, starting at group 0.
    pub fn new() -> Self {
        AccessCounter::Counting(Cell::new(0))
    }

    /// Returns (as `Frozen`) the group for the next access to this reference.
    ///
    /// With the counter at `c`:
    /// - a mutable access receives `c + 1` and moves the counter to `c + 2`, so no other
    ///   `next_group` call on this counter returns the same group;
    /// - an immutable access receives `c` and leaves the counter as is.
    ///
    /// # Panics
    ///
    /// Panics if `self` is `AccessCounter::Frozen`.
    pub fn next_group(&self, is_mut: bool) -> Self {
        let counter = match self {
            AccessCounter::Counting(counter) => counter,
            AccessCounter::Frozen(_) => panic!("Cannot count on `AccessCounter::Frozen`"),
        };
        let current = counter.get();
        if !is_mut {
            return AccessCounter::Frozen(current);
        }
        let isolated = current + 1;
        counter.set(isolated + 1);
        AccessCounter::Frozen(isolated)
    }

    /// Returns a `Frozen` snapshot of the current group number (valid in either state).
    pub fn freeze(&self) -> Self {
        let snapshot = match self {
            AccessCounter::Counting(counter) => counter.get(),
            AccessCounter::Frozen(group) => *group,
        };
        AccessCounter::Frozen(snapshot)
    }

    /// Returns the group number held by a `Frozen` counter.
    ///
    /// # Panics
    ///
    /// Panics if `self` is still `AccessCounter::Counting`.
    pub fn frozen_group(&self) -> u32 {
        match self {
            AccessCounter::Frozen(group) => *group,
            AccessCounter::Counting(_) => panic!("`AccessCounter` not frozen"),
        }
    }
}

impl Default for AccessCounter {
    /// Same as [`AccessCounter::new`].
    fn default() -> Self {
        AccessCounter::new()
    }
}

impl Hash for AccessCounter {
    fn hash<H: Hasher>(&self, _state: &mut H) {
        // Deliberately hashes nothing: the counter is bookkeeping, not part of node identity.
    }
}
2140
2141impl serde::Serialize for AccessCounter {
2142    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2143        let count = match self {
2144            AccessCounter::Counting(count) => count.get(),
2145            AccessCounter::Frozen(count) => *count,
2146        };
2147        count.serialize(serializer)
2148    }
2149}
2150
/// Boundedness of a `Stream`, `Optional`, or `KeyedStream` collection.
///
/// `CollectionKind::is_bounded` treats `Bounded` as bounded.
/// NOTE(review): presumably `Unbounded` means the collection may keep growing at its
/// location; confirm against the location/tick semantics.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
2156
/// Ordering guarantee for the elements of a `Stream` (or the values of a `KeyedStream`).
///
/// `CollectionKind::is_strict` requires `TotalOrder`, and `CollectionKind::strict_kind`
/// upgrades to it.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
2162
/// Retry guarantee for the elements of a `Stream` (or the values of a `KeyedStream`).
///
/// `CollectionKind::is_strict` requires `ExactlyOnce`, and `CollectionKind::strict_kind`
/// upgrades to it.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
2168
/// Boundedness of a `KeyedSingleton` collection.
///
/// Only `Bounded` counts as bounded for `CollectionKind::is_bounded`; `MonotonicKeys`,
/// `MonotonicValue`, and `BoundedValue` do not.
/// NOTE(review): the intermediate variants presumably describe partial guarantees on
/// the keys or values; confirm their exact meaning.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    MonotonicKeys,
    MonotonicValue,
    BoundedValue,
    Bounded,
}
2177
/// Boundedness of a `Singleton` collection.
///
/// Only `Bounded` counts as bounded for `CollectionKind::is_bounded`; `Monotonic` does not.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum SingletonBoundKind {
    Unbounded,
    Monotonic,
    Bounded,
}
2184
/// The kind of collection an IR node produces, with its element type(s) and its
/// boundedness / ordering / retry guarantees.
///
/// For the keyed variants, DFIR codegen treats elements as `(key_type, value_type)`
/// tuples (for example when emitting a `CycleSink`).
#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
pub enum CollectionKind {
    /// A stream of `element_type` values.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// A single `element_type` value.
    Singleton {
        bound: SingletonBoundKind,
        element_type: DebugType,
    },
    /// Zero or one `element_type` value.
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A stream of values grouped by key; ordering/retry guarantees apply to the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// A single value per key.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
2214
2215impl CollectionKind {
2216    pub fn is_bounded(&self) -> bool {
2217        matches!(
2218            self,
2219            CollectionKind::Stream {
2220                bound: BoundKind::Bounded,
2221                ..
2222            } | CollectionKind::Singleton {
2223                bound: SingletonBoundKind::Bounded,
2224                ..
2225            } | CollectionKind::Optional {
2226                bound: BoundKind::Bounded,
2227                ..
2228            } | CollectionKind::KeyedStream {
2229                bound: BoundKind::Bounded,
2230                ..
2231            } | CollectionKind::KeyedSingleton {
2232                bound: KeyedSingletonBoundKind::Bounded,
2233                ..
2234            }
2235        )
2236    }
2237
2238    /// Returns whether this collection kind is already "strict" (TotalOrder + ExactlyOnce),
2239    /// meaning no non-determinism needs to be observed for mut closures.
2240    pub fn is_strict(&self) -> bool {
2241        match self {
2242            CollectionKind::Stream { order, retry, .. } => {
2243                *order == StreamOrder::TotalOrder && *retry == StreamRetry::ExactlyOnce
2244            }
2245            CollectionKind::KeyedStream {
2246                value_order,
2247                value_retry,
2248                ..
2249            } => {
2250                *value_order == StreamOrder::TotalOrder && *value_retry == StreamRetry::ExactlyOnce
2251            }
2252            // Singletons/Optionals/KeyedSingletons do not have observable
2253            // non-determinism other than snapshots / batching
2254            CollectionKind::Singleton { .. }
2255            | CollectionKind::Optional { .. }
2256            | CollectionKind::KeyedSingleton { .. } => true,
2257        }
2258    }
2259
2260    /// Creates a "strict" version of this kind with TotalOrder and ExactlyOnce.
2261    pub fn strict_kind(&self) -> CollectionKind {
2262        match self {
2263            CollectionKind::Stream {
2264                bound,
2265                element_type,
2266                ..
2267            } => CollectionKind::Stream {
2268                bound: bound.clone(),
2269                order: StreamOrder::TotalOrder,
2270                retry: StreamRetry::ExactlyOnce,
2271                element_type: element_type.clone(),
2272            },
2273            CollectionKind::KeyedStream {
2274                bound,
2275                key_type,
2276                value_type,
2277                ..
2278            } => CollectionKind::KeyedStream {
2279                bound: bound.clone(),
2280                value_order: StreamOrder::TotalOrder,
2281                value_retry: StreamRetry::ExactlyOnce,
2282                key_type: key_type.clone(),
2283                value_type: value_type.clone(),
2284            },
2285            other => other.clone(),
2286        }
2287    }
2288}
2289
/// Metadata describing the output of an IR node: where it lives, what kind of collection
/// it is, and the metadata of the operator that produced it.
///
/// This type is ignored by `Hash` and always compares equal under `PartialEq`. Its
/// `Debug` output shows only `location_id` and `collection_kind`.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrMetadata {
    /// Location of the node's output; its tick ids may be rewritten by `unify_atomic_ticks`.
    pub location_id: LocationId,
    /// Kind and element type(s) of the node's output collection.
    pub collection_kind: CollectionKind,
    pub consistency: Option<ClusterConsistency>,
    pub cardinality: Option<usize>,
    pub tag: Option<String>,
    /// Metadata of the operator that produces this output.
    pub op: HydroIrOpMetadata,
}
2299
2300// HydroIrMetadata shouldn't be used to hash or compare
2301impl Hash for HydroIrMetadata {
2302    fn hash<H: Hasher>(&self, _: &mut H) {}
2303}
2304
2305impl PartialEq for HydroIrMetadata {
2306    fn eq(&self, _: &Self) -> bool {
2307        true
2308    }
2309}
2310
2311impl Eq for HydroIrMetadata {}
2312
2313impl Debug for HydroIrMetadata {
2314    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2315        f.debug_struct("HydroIrMetadata")
2316            .field("location_id", &self.location_id)
2317            .field("collection_kind", &self.collection_kind)
2318            .finish()
2319    }
2320}
2321
/// Metadata that is specific to the operator itself, rather than its outputs.
/// This is available on _both_ inner nodes and roots.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when this metadata was created (see `HydroIrOpMetadata::new`);
    /// serialized under the key `span` via `serialize_backtrace_as_span`.
    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
    pub backtrace: Backtrace,
    /// Optional CPU-usage figure for this operator; `new` leaves it `None`.
    /// NOTE(review): units/scale are not established here; confirm with whoever fills it in.
    pub cpu_usage: Option<f64>,
    /// Optional CPU-usage figure for network receiving; `new` leaves it `None`.
    pub network_recv_cpu_usage: Option<f64>,
    /// Optional operator id; `new` leaves it `None`.
    pub id: Option<usize>,
}
2332
impl HydroIrOpMetadata {
    /// Creates operator metadata with a freshly captured backtrace and all other fields
    /// (`cpu_usage`, `network_recv_cpu_usage`, `id`) set to `None`.
    ///
    /// This must stay a direct call to `new_with_skip(1)`: the skip counts are tuned to
    /// the exact call-frame structure (NOTE(review): presumably so the backtrace starts
    /// at `new`'s caller; confirm against `Backtrace::get_backtrace`).
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        Self::new_with_skip(1)
    }

    /// Like `new`, but skips `skip_count` frames in addition to a base of 2 when
    /// capturing the backtrace.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            // NOTE(review): the base of 2 presumably accounts for this function and
            // `get_backtrace` itself; confirm before changing the call structure.
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
2351
2352impl Debug for HydroIrOpMetadata {
2353    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2354        f.debug_struct("HydroIrOpMetadata").finish()
2355    }
2356}
2357
2358impl Hash for HydroIrOpMetadata {
2359    fn hash<H: Hasher>(&self, _: &mut H) {}
2360}
2361
2362/// An intermediate node in a Hydro graph, which consumes data
2363/// from upstream nodes and emits data to downstream nodes.
2364#[derive(Debug, Hash, serde::Serialize)]
2365pub enum HydroNode {
2366    Placeholder,
2367
2368    /// Manually "casts" between two different collection kinds.
2369    ///
2370    /// Using this IR node requires special care, since it bypasses many of Hydro's core
2371    /// correctness checks. In particular, the user must ensure that every possible
2372    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
2373    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
2374    /// collection. This ensures that the simulator does not miss any possible outputs.
2375    Cast {
2376        inner: Box<HydroNode>,
2377        metadata: HydroIrMetadata,
2378    },
2379
2380    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
2381    /// interpretation of the input stream.
2382    ///
2383    /// In production, this simply passes through the input, but in simulation, this operator
2384    /// explicitly selects a randomized interpretation.
2385    ObserveNonDet {
2386        inner: Box<HydroNode>,
2387        trusted: bool, // if true, we do not need to simulate non-determinism
2388        metadata: HydroIrMetadata,
2389    },
2390
2391    Source {
2392        source: HydroSource,
2393        metadata: HydroIrMetadata,
2394    },
2395
2396    SingletonSource {
2397        value: DebugExpr,
2398        first_tick_only: bool,
2399        metadata: HydroIrMetadata,
2400    },
2401
2402    CycleSource {
2403        cycle_id: CycleId,
2404        metadata: HydroIrMetadata,
2405    },
2406
2407    Tee {
2408        inner: SharedNode,
2409        metadata: HydroIrMetadata,
2410    },
2411
2412    /// A reference materialization point. Wraps a SharedNode so that:
2413    /// - The pipe output delivers data to one consumer
2414    /// - `#var` references can borrow the value from the slot
2415    ///
2416    /// In DFIR codegen, emits `ident = inner_ident -> singleton()` or `-> optional()` or
2417    /// `-> handoff()` depending on `kind`.
2418    ///
2419    /// Uses the same `built_tees` dedup pattern as `Tee`.
2420    Reference {
2421        inner: SharedNode,
2422        kind: crate::handoff_ref::HandoffRefKind,
2423        access_counter: AccessCounter,
2424        metadata: HydroIrMetadata,
2425    },
2426
2427    Partition {
2428        inner: SharedNode,
2429        f: ClosureExpr,
2430        is_true: bool,
2431        metadata: HydroIrMetadata,
2432    },
2433
2434    BeginAtomic {
2435        inner: Box<HydroNode>,
2436        metadata: HydroIrMetadata,
2437    },
2438
2439    EndAtomic {
2440        inner: Box<HydroNode>,
2441        metadata: HydroIrMetadata,
2442    },
2443
2444    Batch {
2445        inner: Box<HydroNode>,
2446        metadata: HydroIrMetadata,
2447    },
2448
2449    YieldConcat {
2450        inner: Box<HydroNode>,
2451        metadata: HydroIrMetadata,
2452    },
2453
2454    Chain {
2455        first: Box<HydroNode>,
2456        second: Box<HydroNode>,
2457        metadata: HydroIrMetadata,
2458    },
2459
2460    MergeOrdered {
2461        first: Box<HydroNode>,
2462        second: Box<HydroNode>,
2463        metadata: HydroIrMetadata,
2464    },
2465
2466    ChainFirst {
2467        first: Box<HydroNode>,
2468        second: Box<HydroNode>,
2469        metadata: HydroIrMetadata,
2470    },
2471
2472    CrossProduct {
2473        left: Box<HydroNode>,
2474        right: Box<HydroNode>,
2475        metadata: HydroIrMetadata,
2476    },
2477
2478    CrossSingleton {
2479        left: Box<HydroNode>,
2480        right: Box<HydroNode>,
2481        metadata: HydroIrMetadata,
2482    },
2483
2484    Join {
2485        left: Box<HydroNode>,
2486        right: Box<HydroNode>,
2487        metadata: HydroIrMetadata,
2488    },
2489
2490    /// Asymmetric join where the right (build) side is bounded.
2491    /// The build side is accumulated (stratum-delayed) into a hash table,
2492    /// then the left (probe) side streams through preserving its ordering.
2493    JoinHalf {
2494        left: Box<HydroNode>,
2495        right: Box<HydroNode>,
2496        metadata: HydroIrMetadata,
2497    },
2498
2499    Difference {
2500        pos: Box<HydroNode>,
2501        neg: Box<HydroNode>,
2502        metadata: HydroIrMetadata,
2503    },
2504
2505    AntiJoin {
2506        pos: Box<HydroNode>,
2507        neg: Box<HydroNode>,
2508        metadata: HydroIrMetadata,
2509    },
2510
2511    ResolveFutures {
2512        input: Box<HydroNode>,
2513        metadata: HydroIrMetadata,
2514    },
2515    ResolveFuturesBlocking {
2516        input: Box<HydroNode>,
2517        metadata: HydroIrMetadata,
2518    },
2519    ResolveFuturesOrdered {
2520        input: Box<HydroNode>,
2521        metadata: HydroIrMetadata,
2522    },
2523
2524    Map {
2525        f: ClosureExpr,
2526        input: Box<HydroNode>,
2527        metadata: HydroIrMetadata,
2528    },
2529    FlatMap {
2530        f: ClosureExpr,
2531        input: Box<HydroNode>,
2532        metadata: HydroIrMetadata,
2533    },
2534    FlatMapStreamBlocking {
2535        f: ClosureExpr,
2536        input: Box<HydroNode>,
2537        metadata: HydroIrMetadata,
2538    },
2539    Filter {
2540        f: ClosureExpr,
2541        input: Box<HydroNode>,
2542        metadata: HydroIrMetadata,
2543    },
2544    FilterMap {
2545        f: ClosureExpr,
2546        input: Box<HydroNode>,
2547        metadata: HydroIrMetadata,
2548    },
2549
2550    DeferTick {
2551        input: Box<HydroNode>,
2552        metadata: HydroIrMetadata,
2553    },
2554    Enumerate {
2555        input: Box<HydroNode>,
2556        metadata: HydroIrMetadata,
2557    },
2558    Inspect {
2559        f: ClosureExpr,
2560        input: Box<HydroNode>,
2561        metadata: HydroIrMetadata,
2562    },
2563
2564    Unique {
2565        input: Box<HydroNode>,
2566        metadata: HydroIrMetadata,
2567    },
2568
2569    Sort {
2570        input: Box<HydroNode>,
2571        metadata: HydroIrMetadata,
2572    },
2573    Fold {
2574        init: ClosureExpr,
2575        acc: ClosureExpr,
2576        input: Box<HydroNode>,
2577        metadata: HydroIrMetadata,
2578    },
2579
2580    Scan {
2581        init: ClosureExpr,
2582        acc: ClosureExpr,
2583        input: Box<HydroNode>,
2584        metadata: HydroIrMetadata,
2585    },
2586    ScanAsyncBlocking {
2587        init: ClosureExpr,
2588        acc: ClosureExpr,
2589        input: Box<HydroNode>,
2590        metadata: HydroIrMetadata,
2591    },
2592    FoldKeyed {
2593        init: ClosureExpr,
2594        acc: ClosureExpr,
2595        input: Box<HydroNode>,
2596        metadata: HydroIrMetadata,
2597    },
2598
2599    Reduce {
2600        f: ClosureExpr,
2601        input: Box<HydroNode>,
2602        metadata: HydroIrMetadata,
2603    },
2604    ReduceKeyed {
2605        f: ClosureExpr,
2606        input: Box<HydroNode>,
2607        metadata: HydroIrMetadata,
2608    },
2609    ReduceKeyedWatermark {
2610        f: ClosureExpr,
2611        input: Box<HydroNode>,
2612        watermark: Box<HydroNode>,
2613        metadata: HydroIrMetadata,
2614    },
2615
2616    Network {
2617        name: Option<String>,
2618        networking_info: crate::networking::NetworkingInfo,
2619        serialize_fn: Option<DebugExpr>,
2620        instantiate_fn: DebugInstantiate,
2621        deserialize_fn: Option<DebugExpr>,
2622        input: Box<HydroNode>,
2623        metadata: HydroIrMetadata,
2624    },
2625
2626    ExternalInput {
2627        from_external_key: LocationKey,
2628        from_port_id: ExternalPortId,
2629        from_many: bool,
2630        codec_type: DebugType,
2631        #[serde(skip)]
2632        port_hint: NetworkHint,
2633        instantiate_fn: DebugInstantiate,
2634        deserialize_fn: Option<DebugExpr>,
2635        metadata: HydroIrMetadata,
2636    },
2637
2638    Counter {
2639        tag: String,
2640        duration: DebugExpr,
2641        prefix: String,
2642        input: Box<HydroNode>,
2643        metadata: HydroIrMetadata,
2644    },
2645
2646    AssertIsConsistent {
2647        inner: Box<HydroNode>,
2648        trusted: bool,
2649        metadata: HydroIrMetadata,
2650    },
2651
2652    UnboundSingleton {
2653        inner: Box<HydroNode>,
2654        metadata: HydroIrMetadata,
2655    },
2656}
2657
/// Memo table for passes that must handle each shared IR cell (the `SharedNode` held by
/// `Tee`, `Reference` and `Partition`) only once per pass.
///
/// Keys are the addresses of the ORIGINAL shared cells and serve purely as identity keys.
/// Values are the cells that replace them in the transformed or cloned graph. Later
/// visits of the same original cell reuse the recorded replacement.
pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a shared IR cell to a [`LocationId`].
///
/// NOTE(review): the producers/consumers of this map are not visible here — presumably it
/// records the location assigned to each shared node during a pass; confirm against callers.
pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2660
/// Inserts an `observe_for_mut` stage in front of a closure that mutably captures a singleton.
///
/// The stage is inserted only when `f` holds a mutable singleton reference
/// (`f.has_mut_ref()`) AND the input collection kind is not strict
/// (`!in_kind.is_strict()`). Otherwise `in_ident` is returned untouched and no statement id
/// is consumed.
///
/// When the stage is needed:
/// - exactly one id is taken from `next_stmt_id`, whether in builder or callback mode;
/// - the returned ident is `stream_<id>`;
/// - the DFIR is only actually emitted in [`BuildersOrCallback::Builders`] mode.
#[cfg(feature = "build")]
fn maybe_observe_for_mut(
    f: &ClosureExpr,
    in_ident: syn::Ident,
    in_location: &LocationId,
    in_kind: &CollectionKind,
    op_meta: &HydroIrOpMetadata,
    builders_or_callback: &mut BuildersOrCallback<
        impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
        impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
    >,
    next_stmt_id: &mut crate::Counter<StmtId>,
) -> syn::Ident {
    // Guard: nothing to observe, so the input stream is used directly.
    if !f.has_mut_ref() || in_kind.is_strict() {
        return in_ident;
    }

    let stmt_id = next_stmt_id.get_and_increment();
    let observed = syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(builders) => {
            builders.observe_for_mut(in_location, in_ident, in_kind, &observed, op_meta);
        }
        // Callback mode only reserves the id; no DFIR is emitted here.
        BuildersOrCallback::Callback(..) => {}
    }

    observed
}
2689
2690impl HydroNode {
2691    pub fn transform_bottom_up(
2692        &mut self,
2693        transform: &mut impl FnMut(&mut HydroNode),
2694        seen_tees: &mut SeenSharedNodes,
2695        check_well_formed: bool,
2696    ) {
2697        self.transform_children(
2698            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2699            seen_tees,
2700        );
2701
2702        transform(self);
2703
2704        let self_location = self.metadata().location_id.root();
2705
2706        if check_well_formed {
2707            match &*self {
2708                HydroNode::Network { .. } => {}
2709                _ => {
2710                    self.input_metadata().iter().for_each(|i| {
2711                        if i.location_id.root() != self_location {
2712                            panic!(
2713                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2714                                i,
2715                                i.location_id.root(),
2716                                self,
2717                                self_location
2718                            )
2719                        }
2720                    });
2721                }
2722            }
2723        }
2724    }
2725
2726    #[inline(always)]
2727    pub fn transform_children(
2728        &mut self,
2729        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2730        seen_tees: &mut SeenSharedNodes,
2731    ) {
2732        match self {
2733            HydroNode::Placeholder => {
2734                panic!();
2735            }
2736
2737            HydroNode::Source { .. }
2738            | HydroNode::SingletonSource { .. }
2739            | HydroNode::CycleSource { .. }
2740            | HydroNode::ExternalInput { .. } => {}
2741
2742            HydroNode::Tee { inner, .. } | HydroNode::Reference { inner, .. } => {
2743                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2744                    *inner = SharedNode(transformed.clone());
2745                } else {
2746                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2747                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2748                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2749                    transform(&mut orig, seen_tees);
2750                    *transformed_cell.borrow_mut() = orig;
2751                    *inner = SharedNode(transformed_cell);
2752                }
2753            }
2754
2755            HydroNode::Partition { inner, f, .. } => {
2756                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2757                    *inner = SharedNode(transformed.clone());
2758                } else {
2759                    f.transform_children(&mut transform, seen_tees);
2760                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2761                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2762                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2763                    transform(&mut orig, seen_tees);
2764                    *transformed_cell.borrow_mut() = orig;
2765                    *inner = SharedNode(transformed_cell);
2766                }
2767            }
2768
2769            HydroNode::Cast { inner, .. }
2770            | HydroNode::ObserveNonDet { inner, .. }
2771            | HydroNode::BeginAtomic { inner, .. }
2772            | HydroNode::EndAtomic { inner, .. }
2773            | HydroNode::Batch { inner, .. }
2774            | HydroNode::YieldConcat { inner, .. }
2775            | HydroNode::UnboundSingleton { inner, .. }
2776            | HydroNode::AssertIsConsistent { inner, .. } => {
2777                transform(inner.as_mut(), seen_tees);
2778            }
2779
2780            HydroNode::Chain { first, second, .. } => {
2781                transform(first.as_mut(), seen_tees);
2782                transform(second.as_mut(), seen_tees);
2783            }
2784
2785            HydroNode::MergeOrdered { first, second, .. } => {
2786                transform(first.as_mut(), seen_tees);
2787                transform(second.as_mut(), seen_tees);
2788            }
2789
2790            HydroNode::ChainFirst { first, second, .. } => {
2791                transform(first.as_mut(), seen_tees);
2792                transform(second.as_mut(), seen_tees);
2793            }
2794
2795            HydroNode::CrossSingleton { left, right, .. }
2796            | HydroNode::CrossProduct { left, right, .. }
2797            | HydroNode::Join { left, right, .. }
2798            | HydroNode::JoinHalf { left, right, .. } => {
2799                transform(left.as_mut(), seen_tees);
2800                transform(right.as_mut(), seen_tees);
2801            }
2802
2803            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2804                transform(pos.as_mut(), seen_tees);
2805                transform(neg.as_mut(), seen_tees);
2806            }
2807
2808            HydroNode::Map { f, input, .. } => {
2809                f.transform_children(&mut transform, seen_tees);
2810                transform(input.as_mut(), seen_tees);
2811            }
2812            HydroNode::FlatMap { f, input, .. }
2813            | HydroNode::FlatMapStreamBlocking { f, input, .. }
2814            | HydroNode::Filter { f, input, .. }
2815            | HydroNode::FilterMap { f, input, .. }
2816            | HydroNode::Inspect { f, input, .. }
2817            | HydroNode::Reduce { f, input, .. }
2818            | HydroNode::ReduceKeyed { f, input, .. } => {
2819                f.transform_children(&mut transform, seen_tees);
2820                transform(input.as_mut(), seen_tees);
2821            }
2822            HydroNode::ReduceKeyedWatermark {
2823                f,
2824                input,
2825                watermark,
2826                ..
2827            } => {
2828                f.transform_children(&mut transform, seen_tees);
2829                transform(input.as_mut(), seen_tees);
2830                transform(watermark.as_mut(), seen_tees);
2831            }
2832            HydroNode::Fold {
2833                init, acc, input, ..
2834            }
2835            | HydroNode::Scan {
2836                init, acc, input, ..
2837            }
2838            | HydroNode::ScanAsyncBlocking {
2839                init, acc, input, ..
2840            }
2841            | HydroNode::FoldKeyed {
2842                init, acc, input, ..
2843            } => {
2844                init.transform_children(&mut transform, seen_tees);
2845                acc.transform_children(&mut transform, seen_tees);
2846                transform(input.as_mut(), seen_tees);
2847            }
2848            HydroNode::ResolveFutures { input, .. }
2849            | HydroNode::ResolveFuturesBlocking { input, .. }
2850            | HydroNode::ResolveFuturesOrdered { input, .. }
2851            | HydroNode::Sort { input, .. }
2852            | HydroNode::DeferTick { input, .. }
2853            | HydroNode::Enumerate { input, .. }
2854            | HydroNode::Unique { input, .. }
2855            | HydroNode::Network { input, .. }
2856            | HydroNode::Counter { input, .. } => {
2857                transform(input.as_mut(), seen_tees);
2858            }
2859        }
2860    }
2861
2862    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2863        match self {
2864            HydroNode::Placeholder => HydroNode::Placeholder,
2865            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2866                inner: Box::new(inner.deep_clone(seen_tees)),
2867                metadata: metadata.clone(),
2868            },
2869            HydroNode::UnboundSingleton { inner, metadata } => HydroNode::UnboundSingleton {
2870                inner: Box::new(inner.deep_clone(seen_tees)),
2871                metadata: metadata.clone(),
2872            },
2873            HydroNode::ObserveNonDet {
2874                inner,
2875                trusted,
2876                metadata,
2877            } => HydroNode::ObserveNonDet {
2878                inner: Box::new(inner.deep_clone(seen_tees)),
2879                trusted: *trusted,
2880                metadata: metadata.clone(),
2881            },
2882            HydroNode::AssertIsConsistent {
2883                inner,
2884                trusted,
2885                metadata,
2886            } => HydroNode::AssertIsConsistent {
2887                inner: Box::new(inner.deep_clone(seen_tees)),
2888                trusted: *trusted,
2889                metadata: metadata.clone(),
2890            },
2891            HydroNode::Source { source, metadata } => HydroNode::Source {
2892                source: source.clone(),
2893                metadata: metadata.clone(),
2894            },
2895            HydroNode::SingletonSource {
2896                value,
2897                first_tick_only,
2898                metadata,
2899            } => HydroNode::SingletonSource {
2900                value: value.clone(),
2901                first_tick_only: *first_tick_only,
2902                metadata: metadata.clone(),
2903            },
2904            HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2905                cycle_id: *cycle_id,
2906                metadata: metadata.clone(),
2907            },
2908            HydroNode::Tee { inner, metadata }
2909            | HydroNode::Reference {
2910                inner, metadata, ..
2911            } => {
2912                let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2913                    SharedNode(transformed.clone())
2914                } else {
2915                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2916                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2917                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2918                    *new_rc.borrow_mut() = cloned;
2919                    SharedNode(new_rc)
2920                };
2921                if let HydroNode::Reference {
2922                    kind,
2923                    access_counter,
2924                    ..
2925                } = self
2926                {
2927                    HydroNode::Reference {
2928                        inner: cloned_inner,
2929                        kind: *kind,
2930                        access_counter: access_counter.freeze(),
2931                        metadata: metadata.clone(),
2932                    }
2933                } else {
2934                    HydroNode::Tee {
2935                        inner: cloned_inner,
2936                        metadata: metadata.clone(),
2937                    }
2938                }
2939            }
2940            HydroNode::Partition {
2941                inner,
2942                f,
2943                is_true,
2944                metadata,
2945            } => {
2946                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2947                    HydroNode::Partition {
2948                        inner: SharedNode(transformed.clone()),
2949                        f: f.deep_clone(seen_tees),
2950                        is_true: *is_true,
2951                        metadata: metadata.clone(),
2952                    }
2953                } else {
2954                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2955                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2956                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2957                    *new_rc.borrow_mut() = cloned;
2958                    HydroNode::Partition {
2959                        inner: SharedNode(new_rc),
2960                        f: f.deep_clone(seen_tees),
2961                        is_true: *is_true,
2962                        metadata: metadata.clone(),
2963                    }
2964                }
2965            }
2966            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2967                inner: Box::new(inner.deep_clone(seen_tees)),
2968                metadata: metadata.clone(),
2969            },
2970            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2971                inner: Box::new(inner.deep_clone(seen_tees)),
2972                metadata: metadata.clone(),
2973            },
2974            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2975                inner: Box::new(inner.deep_clone(seen_tees)),
2976                metadata: metadata.clone(),
2977            },
2978            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2979                inner: Box::new(inner.deep_clone(seen_tees)),
2980                metadata: metadata.clone(),
2981            },
2982            HydroNode::Chain {
2983                first,
2984                second,
2985                metadata,
2986            } => HydroNode::Chain {
2987                first: Box::new(first.deep_clone(seen_tees)),
2988                second: Box::new(second.deep_clone(seen_tees)),
2989                metadata: metadata.clone(),
2990            },
2991            HydroNode::MergeOrdered {
2992                first,
2993                second,
2994                metadata,
2995            } => HydroNode::MergeOrdered {
2996                first: Box::new(first.deep_clone(seen_tees)),
2997                second: Box::new(second.deep_clone(seen_tees)),
2998                metadata: metadata.clone(),
2999            },
3000            HydroNode::ChainFirst {
3001                first,
3002                second,
3003                metadata,
3004            } => HydroNode::ChainFirst {
3005                first: Box::new(first.deep_clone(seen_tees)),
3006                second: Box::new(second.deep_clone(seen_tees)),
3007                metadata: metadata.clone(),
3008            },
3009            HydroNode::CrossProduct {
3010                left,
3011                right,
3012                metadata,
3013            } => HydroNode::CrossProduct {
3014                left: Box::new(left.deep_clone(seen_tees)),
3015                right: Box::new(right.deep_clone(seen_tees)),
3016                metadata: metadata.clone(),
3017            },
3018            HydroNode::CrossSingleton {
3019                left,
3020                right,
3021                metadata,
3022            } => HydroNode::CrossSingleton {
3023                left: Box::new(left.deep_clone(seen_tees)),
3024                right: Box::new(right.deep_clone(seen_tees)),
3025                metadata: metadata.clone(),
3026            },
3027            HydroNode::Join {
3028                left,
3029                right,
3030                metadata,
3031            } => HydroNode::Join {
3032                left: Box::new(left.deep_clone(seen_tees)),
3033                right: Box::new(right.deep_clone(seen_tees)),
3034                metadata: metadata.clone(),
3035            },
3036            HydroNode::JoinHalf {
3037                left,
3038                right,
3039                metadata,
3040            } => HydroNode::JoinHalf {
3041                left: Box::new(left.deep_clone(seen_tees)),
3042                right: Box::new(right.deep_clone(seen_tees)),
3043                metadata: metadata.clone(),
3044            },
3045            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
3046                pos: Box::new(pos.deep_clone(seen_tees)),
3047                neg: Box::new(neg.deep_clone(seen_tees)),
3048                metadata: metadata.clone(),
3049            },
3050            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
3051                pos: Box::new(pos.deep_clone(seen_tees)),
3052                neg: Box::new(neg.deep_clone(seen_tees)),
3053                metadata: metadata.clone(),
3054            },
3055            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
3056                input: Box::new(input.deep_clone(seen_tees)),
3057                metadata: metadata.clone(),
3058            },
3059            HydroNode::ResolveFuturesBlocking { input, metadata } => {
3060                HydroNode::ResolveFuturesBlocking {
3061                    input: Box::new(input.deep_clone(seen_tees)),
3062                    metadata: metadata.clone(),
3063                }
3064            }
3065            HydroNode::ResolveFuturesOrdered { input, metadata } => {
3066                HydroNode::ResolveFuturesOrdered {
3067                    input: Box::new(input.deep_clone(seen_tees)),
3068                    metadata: metadata.clone(),
3069                }
3070            }
3071            HydroNode::Map { f, input, metadata } => HydroNode::Map {
3072                f: f.deep_clone(seen_tees),
3073                input: Box::new(input.deep_clone(seen_tees)),
3074                metadata: metadata.clone(),
3075            },
3076            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
3077                f: f.deep_clone(seen_tees),
3078                input: Box::new(input.deep_clone(seen_tees)),
3079                metadata: metadata.clone(),
3080            },
3081            HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
3082                HydroNode::FlatMapStreamBlocking {
3083                    f: f.deep_clone(seen_tees),
3084                    input: Box::new(input.deep_clone(seen_tees)),
3085                    metadata: metadata.clone(),
3086                }
3087            }
3088            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
3089                f: f.deep_clone(seen_tees),
3090                input: Box::new(input.deep_clone(seen_tees)),
3091                metadata: metadata.clone(),
3092            },
3093            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
3094                f: f.deep_clone(seen_tees),
3095                input: Box::new(input.deep_clone(seen_tees)),
3096                metadata: metadata.clone(),
3097            },
3098            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
3099                input: Box::new(input.deep_clone(seen_tees)),
3100                metadata: metadata.clone(),
3101            },
3102            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
3103                input: Box::new(input.deep_clone(seen_tees)),
3104                metadata: metadata.clone(),
3105            },
3106            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
3107                f: f.deep_clone(seen_tees),
3108                input: Box::new(input.deep_clone(seen_tees)),
3109                metadata: metadata.clone(),
3110            },
3111            HydroNode::Unique { input, metadata } => HydroNode::Unique {
3112                input: Box::new(input.deep_clone(seen_tees)),
3113                metadata: metadata.clone(),
3114            },
3115            HydroNode::Sort { input, metadata } => HydroNode::Sort {
3116                input: Box::new(input.deep_clone(seen_tees)),
3117                metadata: metadata.clone(),
3118            },
3119            HydroNode::Fold {
3120                init,
3121                acc,
3122                input,
3123                metadata,
3124            } => HydroNode::Fold {
3125                init: init.deep_clone(seen_tees),
3126                acc: acc.deep_clone(seen_tees),
3127                input: Box::new(input.deep_clone(seen_tees)),
3128                metadata: metadata.clone(),
3129            },
3130            HydroNode::Scan {
3131                init,
3132                acc,
3133                input,
3134                metadata,
3135            } => HydroNode::Scan {
3136                init: init.deep_clone(seen_tees),
3137                acc: acc.deep_clone(seen_tees),
3138                input: Box::new(input.deep_clone(seen_tees)),
3139                metadata: metadata.clone(),
3140            },
3141            HydroNode::ScanAsyncBlocking {
3142                init,
3143                acc,
3144                input,
3145                metadata,
3146            } => HydroNode::ScanAsyncBlocking {
3147                init: init.deep_clone(seen_tees),
3148                acc: acc.deep_clone(seen_tees),
3149                input: Box::new(input.deep_clone(seen_tees)),
3150                metadata: metadata.clone(),
3151            },
3152            HydroNode::FoldKeyed {
3153                init,
3154                acc,
3155                input,
3156                metadata,
3157            } => HydroNode::FoldKeyed {
3158                init: init.deep_clone(seen_tees),
3159                acc: acc.deep_clone(seen_tees),
3160                input: Box::new(input.deep_clone(seen_tees)),
3161                metadata: metadata.clone(),
3162            },
3163            HydroNode::ReduceKeyedWatermark {
3164                f,
3165                input,
3166                watermark,
3167                metadata,
3168            } => HydroNode::ReduceKeyedWatermark {
3169                f: f.deep_clone(seen_tees),
3170                input: Box::new(input.deep_clone(seen_tees)),
3171                watermark: Box::new(watermark.deep_clone(seen_tees)),
3172                metadata: metadata.clone(),
3173            },
3174            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
3175                f: f.deep_clone(seen_tees),
3176                input: Box::new(input.deep_clone(seen_tees)),
3177                metadata: metadata.clone(),
3178            },
3179            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
3180                f: f.deep_clone(seen_tees),
3181                input: Box::new(input.deep_clone(seen_tees)),
3182                metadata: metadata.clone(),
3183            },
3184            HydroNode::Network {
3185                name,
3186                networking_info,
3187                serialize_fn,
3188                instantiate_fn,
3189                deserialize_fn,
3190                input,
3191                metadata,
3192            } => HydroNode::Network {
3193                name: name.clone(),
3194                networking_info: networking_info.clone(),
3195                serialize_fn: serialize_fn.clone(),
3196                instantiate_fn: instantiate_fn.clone(),
3197                deserialize_fn: deserialize_fn.clone(),
3198                input: Box::new(input.deep_clone(seen_tees)),
3199                metadata: metadata.clone(),
3200            },
3201            HydroNode::ExternalInput {
3202                from_external_key,
3203                from_port_id,
3204                from_many,
3205                codec_type,
3206                port_hint,
3207                instantiate_fn,
3208                deserialize_fn,
3209                metadata,
3210            } => HydroNode::ExternalInput {
3211                from_external_key: *from_external_key,
3212                from_port_id: *from_port_id,
3213                from_many: *from_many,
3214                codec_type: codec_type.clone(),
3215                port_hint: *port_hint,
3216                instantiate_fn: instantiate_fn.clone(),
3217                deserialize_fn: deserialize_fn.clone(),
3218                metadata: metadata.clone(),
3219            },
3220            HydroNode::Counter {
3221                tag,
3222                duration,
3223                prefix,
3224                input,
3225                metadata,
3226            } => HydroNode::Counter {
3227                tag: tag.clone(),
3228                duration: duration.clone(),
3229                prefix: prefix.clone(),
3230                input: Box::new(input.deep_clone(seen_tees)),
3231                metadata: metadata.clone(),
3232            },
3233        }
3234    }
3235
3236    #[cfg(feature = "build")]
3237    pub fn emit_core(
3238        &mut self,
3239        builders_or_callback: &mut BuildersOrCallback<
3240            impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
3241            impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
3242        >,
3243        seen_tees: &mut SeenSharedNodes,
3244        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
3245        next_stmt_id: &mut crate::Counter<StmtId>,
3246        fold_hooked_idents: &mut HashSet<String>,
3247    ) -> syn::Ident {
3248        let mut ident_stack: Vec<syn::Ident> = Vec::new();
3249
3250        self.transform_bottom_up(
3251            &mut |node: &mut HydroNode| {
3252                let out_location = node.metadata().location_id.clone();
3253                match node {
3254                    HydroNode::Placeholder => {
3255                        panic!()
3256                    }
3257
3258                    HydroNode::Cast { .. } => {
3259                        // Cast passes through the input ident unchanged
3260                        // The input ident is already on the stack from processing the child
3261                        let _ = next_stmt_id.get_and_increment();
3262                        match builders_or_callback {
3263                            BuildersOrCallback::Builders(_) => {}
3264                            BuildersOrCallback::Callback(_, node_callback) => {
3265                                node_callback(node, next_stmt_id);
3266                            }
3267                        }
3268                        // input_ident stays on stack as output
3269                    }
3270
3271                    HydroNode::UnboundSingleton { .. } => {
3272                        let inner_ident = ident_stack.pop().unwrap();
3273
3274                        let stmt_id = next_stmt_id.get_and_increment();
3275                        let out_ident =
3276                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3277
3278                        match builders_or_callback {
3279                            BuildersOrCallback::Builders(graph_builders) => {
3280                                if graph_builders.singleton_intermediates() {
3281                                    let builder = graph_builders.get_dfir_mut(&out_location);
3282                                    builder.add_dfir(
3283                                        parse_quote! {
3284                                            #out_ident = #inner_ident;
3285                                        },
3286                                        None,
3287                                        None,
3288                                    );
3289                                } else {
3290                                    let builder = graph_builders.get_dfir_mut(&out_location);
3291                                    builder.add_dfir(
3292                                        parse_quote! {
3293                                            #out_ident = #inner_ident -> persist::<'static>();
3294                                        },
3295                                        None,
3296                                        None,
3297                                    );
3298                                }
3299                            }
3300                            BuildersOrCallback::Callback(_, node_callback) => {
3301                                node_callback(node, next_stmt_id);
3302                            }
3303                        }
3304
3305                        ident_stack.push(out_ident);
3306                    }
3307
3308                    HydroNode::AssertIsConsistent { inner, trusted, .. } => {
3309                        let inner_ident = ident_stack.pop().unwrap();
3310
3311                        let stmt_id = next_stmt_id.get_and_increment();
3312                        let out_ident =
3313                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3314
3315                        match builders_or_callback {
3316                            BuildersOrCallback::Builders(graph_builders) => {
3317                                graph_builders.assert_is_consistent(
3318                                    *trusted,
3319                                    &inner.metadata().location_id,
3320                                    inner_ident,
3321                                    &out_ident,
3322                                );
3323                            }
3324                            BuildersOrCallback::Callback(_, node_callback) => {
3325                                node_callback(node, next_stmt_id);
3326                            }
3327                        }
3328
3329                        ident_stack.push(out_ident);
3330                    }
3331
3332                    HydroNode::ObserveNonDet {
3333                        inner,
3334                        trusted,
3335                        metadata,
3336                        ..
3337                    } => {
3338                        let inner_ident = ident_stack.pop().unwrap();
3339
3340                        let stmt_id = next_stmt_id.get_and_increment();
3341                        let observe_ident =
3342                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3343
3344                        match builders_or_callback {
3345                            BuildersOrCallback::Builders(graph_builders) => {
3346                                graph_builders.observe_nondet(
3347                                    *trusted,
3348                                    &inner.metadata().location_id,
3349                                    inner_ident,
3350                                    &inner.metadata().collection_kind,
3351                                    &observe_ident,
3352                                    &metadata.collection_kind,
3353                                    &metadata.op,
3354                                );
3355                            }
3356                            BuildersOrCallback::Callback(_, node_callback) => {
3357                                node_callback(node, next_stmt_id);
3358                            }
3359                        }
3360
3361                        ident_stack.push(observe_ident);
3362                    }
3363
3364                    HydroNode::Batch {
3365                        inner, metadata, ..
3366                    } => {
3367                        let inner_ident = ident_stack.pop().unwrap();
3368
3369                        let stmt_id = next_stmt_id.get_and_increment();
3370                        let batch_ident =
3371                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3372
3373                        match builders_or_callback {
3374                            BuildersOrCallback::Builders(graph_builders) => {
3375                                graph_builders.batch(
3376                                    inner_ident,
3377                                    &inner.metadata().location_id,
3378                                    &inner.metadata().collection_kind,
3379                                    &batch_ident,
3380                                    &out_location,
3381                                    &metadata.op,
3382                                    fold_hooked_idents,
3383                                );
3384                            }
3385                            BuildersOrCallback::Callback(_, node_callback) => {
3386                                node_callback(node, next_stmt_id);
3387                            }
3388                        }
3389
3390                        ident_stack.push(batch_ident);
3391                    }
3392
3393                    HydroNode::YieldConcat { inner, .. } => {
3394                        let inner_ident = ident_stack.pop().unwrap();
3395
3396                        let stmt_id = next_stmt_id.get_and_increment();
3397                        let yield_ident =
3398                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3399
3400                        match builders_or_callback {
3401                            BuildersOrCallback::Builders(graph_builders) => {
3402                                graph_builders.yield_from_tick(
3403                                    inner_ident,
3404                                    &inner.metadata().location_id,
3405                                    &inner.metadata().collection_kind,
3406                                    &yield_ident,
3407                                    &out_location,
3408                                );
3409                            }
3410                            BuildersOrCallback::Callback(_, node_callback) => {
3411                                node_callback(node, next_stmt_id);
3412                            }
3413                        }
3414
3415                        ident_stack.push(yield_ident);
3416                    }
3417
3418                    HydroNode::BeginAtomic { inner, metadata } => {
3419                        let inner_ident = ident_stack.pop().unwrap();
3420
3421                        let stmt_id = next_stmt_id.get_and_increment();
3422                        let begin_ident =
3423                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3424
3425                        match builders_or_callback {
3426                            BuildersOrCallback::Builders(graph_builders) => {
3427                                graph_builders.begin_atomic(
3428                                    inner_ident,
3429                                    &inner.metadata().location_id,
3430                                    &inner.metadata().collection_kind,
3431                                    &begin_ident,
3432                                    &out_location,
3433                                    &metadata.op,
3434                                );
3435                            }
3436                            BuildersOrCallback::Callback(_, node_callback) => {
3437                                node_callback(node, next_stmt_id);
3438                            }
3439                        }
3440
3441                        ident_stack.push(begin_ident);
3442                    }
3443
3444                    HydroNode::EndAtomic { inner, .. } => {
3445                        let inner_ident = ident_stack.pop().unwrap();
3446
3447                        let stmt_id = next_stmt_id.get_and_increment();
3448                        let end_ident =
3449                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3450
3451                        match builders_or_callback {
3452                            BuildersOrCallback::Builders(graph_builders) => {
3453                                graph_builders.end_atomic(
3454                                    inner_ident,
3455                                    &inner.metadata().location_id,
3456                                    &inner.metadata().collection_kind,
3457                                    &end_ident,
3458                                );
3459                            }
3460                            BuildersOrCallback::Callback(_, node_callback) => {
3461                                node_callback(node, next_stmt_id);
3462                            }
3463                        }
3464
3465                        ident_stack.push(end_ident);
3466                    }
3467
3468                    HydroNode::Source {
3469                        source, metadata, ..
3470                    } => {
3471                        if let HydroSource::ExternalNetwork() = source {
3472                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3473                        } else {
3474                            let stmt_id = next_stmt_id.get_and_increment();
3475                            let source_ident =
3476                                syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3477
3478                            let source_stmt = match source {
3479                                HydroSource::Stream(expr) => {
3480                                    debug_assert!(metadata.location_id.is_top_level());
3481                                    parse_quote! {
3482                                        #source_ident = source_stream(#expr);
3483                                    }
3484                                }
3485
3486                                HydroSource::ExternalNetwork() => {
3487                                    unreachable!()
3488                                }
3489
3490                                HydroSource::Iter(expr) => {
3491                                    if metadata.location_id.is_top_level() {
3492                                        parse_quote! {
3493                                            #source_ident = source_iter(#expr);
3494                                        }
3495                                    } else {
3496                                        // TODO(shadaj): a more natural semantics would be to to re-evaluate the expression on each tick
3497                                        parse_quote! {
3498                                            #source_ident = source_iter(#expr) -> persist::<'static>();
3499                                        }
3500                                    }
3501                                }
3502
3503                                HydroSource::Spin() => {
3504                                    debug_assert!(metadata.location_id.is_top_level());
3505                                    parse_quote! {
3506                                        #source_ident = spin();
3507                                    }
3508                                }
3509
3510                                HydroSource::ClusterMembers(target_loc, state) => {
3511                                    debug_assert!(metadata.location_id.is_top_level());
3512
3513                                    let members_tee_ident = syn::Ident::new(
3514                                        &format!(
3515                                            "__cluster_members_tee_{}_{}",
3516                                            metadata.location_id.root().key(),
3517                                            target_loc.key(),
3518                                        ),
3519                                        Span::call_site(),
3520                                    );
3521
3522                                    match state {
3523                                        ClusterMembersState::Stream(d) => {
3524                                            parse_quote! {
3525                                                #members_tee_ident = source_stream(#d) -> tee();
3526                                                #source_ident = #members_tee_ident;
3527                                            }
3528                                        },
3529                                        ClusterMembersState::Uninit => syn::parse_quote! {
3530                                            #source_ident = source_stream(DUMMY);
3531                                        },
3532                                        ClusterMembersState::Tee(..) => parse_quote! {
3533                                            #source_ident = #members_tee_ident;
3534                                        },
3535                                    }
3536                                }
3537
3538                                HydroSource::Embedded(ident) => {
3539                                    parse_quote! {
3540                                        #source_ident = source_stream(#ident);
3541                                    }
3542                                }
3543
3544                                HydroSource::EmbeddedSingleton(ident) => {
3545                                    parse_quote! {
3546                                        #source_ident = source_iter([#ident]);
3547                                    }
3548                                }
3549                            };
3550
3551                            match builders_or_callback {
3552                                BuildersOrCallback::Builders(graph_builders) => {
3553                                    let builder = graph_builders.get_dfir_mut(&out_location);
3554                                    builder.add_dfir(source_stmt, None, Some(&stmt_id.to_string()));
3555                                }
3556                                BuildersOrCallback::Callback(_, node_callback) => {
3557                                    node_callback(node, next_stmt_id);
3558                                }
3559                            }
3560
3561                            ident_stack.push(source_ident);
3562                        }
3563                    }
3564
3565                    HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3566                        let stmt_id = next_stmt_id.get_and_increment();
3567                        let source_ident =
3568                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3569
3570                        match builders_or_callback {
3571                            BuildersOrCallback::Builders(graph_builders) => {
3572                                let builder = graph_builders.get_dfir_mut(&out_location);
3573
3574                                if *first_tick_only {
3575                                    assert!(
3576                                        !metadata.location_id.is_top_level(),
3577                                        "first_tick_only SingletonSource must be inside a tick"
3578                                    );
3579                                }
3580
3581                                if *first_tick_only
3582                                    || (metadata.location_id.is_top_level()
3583                                        && metadata.collection_kind.is_bounded())
3584                                {
3585                                    builder.add_dfir(
3586                                        parse_quote! {
3587                                            #source_ident = source_iter([#value]);
3588                                        },
3589                                        None,
3590                                        Some(&stmt_id.to_string()),
3591                                    );
3592                                } else {
3593                                    builder.add_dfir(
3594                                        parse_quote! {
3595                                            #source_ident = source_iter([#value]) -> persist::<'static>();
3596                                        },
3597                                        None,
3598                                        Some(&stmt_id.to_string()),
3599                                    );
3600                                }
3601                            }
3602                            BuildersOrCallback::Callback(_, node_callback) => {
3603                                node_callback(node, next_stmt_id);
3604                            }
3605                        }
3606
3607                        ident_stack.push(source_ident);
3608                    }
3609
3610                    HydroNode::CycleSource { cycle_id, .. } => {
3611                        let ident = cycle_id.as_ident();
3612
3613                        // consume a stmt id even though we did not emit anything so that we can instrument this
3614                        let _ = next_stmt_id.get_and_increment();
3615
3616                        match builders_or_callback {
3617                            BuildersOrCallback::Builders(_) => {}
3618                            BuildersOrCallback::Callback(_, node_callback) => {
3619                                node_callback(node, next_stmt_id);
3620                            }
3621                        }
3622
3623                        ident_stack.push(ident);
3624                    }
3625
3626                    HydroNode::Tee { inner, .. } => {
3627                        // we consume a stmt id regardless of if we emit the tee() operator,
3628                        // so that during rewrites we touch all recipients of the tee()
3629                        let stmt_id = next_stmt_id.get_and_increment();
3630
3631                        let ret_ident = if let Some(built_idents) =
3632                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3633                        {
3634                            match builders_or_callback {
3635                                BuildersOrCallback::Builders(_) => {}
3636                                BuildersOrCallback::Callback(_, node_callback) => {
3637                                    node_callback(node, next_stmt_id);
3638                                }
3639                            }
3640
3641                            built_idents[0].clone()
3642                        } else {
3643                            // The inner node was already processed by transform_bottom_up,
3644                            // so its ident is on the stack
3645                            let inner_ident = ident_stack.pop().unwrap();
3646
3647                            let tee_ident =
3648                                syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3649
3650                            built_tees.insert(
3651                                inner.0.as_ref() as *const RefCell<HydroNode>,
3652                                vec![tee_ident.clone()],
3653                            );
3654
3655                            match builders_or_callback {
3656                                BuildersOrCallback::Builders(graph_builders) => {
3657                                    // NOTE: With `forward_ref`, the fold codegen may not have
3658                                    // run yet when we reach this tee, so `fold_hooked_idents`
3659                                    // might not contain the inner ident. In that case we won't
3660                                    // propagate the "hooked" status to the tee and the
3661                                    // downstream singleton batch will use the normal
3662                                    // `SingletonHook` instead of `PassthroughSingletonHook`.
3663                                    // This is not a soundness issue: the fallback hook still
3664                                    // produces correct behavior, just with a redundant decision
3665                                    // point. TODO(https://github.com/hydro-project/hydro/issues/2856):
3666                                    // fix ordering so forward_ref folds are always processed
3667                                    // before their downstream tees.
3668                                    if fold_hooked_idents.contains(&inner_ident.to_string()) {
3669                                        fold_hooked_idents.insert(tee_ident.to_string());
3670                                    }
3671                                    let builder = graph_builders.get_dfir_mut(&out_location);
3672                                    builder.add_dfir(
3673                                        parse_quote! {
3674                                            #tee_ident = #inner_ident -> tee();
3675                                        },
3676                                        None,
3677                                        Some(&stmt_id.to_string()),
3678                                    );
3679                                }
3680                                BuildersOrCallback::Callback(_, node_callback) => {
3681                                    node_callback(node, next_stmt_id);
3682                                }
3683                            }
3684
3685                            tee_ident
3686                        };
3687
3688                        ident_stack.push(ret_ident);
3689                    }
3690
3691                    HydroNode::Reference { inner, kind, .. } => {
3692                        // we consume a stmt id regardless of if we emit the operator,
3693                        // so that during rewrites we touch all recipients
3694                        let stmt_id = next_stmt_id.get_and_increment();
3695
3696                        let ret_ident = if let Some(built_idents) =
3697                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3698                        {
3699                            built_idents[0].clone()
3700                        } else {
3701                            let inner_ident = ident_stack.pop().unwrap();
3702
3703                            let ref_ident =
3704                                syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3705
3706                            built_tees.insert(
3707                                inner.0.as_ref() as *const RefCell<HydroNode>,
3708                                vec![ref_ident.clone()],
3709                            );
3710
3711                            match builders_or_callback {
3712                                BuildersOrCallback::Builders(graph_builders) => {
3713                                    let builder = graph_builders.get_dfir_mut(&out_location);
3714                                    let op_ident = syn::Ident::new(
3715                                        match kind {
3716                                            crate::handoff_ref::HandoffRefKind::Singleton => "singleton",
3717                                            crate::handoff_ref::HandoffRefKind::Optional => "optional",
3718                                            crate::handoff_ref::HandoffRefKind::Vec => "handoff",
3719                                        },
3720                                        Span::call_site(),
3721                                    );
3722                                    builder.add_dfir(
3723                                        parse_quote! {
3724                                            #ref_ident = #inner_ident -> #op_ident();
3725                                        },
3726                                        None,
3727                                        Some(&stmt_id.to_string()),
3728                                    );
3729                                }
3730                                BuildersOrCallback::Callback(_, node_callback) => {
3731                                    node_callback(node, next_stmt_id);
3732                                }
3733                            }
3734
3735                            ref_ident
3736                        };
3737
3738                        ident_stack.push(ret_ident);
3739                    }
3740
3741                    HydroNode::Partition {
3742                        inner, f, is_true, metadata,
3743                    } => {
3744                        let is_true = *is_true; // need to copy early to avoid borrow checking issues with node
3745                        let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3746                        let stmt_id = next_stmt_id.get_and_increment();
3747
3748                        let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3749                            match builders_or_callback {
3750                                BuildersOrCallback::Builders(_) => {}
3751                                BuildersOrCallback::Callback(_, node_callback) => {
3752                                    node_callback(node, next_stmt_id);
3753                                }
3754                            }
3755
3756                            let idx = if is_true { 0 } else { 1 };
3757                            built_idents[idx].clone()
3758                        } else {
3759                            // The inner node was already processed by transform_bottom_up,
3760                            // so its ident is on the stack
3761                            let inner_ident = ident_stack.pop().unwrap();
3762                            let f_tokens = f.emit_tokens(&mut ident_stack);
3763
3764                            let inner_ident = {
3765                                let inner_borrow = inner.0.borrow();
3766                                maybe_observe_for_mut(
3767                                    f, inner_ident,
3768                                    &inner_borrow.metadata().location_id,
3769                                    &inner_borrow.metadata().collection_kind,
3770                                    &metadata.op,
3771                                    builders_or_callback, next_stmt_id,
3772                                )
3773                            };
3774
3775                            let partition_ident = syn::Ident::new(
3776                                &format!("stream_{}_partition", stmt_id),
3777                                Span::call_site(),
3778                            );
3779                            let true_ident = syn::Ident::new(
3780                                &format!("stream_{}_true", stmt_id),
3781                                Span::call_site(),
3782                            );
3783                            let false_ident = syn::Ident::new(
3784                                &format!("stream_{}_false", stmt_id),
3785                                Span::call_site(),
3786                            );
3787
3788                            built_tees.insert(
3789                                ptr,
3790                                vec![true_ident.clone(), false_ident.clone()],
3791                            );
3792
3793                            let stmt_id = next_stmt_id.get_and_increment();
3794                            match builders_or_callback {
3795                                BuildersOrCallback::Builders(graph_builders) => {
3796                                    let builder = graph_builders.get_dfir_mut(&out_location);
3797                                    builder.add_dfir(
3798                                        parse_quote! {
3799                                            #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f_tokens)(__item) { 0_usize } else { 1_usize });
3800                                            #true_ident = #partition_ident[0];
3801                                            #false_ident = #partition_ident[1];
3802                                        },
3803                                        None,
3804                                        Some(&stmt_id.to_string()),
3805                                    );
3806                                }
3807                                BuildersOrCallback::Callback(_, node_callback) => {
3808                                    node_callback(node, next_stmt_id);
3809                                }
3810                            }
3811
3812                            if is_true { true_ident } else { false_ident }
3813                        };
3814
3815                        ident_stack.push(ret_ident);
3816                    }
3817
3818                    HydroNode::Chain { .. } => {
3819                        // Children are processed left-to-right, so second is on top
3820                        let second_ident = ident_stack.pop().unwrap();
3821                        let first_ident = ident_stack.pop().unwrap();
3822
3823                        let stmt_id = next_stmt_id.get_and_increment();
3824                        let chain_ident =
3825                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3826
3827                        match builders_or_callback {
3828                            BuildersOrCallback::Builders(graph_builders) => {
3829                                let builder = graph_builders.get_dfir_mut(&out_location);
3830                                builder.add_dfir(
3831                                    parse_quote! {
3832                                        #chain_ident = chain();
3833                                        #first_ident -> [0]#chain_ident;
3834                                        #second_ident -> [1]#chain_ident;
3835                                    },
3836                                    None,
3837                                    Some(&stmt_id.to_string()),
3838                                );
3839                            }
3840                            BuildersOrCallback::Callback(_, node_callback) => {
3841                                node_callback(node, next_stmt_id);
3842                            }
3843                        }
3844
3845                        ident_stack.push(chain_ident);
3846                    }
3847
3848                    HydroNode::MergeOrdered { first, metadata, .. } => {
3849                        let second_ident = ident_stack.pop().unwrap();
3850                        let first_ident = ident_stack.pop().unwrap();
3851
3852                        let stmt_id = next_stmt_id.get_and_increment();
3853                        let merge_ident =
3854                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3855
3856                        match builders_or_callback {
3857                            BuildersOrCallback::Builders(graph_builders) => {
3858                                graph_builders.merge_ordered(
3859                                    &first.metadata().location_id,
3860                                    first_ident,
3861                                    second_ident,
3862                                    &merge_ident,
3863                                    &first.metadata().collection_kind,
3864                                    &metadata.op,
3865                                    Some(&stmt_id.to_string()),
3866                                );
3867                            }
3868                            BuildersOrCallback::Callback(_, node_callback) => {
3869                                node_callback(node, next_stmt_id);
3870                            }
3871                        }
3872
3873                        ident_stack.push(merge_ident);
3874                    }
3875
3876                    HydroNode::ChainFirst { .. } => {
3877                        let second_ident = ident_stack.pop().unwrap();
3878                        let first_ident = ident_stack.pop().unwrap();
3879
3880                        let stmt_id = next_stmt_id.get_and_increment();
3881                        let chain_ident =
3882                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3883
3884                        match builders_or_callback {
3885                            BuildersOrCallback::Builders(graph_builders) => {
3886                                let builder = graph_builders.get_dfir_mut(&out_location);
3887                                builder.add_dfir(
3888                                    parse_quote! {
3889                                        #chain_ident = chain_first_n(1);
3890                                        #first_ident -> [0]#chain_ident;
3891                                        #second_ident -> [1]#chain_ident;
3892                                    },
3893                                    None,
3894                                    Some(&stmt_id.to_string()),
3895                                );
3896                            }
3897                            BuildersOrCallback::Callback(_, node_callback) => {
3898                                node_callback(node, next_stmt_id);
3899                            }
3900                        }
3901
3902                        ident_stack.push(chain_ident);
3903                    }
3904
3905                    HydroNode::CrossSingleton { right, .. } => {
3906                        let right_ident = ident_stack.pop().unwrap();
3907                        let left_ident = ident_stack.pop().unwrap();
3908
3909                        let stmt_id = next_stmt_id.get_and_increment();
3910                        let cross_ident =
3911                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3912
3913                        match builders_or_callback {
3914                            BuildersOrCallback::Builders(graph_builders) => {
3915                                let builder = graph_builders.get_dfir_mut(&out_location);
3916
3917                                if right.metadata().location_id.is_top_level()
3918                                    && right.metadata().collection_kind.is_bounded()
3919                                {
3920                                    builder.add_dfir(
3921                                        parse_quote! {
3922                                            #cross_ident = cross_singleton::<'static>();
3923                                            #left_ident -> [input]#cross_ident;
3924                                            #right_ident -> [single]#cross_ident;
3925                                        },
3926                                        None,
3927                                        Some(&stmt_id.to_string()),
3928                                    );
3929                                } else {
3930                                    builder.add_dfir(
3931                                        parse_quote! {
3932                                            #cross_ident = cross_singleton();
3933                                            #left_ident -> [input]#cross_ident;
3934                                            #right_ident -> [single]#cross_ident;
3935                                        },
3936                                        None,
3937                                        Some(&stmt_id.to_string()),
3938                                    );
3939                                }
3940                            }
3941                            BuildersOrCallback::Callback(_, node_callback) => {
3942                                node_callback(node, next_stmt_id);
3943                            }
3944                        }
3945
3946                        ident_stack.push(cross_ident);
3947                    }
3948
3949                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3950                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3951                            parse_quote!(cross_join_multiset)
3952                        } else {
3953                            parse_quote!(join_multiset)
3954                        };
3955
3956                        let (HydroNode::CrossProduct { left, right, .. }
3957                        | HydroNode::Join { left, right, .. }) = node
3958                        else {
3959                            unreachable!()
3960                        };
3961
3962                        let is_top_level = left.metadata().location_id.is_top_level()
3963                            && right.metadata().location_id.is_top_level();
3964                        let left_lifetime = if left.metadata().location_id.is_top_level() {
3965                            quote!('static)
3966                        } else {
3967                            quote!('tick)
3968                        };
3969
3970                        let right_lifetime = if right.metadata().location_id.is_top_level() {
3971                            quote!('static)
3972                        } else {
3973                            quote!('tick)
3974                        };
3975
3976                        let right_ident = ident_stack.pop().unwrap();
3977                        let left_ident = ident_stack.pop().unwrap();
3978
3979                        let stmt_id = next_stmt_id.get_and_increment();
3980                        let stream_ident =
3981                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3982
3983                        match builders_or_callback {
3984                            BuildersOrCallback::Builders(graph_builders) => {
3985                                let builder = graph_builders.get_dfir_mut(&out_location);
3986                                builder.add_dfir(
3987                                    if is_top_level {
3988                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
3989                                        // a multiset_delta() to negate the replay behavior
3990                                        parse_quote! {
3991                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3992                                            #left_ident -> [0]#stream_ident;
3993                                            #right_ident -> [1]#stream_ident;
3994                                        }
3995                                    } else {
3996                                        parse_quote! {
3997                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3998                                            #left_ident -> [0]#stream_ident;
3999                                            #right_ident -> [1]#stream_ident;
4000                                        }
4001                                    }
4002                                    ,
4003                                    None,
4004                                    Some(&stmt_id.to_string()),
4005                                );
4006                            }
4007                            BuildersOrCallback::Callback(_, node_callback) => {
4008                                node_callback(node, next_stmt_id);
4009                            }
4010                        }
4011
4012                        ident_stack.push(stream_ident);
4013                    }
4014
4015                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
4016                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
4017                            parse_quote!(difference)
4018                        } else {
4019                            parse_quote!(anti_join)
4020                        };
4021
4022                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
4023                            node
4024                        else {
4025                            unreachable!()
4026                        };
4027
4028                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
4029                            quote!('static)
4030                        } else {
4031                            quote!('tick)
4032                        };
4033
4034                        let neg_ident = ident_stack.pop().unwrap();
4035                        let pos_ident = ident_stack.pop().unwrap();
4036
4037                        let stmt_id = next_stmt_id.get_and_increment();
4038                        let stream_ident =
4039                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4040
4041                        match builders_or_callback {
4042                            BuildersOrCallback::Builders(graph_builders) => {
4043                                let builder = graph_builders.get_dfir_mut(&out_location);
4044                                builder.add_dfir(
4045                                    parse_quote! {
4046                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
4047                                        #pos_ident -> [pos]#stream_ident;
4048                                        #neg_ident -> [neg]#stream_ident;
4049                                    },
4050                                    None,
4051                                    Some(&stmt_id.to_string()),
4052                                );
4053                            }
4054                            BuildersOrCallback::Callback(_, node_callback) => {
4055                                node_callback(node, next_stmt_id);
4056                            }
4057                        }
4058
4059                        ident_stack.push(stream_ident);
4060                    }
4061
4062                    HydroNode::JoinHalf { .. } => {
4063                        let HydroNode::JoinHalf { right, .. } = node else {
4064                            unreachable!()
4065                        };
4066
4067                        assert!(
4068                            right.metadata().collection_kind.is_bounded(),
4069                            "JoinHalf requires the right (build) side to be Bounded, got {:?}",
4070                            right.metadata().collection_kind
4071                        );
4072
4073                        let build_lifetime = if right.metadata().location_id.is_top_level() {
4074                            quote!('static)
4075                        } else {
4076                            quote!('tick)
4077                        };
4078
4079                        let build_ident = ident_stack.pop().unwrap();
4080                        let probe_ident = ident_stack.pop().unwrap();
4081
4082                        let stmt_id = next_stmt_id.get_and_increment();
4083                        let stream_ident =
4084                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4085
4086                        match builders_or_callback {
4087                            BuildersOrCallback::Builders(graph_builders) => {
4088                                let builder = graph_builders.get_dfir_mut(&out_location);
4089                                builder.add_dfir(
4090                                    parse_quote! {
4091                                        #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
4092                                        #probe_ident -> [probe]#stream_ident;
4093                                        #build_ident -> [build]#stream_ident;
4094                                    },
4095                                    None,
4096                                    Some(&stmt_id.to_string()),
4097                                );
4098                            }
4099                            BuildersOrCallback::Callback(_, node_callback) => {
4100                                node_callback(node, next_stmt_id);
4101                            }
4102                        }
4103
4104                        ident_stack.push(stream_ident);
4105                    }
4106
4107                    HydroNode::ResolveFutures { .. } => {
4108                        let input_ident = ident_stack.pop().unwrap();
4109
4110                        let stmt_id = next_stmt_id.get_and_increment();
4111                        let futures_ident =
4112                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4113
4114                        match builders_or_callback {
4115                            BuildersOrCallback::Builders(graph_builders) => {
4116                                let builder = graph_builders.get_dfir_mut(&out_location);
4117                                builder.add_dfir(
4118                                    parse_quote! {
4119                                        #futures_ident = #input_ident -> resolve_futures();
4120                                    },
4121                                    None,
4122                                    Some(&stmt_id.to_string()),
4123                                );
4124                            }
4125                            BuildersOrCallback::Callback(_, node_callback) => {
4126                                node_callback(node, next_stmt_id);
4127                            }
4128                        }
4129
4130                        ident_stack.push(futures_ident);
4131                    }
4132
4133                    HydroNode::ResolveFuturesBlocking { .. } => {
4134                        let input_ident = ident_stack.pop().unwrap();
4135
4136                        let stmt_id = next_stmt_id.get_and_increment();
4137                        let futures_ident =
4138                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4139
4140                        match builders_or_callback {
4141                            BuildersOrCallback::Builders(graph_builders) => {
4142                                let builder = graph_builders.get_dfir_mut(&out_location);
4143                                builder.add_dfir(
4144                                    parse_quote! {
4145                                        #futures_ident = #input_ident -> resolve_futures_blocking();
4146                                    },
4147                                    None,
4148                                    Some(&stmt_id.to_string()),
4149                                );
4150                            }
4151                            BuildersOrCallback::Callback(_, node_callback) => {
4152                                node_callback(node, next_stmt_id);
4153                            }
4154                        }
4155
4156                        ident_stack.push(futures_ident);
4157                    }
4158
4159                    HydroNode::ResolveFuturesOrdered { .. } => {
4160                        let input_ident = ident_stack.pop().unwrap();
4161
4162                        let stmt_id = next_stmt_id.get_and_increment();
4163                        let futures_ident =
4164                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4165
4166                        match builders_or_callback {
4167                            BuildersOrCallback::Builders(graph_builders) => {
4168                                let builder = graph_builders.get_dfir_mut(&out_location);
4169                                builder.add_dfir(
4170                                    parse_quote! {
4171                                        #futures_ident = #input_ident -> resolve_futures_ordered();
4172                                    },
4173                                    None,
4174                                    Some(&stmt_id.to_string()),
4175                                );
4176                            }
4177                            BuildersOrCallback::Callback(_, node_callback) => {
4178                                node_callback(node, next_stmt_id);
4179                            }
4180                        }
4181
4182                        ident_stack.push(futures_ident);
4183                    }
4184
4185                    HydroNode::Map {
4186                        f,
4187                        input,
4188                        metadata,
4189                    } => {
4190                        // Pop input ident (pushed last by transform_children).
4191                        let input_ident = ident_stack.pop().unwrap();
4192                        let f_tokens = f.emit_tokens(&mut ident_stack);
4193
4194                        let input_ident = maybe_observe_for_mut(
4195                            f,
4196                            input_ident,
4197                            &input.metadata().location_id,
4198                            &input.metadata().collection_kind,
4199                            &metadata.op,
4200                            builders_or_callback,
4201                            next_stmt_id,
4202                        );
4203
4204                        let stmt_id = next_stmt_id.get_and_increment();
4205                        let map_ident =
4206                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4207
4208                        match builders_or_callback {
4209                            BuildersOrCallback::Builders(graph_builders) => {
4210                                let builder = graph_builders.get_dfir_mut(&out_location);
4211                                builder.add_dfir(
4212                                    parse_quote! {
4213                                        #map_ident = #input_ident -> map(#f_tokens);
4214                                    },
4215                                    None,
4216                                    Some(&stmt_id.to_string()),
4217                                );
4218                            }
4219                            BuildersOrCallback::Callback(_, node_callback) => {
4220                                node_callback(node, next_stmt_id);
4221                            }
4222                        }
4223
4224                        ident_stack.push(map_ident);
4225                    }
4226
4227                    HydroNode::FlatMap { f, input, metadata } => {
4228                        let input_ident = ident_stack.pop().unwrap();
4229                        let f_tokens = f.emit_tokens(&mut ident_stack);
4230
4231                        let input_ident = maybe_observe_for_mut(
4232                            f, input_ident,
4233                            &input.metadata().location_id,
4234                            &input.metadata().collection_kind,
4235                            &metadata.op,
4236                            builders_or_callback, next_stmt_id,
4237                        );
4238
4239                        let stmt_id = next_stmt_id.get_and_increment();
4240                        let flat_map_ident =
4241                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4242
4243                        match builders_or_callback {
4244                            BuildersOrCallback::Builders(graph_builders) => {
4245                                let builder = graph_builders.get_dfir_mut(&out_location);
4246                                builder.add_dfir(
4247                                    parse_quote! {
4248                                        #flat_map_ident = #input_ident -> flat_map(#f_tokens);
4249                                    },
4250                                    None,
4251                                    Some(&stmt_id.to_string()),
4252                                );
4253                            }
4254                            BuildersOrCallback::Callback(_, node_callback) => {
4255                                node_callback(node, next_stmt_id);
4256                            }
4257                        }
4258
4259                        ident_stack.push(flat_map_ident);
4260                    }
4261
4262                    HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
4263                        let input_ident = ident_stack.pop().unwrap();
4264                        let f_tokens = f.emit_tokens(&mut ident_stack);
4265
4266                        let input_ident = maybe_observe_for_mut(
4267                            f, input_ident,
4268                            &input.metadata().location_id,
4269                            &input.metadata().collection_kind,
4270                            &metadata.op,
4271                            builders_or_callback, next_stmt_id,
4272                        );
4273
4274                        let stmt_id = next_stmt_id.get_and_increment();
4275                        let flat_map_stream_blocking_ident =
4276                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4277
4278                        match builders_or_callback {
4279                            BuildersOrCallback::Builders(graph_builders) => {
4280                                let builder = graph_builders.get_dfir_mut(&out_location);
4281                                builder.add_dfir(
4282                                    parse_quote! {
4283                                        #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f_tokens);
4284                                    },
4285                                    None,
4286                                    Some(&stmt_id.to_string()),
4287                                );
4288                            }
4289                            BuildersOrCallback::Callback(_, node_callback) => {
4290                                node_callback(node, next_stmt_id);
4291                            }
4292                        }
4293
4294                        ident_stack.push(flat_map_stream_blocking_ident);
4295                    }
4296
4297                    HydroNode::Filter { f, input, metadata } => {
4298                        let input_ident = ident_stack.pop().unwrap();
4299                        let f_tokens = f.emit_tokens(&mut ident_stack);
4300
4301                        let input_ident = maybe_observe_for_mut(
4302                            f, input_ident,
4303                            &input.metadata().location_id,
4304                            &input.metadata().collection_kind,
4305                            &metadata.op,
4306                            builders_or_callback, next_stmt_id,
4307                        );
4308
4309                        let stmt_id = next_stmt_id.get_and_increment();
4310                        let filter_ident =
4311                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4312
4313                        match builders_or_callback {
4314                            BuildersOrCallback::Builders(graph_builders) => {
4315                                let builder = graph_builders.get_dfir_mut(&out_location);
4316                                builder.add_dfir(
4317                                    parse_quote! {
4318                                        #filter_ident = #input_ident -> filter(#f_tokens);
4319                                    },
4320                                    None,
4321                                    Some(&stmt_id.to_string()),
4322                                );
4323                            }
4324                            BuildersOrCallback::Callback(_, node_callback) => {
4325                                node_callback(node, next_stmt_id);
4326                            }
4327                        }
4328
4329                        ident_stack.push(filter_ident);
4330                    }
4331
4332                    HydroNode::FilterMap { f, input, metadata } => {
4333                        let input_ident = ident_stack.pop().unwrap();
4334                        let f_tokens = f.emit_tokens(&mut ident_stack);
4335
4336                        let input_ident = maybe_observe_for_mut(
4337                            f, input_ident,
4338                            &input.metadata().location_id,
4339                            &input.metadata().collection_kind,
4340                            &metadata.op,
4341                            builders_or_callback, next_stmt_id,
4342                        );
4343
4344                        let stmt_id = next_stmt_id.get_and_increment();
4345                        let filter_map_ident =
4346                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4347
4348                        match builders_or_callback {
4349                            BuildersOrCallback::Builders(graph_builders) => {
4350                                let builder = graph_builders.get_dfir_mut(&out_location);
4351                                builder.add_dfir(
4352                                    parse_quote! {
4353                                        #filter_map_ident = #input_ident -> filter_map(#f_tokens);
4354                                    },
4355                                    None,
4356                                    Some(&stmt_id.to_string()),
4357                                );
4358                            }
4359                            BuildersOrCallback::Callback(_, node_callback) => {
4360                                node_callback(node, next_stmt_id);
4361                            }
4362                        }
4363
4364                        ident_stack.push(filter_map_ident);
4365                    }
4366
4367                    HydroNode::Sort { .. } => {
4368                        let input_ident = ident_stack.pop().unwrap();
4369
4370                        let stmt_id = next_stmt_id.get_and_increment();
4371                        let sort_ident =
4372                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4373
4374                        match builders_or_callback {
4375                            BuildersOrCallback::Builders(graph_builders) => {
4376                                let builder = graph_builders.get_dfir_mut(&out_location);
4377                                builder.add_dfir(
4378                                    parse_quote! {
4379                                        #sort_ident = #input_ident -> sort();
4380                                    },
4381                                    None,
4382                                    Some(&stmt_id.to_string()),
4383                                );
4384                            }
4385                            BuildersOrCallback::Callback(_, node_callback) => {
4386                                node_callback(node, next_stmt_id);
4387                            }
4388                        }
4389
4390                        ident_stack.push(sort_ident);
4391                    }
4392
4393                    HydroNode::DeferTick { .. } => {
4394                        let input_ident = ident_stack.pop().unwrap();
4395
4396                        let stmt_id = next_stmt_id.get_and_increment();
4397                        let defer_tick_ident =
4398                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4399
4400                        match builders_or_callback {
4401                            BuildersOrCallback::Builders(graph_builders) => {
4402                                let builder = graph_builders.get_dfir_mut(&out_location);
4403                                builder.add_dfir(
4404                                    parse_quote! {
4405                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
4406                                    },
4407                                    None,
4408                                    Some(&stmt_id.to_string()),
4409                                );
4410                            }
4411                            BuildersOrCallback::Callback(_, node_callback) => {
4412                                node_callback(node, next_stmt_id);
4413                            }
4414                        }
4415
4416                        ident_stack.push(defer_tick_ident);
4417                    }
4418
4419                    HydroNode::Enumerate { input, .. } => {
4420                        let input_ident = ident_stack.pop().unwrap();
4421
4422                        let stmt_id = next_stmt_id.get_and_increment();
4423                        let enumerate_ident =
4424                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4425
4426                        match builders_or_callback {
4427                            BuildersOrCallback::Builders(graph_builders) => {
4428                                let builder = graph_builders.get_dfir_mut(&out_location);
4429                                let lifetime = if input.metadata().location_id.is_top_level() {
4430                                    quote!('static)
4431                                } else {
4432                                    quote!('tick)
4433                                };
4434                                builder.add_dfir(
4435                                    parse_quote! {
4436                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
4437                                    },
4438                                    None,
4439                                    Some(&stmt_id.to_string()),
4440                                );
4441                            }
4442                            BuildersOrCallback::Callback(_, node_callback) => {
4443                                node_callback(node, next_stmt_id);
4444                            }
4445                        }
4446
4447                        ident_stack.push(enumerate_ident);
4448                    }
4449
4450                    HydroNode::Inspect { f, input, metadata } => {
4451                        let input_ident = ident_stack.pop().unwrap();
4452                        let f_tokens = f.emit_tokens(&mut ident_stack);
4453
4454                        let input_ident = maybe_observe_for_mut(
4455                            f, input_ident,
4456                            &input.metadata().location_id,
4457                            &input.metadata().collection_kind,
4458                            &metadata.op,
4459                            builders_or_callback, next_stmt_id,
4460                        );
4461
4462                        let stmt_id = next_stmt_id.get_and_increment();
4463                        let inspect_ident =
4464                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4465
4466                        match builders_or_callback {
4467                            BuildersOrCallback::Builders(graph_builders) => {
4468                                let builder = graph_builders.get_dfir_mut(&out_location);
4469                                builder.add_dfir(
4470                                    parse_quote! {
4471                                        #inspect_ident = #input_ident -> inspect(#f_tokens);
4472                                    },
4473                                    None,
4474                                    Some(&stmt_id.to_string()),
4475                                );
4476                            }
4477                            BuildersOrCallback::Callback(_, node_callback) => {
4478                                node_callback(node, next_stmt_id);
4479                            }
4480                        }
4481
4482                        ident_stack.push(inspect_ident);
4483                    }
4484
4485                    HydroNode::Unique { input, .. } => {
4486                        let input_ident = ident_stack.pop().unwrap();
4487
4488                        let stmt_id = next_stmt_id.get_and_increment();
4489                        let unique_ident =
4490                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4491
4492                        match builders_or_callback {
4493                            BuildersOrCallback::Builders(graph_builders) => {
4494                                let builder = graph_builders.get_dfir_mut(&out_location);
4495                                let lifetime = if input.metadata().location_id.is_top_level() {
4496                                    quote!('static)
4497                                } else {
4498                                    quote!('tick)
4499                                };
4500
4501                                builder.add_dfir(
4502                                    parse_quote! {
4503                                        #unique_ident = #input_ident -> unique::<#lifetime>();
4504                                    },
4505                                    None,
4506                                    Some(&stmt_id.to_string()),
4507                                );
4508                            }
4509                            BuildersOrCallback::Callback(_, node_callback) => {
4510                                node_callback(node, next_stmt_id);
4511                            }
4512                        }
4513
4514                        ident_stack.push(unique_ident);
4515                    }
4516
4517                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
4518                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
4519                            if input.metadata().location_id.is_top_level()
4520                                && input.metadata().collection_kind.is_bounded()
4521                            {
4522                                parse_quote!(fold_no_replay)
4523                            } else {
4524                                parse_quote!(fold)
4525                            }
4526                        } else if matches!(node, HydroNode::Scan { .. }) {
4527                            parse_quote!(scan)
4528                        } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
4529                            parse_quote!(scan_async_blocking)
4530                        } else if let HydroNode::FoldKeyed { input, .. } = node {
4531                            if input.metadata().location_id.is_top_level()
4532                                && input.metadata().collection_kind.is_bounded()
4533                            {
4534                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
4535                            } else {
4536                                parse_quote!(fold_keyed)
4537                            }
4538                        } else {
4539                            unreachable!()
4540                        };
4541
4542                        let (HydroNode::Fold { input, .. }
4543                        | HydroNode::FoldKeyed { input, .. }
4544                        | HydroNode::Scan { input, .. }
4545                        | HydroNode::ScanAsyncBlocking { input, .. }) = node
4546                        else {
4547                            unreachable!()
4548                        };
4549
4550                        let lifetime = if input.metadata().location_id.is_top_level() {
4551                            quote!('static)
4552                        } else {
4553                            quote!('tick)
4554                        };
4555
4556                        let input_ident = ident_stack.pop().unwrap();
4557
4558                        let (HydroNode::Fold { init, acc, .. }
4559                        | HydroNode::FoldKeyed { init, acc, .. }
4560                        | HydroNode::Scan { init, acc, .. }
4561                        | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
4562                        else {
4563                            unreachable!()
4564                        };
4565
4566                        let acc_tokens = acc.emit_tokens(&mut ident_stack);
4567                        let init_tokens = init.emit_tokens(&mut ident_stack);
4568
4569                        let stmt_id = next_stmt_id.get_and_increment();
4570                        let fold_ident =
4571                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4572
4573                        match builders_or_callback {
4574                            BuildersOrCallback::Builders(graph_builders) => {
4575                                if matches!(node, HydroNode::Fold { .. })
4576                                    && node.metadata().location_id.is_top_level()
4577                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4578                                    && graph_builders.singleton_intermediates()
4579                                    && !node.metadata().collection_kind.is_bounded()
4580                                {
4581                                    let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
4582                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4583                                        &input.metadata().location_id,
4584                                        &input_ident,
4585                                        &input.metadata().collection_kind,
4586                                        &node.metadata().op,
4587                                    );
4588
4589                                    let (effective_input, wrapped_acc) = if let Some(ref hooked) = hooked_input_ident {
4590                                        let acc: syn::Expr = parse_quote!({
4591                                            let mut __inner = #acc_tokens;
4592                                            move |__state, __batch: Vec<_>| {
4593                                                if __batch.is_empty() {
4594                                                    return None;
4595                                                }
4596                                                for __value in __batch {
4597                                                    __inner(__state, __value);
4598                                                }
4599                                                Some(__state.clone())
4600                                            }
4601                                        });
4602                                        (hooked, acc)
4603                                    } else {
4604                                        let acc: syn::Expr = parse_quote!({
4605                                            let mut __inner = #acc_tokens;
4606                                            move |__state, __value| {
4607                                                __inner(__state, __value);
4608                                                Some(__state.clone())
4609                                            }
4610                                        });
4611                                        (&input_ident, acc)
4612                                    };
4613
4614                                    let builder = graph_builders.get_dfir_mut(&out_location);
4615                                    builder.add_dfir(
4616                                        parse_quote! {
4617                                            source_iter([(#init_tokens)()]) -> [0]#fold_ident;
4618                                            #effective_input -> scan::<#lifetime>(#init_tokens, #wrapped_acc) -> [1]#fold_ident;
4619                                            #fold_ident = chain();
4620                                        },
4621                                        None,
4622                                        Some(&stmt_id.to_string()),
4623                                    );
4624
4625                                    if hooked_input_ident.is_some() {
4626                                        fold_hooked_idents.insert(fold_ident.to_string());
4627                                    }
4628                                } else if matches!(node, HydroNode::FoldKeyed { .. })
4629                                    && node.metadata().location_id.is_top_level()
4630                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4631                                    && graph_builders.singleton_intermediates()
4632                                    && !node.metadata().collection_kind.is_bounded()
4633                                {
4634                                    let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
4635                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4636                                        &input.metadata().location_id,
4637                                        &input_ident,
4638                                        &input.metadata().collection_kind,
4639                                        &node.metadata().op,
4640                                    );
4641                                    let builder = graph_builders.get_dfir_mut(&out_location);
4642
4643                                    let wrapped_acc: syn::Expr = parse_quote!({
4644                                        let mut __init = #init_tokens;
4645                                        let mut __inner = #acc_tokens;
4646                                        move |__state, __kv: (_, _)| {
4647                                            // TODO(shadaj): we can avoid the clone when the entry exists
4648                                            let __state = __state
4649                                                .entry(::std::clone::Clone::clone(&__kv.0))
4650                                                .or_insert_with(|| (__init)());
4651                                            __inner(__state, __kv.1);
4652                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
4653                                        }
4654                                    });
4655
4656                                    if let Some(hooked_input_ident) = hooked_input_ident {
4657                                        builder.add_dfir(
4658                                            parse_quote! {
4659                                                #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4660                                            },
4661                                            None,
4662                                            Some(&stmt_id.to_string()),
4663                                        );
4664
4665                                        fold_hooked_idents.insert(fold_ident.to_string());
4666                                    } else {
4667                                        builder.add_dfir(
4668                                            parse_quote! {
4669                                                #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4670                                            },
4671                                            None,
4672                                            Some(&stmt_id.to_string()),
4673                                        );
4674                                    }
4675                                } else if (matches!(node, HydroNode::Fold { .. })
4676                                    || matches!(node, HydroNode::FoldKeyed { .. }))
4677                                    && !node.metadata().location_id.is_top_level()
4678                                    && graph_builders.singleton_intermediates()
4679                                {
4680                                    let input_ref = match &*node {
4681                                        HydroNode::Fold { input, .. } => input,
4682                                        HydroNode::FoldKeyed { input, .. } => input,
4683                                        _ => unreachable!(),
4684                                    };
4685                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4686                                        &input_ref.metadata().location_id,
4687                                        &input_ident,
4688                                        &input_ref.metadata().collection_kind,
4689                                        &node.metadata().op,
4690                                    );
4691
4692                                    let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
4693                                    let builder = graph_builders.get_dfir_mut(&out_location);
4694                                    builder.add_dfir(
4695                                        parse_quote! {
4696                                            #fold_ident = #actual_input -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4697                                        },
4698                                        None,
4699                                        Some(&stmt_id.to_string()),
4700                                    );
4701                                } else {
4702                                    let builder = graph_builders.get_dfir_mut(&out_location);
4703                                    builder.add_dfir(
4704                                        parse_quote! {
4705                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4706                                        },
4707                                        None,
4708                                        Some(&stmt_id.to_string()),
4709                                    );
4710                                }
4711                            }
4712                            BuildersOrCallback::Callback(_, node_callback) => {
4713                                node_callback(node, next_stmt_id);
4714                            }
4715                        }
4716
4717                        ident_stack.push(fold_ident);
4718                    }
4719
4720                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
4721                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
4722                            if input.metadata().location_id.is_top_level()
4723                                && input.metadata().collection_kind.is_bounded()
4724                            {
4725                                parse_quote!(reduce_no_replay)
4726                            } else {
4727                                parse_quote!(reduce)
4728                            }
4729                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
4730                            if input.metadata().location_id.is_top_level()
4731                                && input.metadata().collection_kind.is_bounded()
4732                            {
4733                                todo!(
4734                                    "Calling keyed reduce on a top-level bounded collection is not supported"
4735                                )
4736                            } else {
4737                                parse_quote!(reduce_keyed)
4738                            }
4739                        } else {
4740                            unreachable!()
4741                        };
4742
4743                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4744                        else {
4745                            unreachable!()
4746                        };
4747
4748                        let lifetime = if input.metadata().location_id.is_top_level() {
4749                            quote!('static)
4750                        } else {
4751                            quote!('tick)
4752                        };
4753
4754                        let input_ident = ident_stack.pop().unwrap();
4755
4756                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4757                        else {
4758                            unreachable!()
4759                        };
4760
4761                        let f_tokens = f.emit_tokens(&mut ident_stack);
4762
4763                        let stmt_id = next_stmt_id.get_and_increment();
4764                        let reduce_ident =
4765                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4766
4767                        match builders_or_callback {
4768                            BuildersOrCallback::Builders(graph_builders) => {
4769                                if matches!(node, HydroNode::Reduce { .. })
4770                                    && node.metadata().location_id.is_top_level()
4771                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4772                                    && graph_builders.singleton_intermediates()
4773                                    && !node.metadata().collection_kind.is_bounded()
4774                                {
4775                                    todo!(
4776                                        "Reduce with optional intermediates is not yet supported in simulator"
4777                                    );
4778                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
4779                                    && node.metadata().location_id.is_top_level()
4780                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4781                                    && graph_builders.singleton_intermediates()
4782                                    && !node.metadata().collection_kind.is_bounded()
4783                                {
4784                                    todo!(
4785                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
4786                                    );
4787                                } else {
4788                                    let builder = graph_builders.get_dfir_mut(&out_location);
4789                                    builder.add_dfir(
4790                                        parse_quote! {
4791                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f_tokens);
4792                                        },
4793                                        None,
4794                                        Some(&stmt_id.to_string()),
4795                                    );
4796                                }
4797                            }
4798                            BuildersOrCallback::Callback(_, node_callback) => {
4799                                node_callback(node, next_stmt_id);
4800                            }
4801                        }
4802
4803                        ident_stack.push(reduce_ident);
4804                    }
4805
4806                    HydroNode::ReduceKeyedWatermark {
4807                        f,
4808                        input,
4809                        metadata,
4810                        ..
4811                    } => {
4812                        let lifetime = if input.metadata().location_id.is_top_level() {
4813                            quote!('static)
4814                        } else {
4815                            quote!('tick)
4816                        };
4817
4818                        // watermark is processed second, so it's on top
4819                        let watermark_ident = ident_stack.pop().unwrap();
4820                        let input_ident = ident_stack.pop().unwrap();
4821                        let f_tokens = f.emit_tokens(&mut ident_stack);
4822
4823                        let stmt_id = next_stmt_id.get_and_increment();
4824                        let chain_ident = syn::Ident::new(
4825                            &format!("reduce_keyed_watermark_chain_{}", stmt_id),
4826                            Span::call_site(),
4827                        );
4828
4829                        let fold_ident =
4830                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4831
4832                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4833                            && input.metadata().collection_kind.is_bounded()
4834                        {
4835                            parse_quote!(fold_no_replay)
4836                        } else {
4837                            parse_quote!(fold)
4838                        };
4839
4840                        match builders_or_callback {
4841                            BuildersOrCallback::Builders(graph_builders) => {
4842                                if metadata.location_id.is_top_level()
4843                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4844                                    && graph_builders.singleton_intermediates()
4845                                    && !metadata.collection_kind.is_bounded()
4846                                {
4847                                    todo!(
4848                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4849                                    )
4850                                } else {
4851                                    let builder = graph_builders.get_dfir_mut(&out_location);
4852                                    builder.add_dfir(
4853                                        parse_quote! {
4854                                            #chain_ident = chain();
4855                                            #input_ident
4856                                                -> map(|x| (Some(x), None))
4857                                                -> [0]#chain_ident;
4858                                            #watermark_ident
4859                                                -> map(|watermark| (None, Some(watermark)))
4860                                                -> [1]#chain_ident;
4861
4862                                            #fold_ident = #chain_ident
4863                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4864                                                    let __reduce_keyed_fn = #f_tokens;
4865                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4866                                                        if let Some((k, v)) = opt_payload {
4867                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4868                                                                if k < curr_watermark {
4869                                                                    return;
4870                                                                }
4871                                                            }
4872                                                            match map.entry(k) {
4873                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
4874                                                                    e.insert(v);
4875                                                                }
4876                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
4877                                                                    __reduce_keyed_fn(e.get_mut(), v);
4878                                                                }
4879                                                            }
4880                                                        } else {
4881                                                            let watermark = opt_watermark.unwrap();
4882                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4883                                                                if watermark <= curr_watermark {
4884                                                                    return;
4885                                                                }
4886                                                            }
4887                                                            map.retain(|k, _| *k >= watermark);
4888                                                            *opt_curr_watermark = Some(watermark);
4889                                                        }
4890                                                    }
4891                                                })
4892                                                -> flat_map(|(map, _curr_watermark)| map);
4893                                        },
4894                                        None,
4895                                        Some(&stmt_id.to_string()),
4896                                    );
4897                                }
4898                            }
4899                            BuildersOrCallback::Callback(_, node_callback) => {
4900                                node_callback(node, next_stmt_id);
4901                            }
4902                        }
4903
4904                        ident_stack.push(fold_ident);
4905                    }
4906
4907                    HydroNode::Network {
4908                        networking_info,
4909                        serialize_fn: serialize_pipeline,
4910                        instantiate_fn,
4911                        deserialize_fn: deserialize_pipeline,
4912                        input,
4913                        ..
4914                    } => {
4915                        let input_ident = ident_stack.pop().unwrap();
4916
4917                        let stmt_id = next_stmt_id.get_and_increment();
4918                        let receiver_stream_ident =
4919                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4920
4921                        match builders_or_callback {
4922                            BuildersOrCallback::Builders(graph_builders) => {
4923                                let (sink_expr, source_expr) = match instantiate_fn {
4924                                    DebugInstantiate::Building => (
4925                                        syn::parse_quote!(DUMMY_SINK),
4926                                        syn::parse_quote!(DUMMY_SOURCE),
4927                                    ),
4928
4929                                    DebugInstantiate::Finalized(finalized) => {
4930                                        (finalized.sink.clone(), finalized.source.clone())
4931                                    }
4932                                };
4933
4934                                graph_builders.create_network(
4935                                    &input.metadata().location_id,
4936                                    &out_location,
4937                                    input_ident,
4938                                    &receiver_stream_ident,
4939                                    serialize_pipeline.as_ref(),
4940                                    sink_expr,
4941                                    source_expr,
4942                                    deserialize_pipeline.as_ref(),
4943                                    stmt_id,
4944                                    networking_info,
4945                                );
4946                            }
4947                            BuildersOrCallback::Callback(_, node_callback) => {
4948                                node_callback(node, next_stmt_id);
4949                            }
4950                        }
4951
4952                        ident_stack.push(receiver_stream_ident);
4953                    }
4954
4955                    HydroNode::ExternalInput {
4956                        instantiate_fn,
4957                        deserialize_fn: deserialize_pipeline,
4958                        ..
4959                    } => {
4960                        let stmt_id = next_stmt_id.get_and_increment();
4961                        let receiver_stream_ident =
4962                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4963
4964                        match builders_or_callback {
4965                            BuildersOrCallback::Builders(graph_builders) => {
4966                                let (_, source_expr) = match instantiate_fn {
4967                                    DebugInstantiate::Building => (
4968                                        syn::parse_quote!(DUMMY_SINK),
4969                                        syn::parse_quote!(DUMMY_SOURCE),
4970                                    ),
4971
4972                                    DebugInstantiate::Finalized(finalized) => {
4973                                        (finalized.sink.clone(), finalized.source.clone())
4974                                    }
4975                                };
4976
4977                                graph_builders.create_external_source(
4978                                    &out_location,
4979                                    source_expr,
4980                                    &receiver_stream_ident,
4981                                    deserialize_pipeline.as_ref(),
4982                                    stmt_id,
4983                                );
4984                            }
4985                            BuildersOrCallback::Callback(_, node_callback) => {
4986                                node_callback(node, next_stmt_id);
4987                            }
4988                        }
4989
4990                        ident_stack.push(receiver_stream_ident);
4991                    }
4992
4993                    HydroNode::Counter {
4994                        tag,
4995                        duration,
4996                        prefix,
4997                        ..
4998                    } => {
4999                        let input_ident = ident_stack.pop().unwrap();
5000
5001                        let stmt_id = next_stmt_id.get_and_increment();
5002                        let counter_ident =
5003                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
5004
5005                        match builders_or_callback {
5006                            BuildersOrCallback::Builders(graph_builders) => {
5007                                let arg = format!("{}({})", prefix, tag);
5008                                let builder = graph_builders.get_dfir_mut(&out_location);
5009                                builder.add_dfir(
5010                                    parse_quote! {
5011                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
5012                                    },
5013                                    None,
5014                                    Some(&stmt_id.to_string()),
5015                                );
5016                            }
5017                            BuildersOrCallback::Callback(_, node_callback) => {
5018                                node_callback(node, next_stmt_id);
5019                            }
5020                        }
5021
5022                        ident_stack.push(counter_ident);
5023                    }
5024                }
5025            },
5026            seen_tees,
5027            false,
5028        );
5029
5030        let ret = ident_stack
5031            .pop()
5032            .expect("ident_stack should have exactly one element after traversal");
5033        assert!(
5034            ident_stack.is_empty(),
5035            "ident_stack should be empty after popping the final ident, but has {} remaining element(s). \
5036             This indicates a bug in the code gen: some node pushed idents that were never consumed.",
5037            ident_stack.len()
5038        );
5039        ret
5040    }
5041
5042    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
5043        match self {
5044            HydroNode::Placeholder => {
5045                panic!()
5046            }
5047            HydroNode::Cast { .. }
5048            | HydroNode::ObserveNonDet { .. }
5049            | HydroNode::UnboundSingleton { .. }
5050            | HydroNode::AssertIsConsistent { .. } => {}
5051            HydroNode::Source { source, .. } => match source {
5052                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
5053                HydroSource::ExternalNetwork()
5054                | HydroSource::Spin()
5055                | HydroSource::ClusterMembers(_, _)
5056                | HydroSource::Embedded(_)
5057                | HydroSource::EmbeddedSingleton(_) => {} // TODO: what goes here?
5058            },
5059            HydroNode::SingletonSource { value, .. } => {
5060                transform(value);
5061            }
5062            HydroNode::CycleSource { .. }
5063            | HydroNode::Tee { .. }
5064            | HydroNode::Reference { .. }
5065            | HydroNode::YieldConcat { .. }
5066            | HydroNode::BeginAtomic { .. }
5067            | HydroNode::EndAtomic { .. }
5068            | HydroNode::Batch { .. }
5069            | HydroNode::Chain { .. }
5070            | HydroNode::MergeOrdered { .. }
5071            | HydroNode::ChainFirst { .. }
5072            | HydroNode::CrossProduct { .. }
5073            | HydroNode::CrossSingleton { .. }
5074            | HydroNode::ResolveFutures { .. }
5075            | HydroNode::ResolveFuturesBlocking { .. }
5076            | HydroNode::ResolveFuturesOrdered { .. }
5077            | HydroNode::Join { .. }
5078            | HydroNode::JoinHalf { .. }
5079            | HydroNode::Difference { .. }
5080            | HydroNode::AntiJoin { .. }
5081            | HydroNode::DeferTick { .. }
5082            | HydroNode::Enumerate { .. }
5083            | HydroNode::Unique { .. }
5084            | HydroNode::Sort { .. } => {}
5085            HydroNode::Map { f, .. }
5086            | HydroNode::FlatMap { f, .. }
5087            | HydroNode::FlatMapStreamBlocking { f, .. }
5088            | HydroNode::Filter { f, .. }
5089            | HydroNode::FilterMap { f, .. }
5090            | HydroNode::Inspect { f, .. }
5091            | HydroNode::Partition { f, .. }
5092            | HydroNode::Reduce { f, .. }
5093            | HydroNode::ReduceKeyed { f, .. }
5094            | HydroNode::ReduceKeyedWatermark { f, .. } => {
5095                transform(&mut f.expr);
5096            }
5097            HydroNode::Fold { init, acc, .. }
5098            | HydroNode::Scan { init, acc, .. }
5099            | HydroNode::ScanAsyncBlocking { init, acc, .. }
5100            | HydroNode::FoldKeyed { init, acc, .. } => {
5101                transform(&mut init.expr);
5102                transform(&mut acc.expr);
5103            }
5104            HydroNode::Network {
5105                serialize_fn,
5106                deserialize_fn,
5107                ..
5108            } => {
5109                if let Some(serialize_fn) = serialize_fn {
5110                    transform(serialize_fn);
5111                }
5112                if let Some(deserialize_fn) = deserialize_fn {
5113                    transform(deserialize_fn);
5114                }
5115            }
5116            HydroNode::ExternalInput { deserialize_fn, .. } => {
5117                if let Some(deserialize_fn) = deserialize_fn {
5118                    transform(deserialize_fn);
5119                }
5120            }
5121            HydroNode::Counter { duration, .. } => {
5122                transform(duration);
5123            }
5124        }
5125    }
5126
5127    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
5128        &self.metadata().op
5129    }
5130
5131    pub fn metadata(&self) -> &HydroIrMetadata {
5132        match self {
5133            HydroNode::Placeholder => {
5134                panic!()
5135            }
5136            HydroNode::Cast { metadata, .. }
5137            | HydroNode::ObserveNonDet { metadata, .. }
5138            | HydroNode::AssertIsConsistent { metadata, .. }
5139            | HydroNode::UnboundSingleton { metadata, .. }
5140            | HydroNode::Source { metadata, .. }
5141            | HydroNode::SingletonSource { metadata, .. }
5142            | HydroNode::CycleSource { metadata, .. }
5143            | HydroNode::Tee { metadata, .. }
5144            | HydroNode::Reference { metadata, .. }
5145            | HydroNode::Partition { metadata, .. }
5146            | HydroNode::YieldConcat { metadata, .. }
5147            | HydroNode::BeginAtomic { metadata, .. }
5148            | HydroNode::EndAtomic { metadata, .. }
5149            | HydroNode::Batch { metadata, .. }
5150            | HydroNode::Chain { metadata, .. }
5151            | HydroNode::MergeOrdered { metadata, .. }
5152            | HydroNode::ChainFirst { metadata, .. }
5153            | HydroNode::CrossProduct { metadata, .. }
5154            | HydroNode::CrossSingleton { metadata, .. }
5155            | HydroNode::Join { metadata, .. }
5156            | HydroNode::JoinHalf { metadata, .. }
5157            | HydroNode::Difference { metadata, .. }
5158            | HydroNode::AntiJoin { metadata, .. }
5159            | HydroNode::ResolveFutures { metadata, .. }
5160            | HydroNode::ResolveFuturesBlocking { metadata, .. }
5161            | HydroNode::ResolveFuturesOrdered { metadata, .. }
5162            | HydroNode::Map { metadata, .. }
5163            | HydroNode::FlatMap { metadata, .. }
5164            | HydroNode::FlatMapStreamBlocking { metadata, .. }
5165            | HydroNode::Filter { metadata, .. }
5166            | HydroNode::FilterMap { metadata, .. }
5167            | HydroNode::DeferTick { metadata, .. }
5168            | HydroNode::Enumerate { metadata, .. }
5169            | HydroNode::Inspect { metadata, .. }
5170            | HydroNode::Unique { metadata, .. }
5171            | HydroNode::Sort { metadata, .. }
5172            | HydroNode::Scan { metadata, .. }
5173            | HydroNode::ScanAsyncBlocking { metadata, .. }
5174            | HydroNode::Fold { metadata, .. }
5175            | HydroNode::FoldKeyed { metadata, .. }
5176            | HydroNode::Reduce { metadata, .. }
5177            | HydroNode::ReduceKeyed { metadata, .. }
5178            | HydroNode::ReduceKeyedWatermark { metadata, .. }
5179            | HydroNode::ExternalInput { metadata, .. }
5180            | HydroNode::Network { metadata, .. }
5181            | HydroNode::Counter { metadata, .. } => metadata,
5182        }
5183    }
5184
5185    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
5186        &mut self.metadata_mut().op
5187    }
5188
5189    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
5190        match self {
5191            HydroNode::Placeholder => {
5192                panic!()
5193            }
5194            HydroNode::Cast { metadata, .. }
5195            | HydroNode::ObserveNonDet { metadata, .. }
5196            | HydroNode::AssertIsConsistent { metadata, .. }
5197            | HydroNode::UnboundSingleton { metadata, .. }
5198            | HydroNode::Source { metadata, .. }
5199            | HydroNode::SingletonSource { metadata, .. }
5200            | HydroNode::CycleSource { metadata, .. }
5201            | HydroNode::Tee { metadata, .. }
5202            | HydroNode::Reference { metadata, .. }
5203            | HydroNode::Partition { metadata, .. }
5204            | HydroNode::YieldConcat { metadata, .. }
5205            | HydroNode::BeginAtomic { metadata, .. }
5206            | HydroNode::EndAtomic { metadata, .. }
5207            | HydroNode::Batch { metadata, .. }
5208            | HydroNode::Chain { metadata, .. }
5209            | HydroNode::MergeOrdered { metadata, .. }
5210            | HydroNode::ChainFirst { metadata, .. }
5211            | HydroNode::CrossProduct { metadata, .. }
5212            | HydroNode::CrossSingleton { metadata, .. }
5213            | HydroNode::Join { metadata, .. }
5214            | HydroNode::JoinHalf { metadata, .. }
5215            | HydroNode::Difference { metadata, .. }
5216            | HydroNode::AntiJoin { metadata, .. }
5217            | HydroNode::ResolveFutures { metadata, .. }
5218            | HydroNode::ResolveFuturesBlocking { metadata, .. }
5219            | HydroNode::ResolveFuturesOrdered { metadata, .. }
5220            | HydroNode::Map { metadata, .. }
5221            | HydroNode::FlatMap { metadata, .. }
5222            | HydroNode::FlatMapStreamBlocking { metadata, .. }
5223            | HydroNode::Filter { metadata, .. }
5224            | HydroNode::FilterMap { metadata, .. }
5225            | HydroNode::DeferTick { metadata, .. }
5226            | HydroNode::Enumerate { metadata, .. }
5227            | HydroNode::Inspect { metadata, .. }
5228            | HydroNode::Unique { metadata, .. }
5229            | HydroNode::Sort { metadata, .. }
5230            | HydroNode::Scan { metadata, .. }
5231            | HydroNode::ScanAsyncBlocking { metadata, .. }
5232            | HydroNode::Fold { metadata, .. }
5233            | HydroNode::FoldKeyed { metadata, .. }
5234            | HydroNode::Reduce { metadata, .. }
5235            | HydroNode::ReduceKeyed { metadata, .. }
5236            | HydroNode::ReduceKeyedWatermark { metadata, .. }
5237            | HydroNode::ExternalInput { metadata, .. }
5238            | HydroNode::Network { metadata, .. }
5239            | HydroNode::Counter { metadata, .. } => metadata,
5240        }
5241    }
5242
5243    pub fn input(&self) -> Vec<&HydroNode> {
5244        match self {
5245            HydroNode::Placeholder => {
5246                panic!()
5247            }
5248            HydroNode::Source { .. }
5249            | HydroNode::SingletonSource { .. }
5250            | HydroNode::ExternalInput { .. }
5251            | HydroNode::CycleSource { .. }
5252            | HydroNode::Tee { .. }
5253            | HydroNode::Reference { .. }
5254            | HydroNode::Partition { .. } => {
5255                // Tee/Partition should find their input in separate special ways
5256                vec![]
5257            }
5258            HydroNode::Cast { inner, .. }
5259            | HydroNode::ObserveNonDet { inner, .. }
5260            | HydroNode::YieldConcat { inner, .. }
5261            | HydroNode::BeginAtomic { inner, .. }
5262            | HydroNode::EndAtomic { inner, .. }
5263            | HydroNode::Batch { inner, .. }
5264            | HydroNode::UnboundSingleton { inner, .. }
5265            | HydroNode::AssertIsConsistent { inner, .. } => {
5266                vec![inner]
5267            }
5268            HydroNode::Chain { first, second, .. } => {
5269                vec![first, second]
5270            }
5271            HydroNode::MergeOrdered { first, second, .. } => {
5272                vec![first, second]
5273            }
5274            HydroNode::ChainFirst { first, second, .. } => {
5275                vec![first, second]
5276            }
5277            HydroNode::CrossProduct { left, right, .. }
5278            | HydroNode::CrossSingleton { left, right, .. }
5279            | HydroNode::Join { left, right, .. }
5280            | HydroNode::JoinHalf { left, right, .. } => {
5281                vec![left, right]
5282            }
5283            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
5284                vec![pos, neg]
5285            }
5286            HydroNode::Map { input, .. }
5287            | HydroNode::FlatMap { input, .. }
5288            | HydroNode::FlatMapStreamBlocking { input, .. }
5289            | HydroNode::Filter { input, .. }
5290            | HydroNode::FilterMap { input, .. }
5291            | HydroNode::Sort { input, .. }
5292            | HydroNode::DeferTick { input, .. }
5293            | HydroNode::Enumerate { input, .. }
5294            | HydroNode::Inspect { input, .. }
5295            | HydroNode::Unique { input, .. }
5296            | HydroNode::Network { input, .. }
5297            | HydroNode::Counter { input, .. }
5298            | HydroNode::ResolveFutures { input, .. }
5299            | HydroNode::ResolveFuturesBlocking { input, .. }
5300            | HydroNode::ResolveFuturesOrdered { input, .. }
5301            | HydroNode::Fold { input, .. }
5302            | HydroNode::FoldKeyed { input, .. }
5303            | HydroNode::Reduce { input, .. }
5304            | HydroNode::ReduceKeyed { input, .. }
5305            | HydroNode::Scan { input, .. }
5306            | HydroNode::ScanAsyncBlocking { input, .. } => {
5307                vec![input]
5308            }
5309            HydroNode::ReduceKeyedWatermark {
5310                input, watermark, ..
5311            } => {
5312                vec![input, watermark]
5313            }
5314        }
5315    }
5316
5317    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
5318        self.input()
5319            .iter()
5320            .map(|input_node| input_node.metadata())
5321            .collect()
5322    }
5323
5324    /// Returns `true` if this node is a Tee or Partition whose inner Rc
5325    /// has other live references, meaning the upstream is already driven
5326    /// by another consumer and does not need a Null sink.
5327    pub fn is_shared_with_others(&self) -> bool {
5328        match self {
5329            HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
5330                Rc::strong_count(&inner.0) > 1
5331            }
5332            // A zero-output reference node is valid in DFIR (it drains itself at
5333            // end of tick), so it doesn't need to be driven by another consumer.
5334            HydroNode::Reference { .. } => false,
5335            _ => false,
5336        }
5337    }
5338
5339    pub fn print_root(&self) -> String {
5340        match self {
5341            HydroNode::Placeholder => {
5342                panic!()
5343            }
5344            HydroNode::Cast { .. } => "Cast()".to_owned(),
5345            HydroNode::UnboundSingleton { .. } => "UnboundSingleton()".to_owned(),
5346            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
5347            HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
5348            HydroNode::Source { source, .. } => format!("Source({:?})", source),
5349            HydroNode::SingletonSource {
5350                value,
5351                first_tick_only,
5352                ..
5353            } => format!(
5354                "SingletonSource({:?}, first_tick_only={})",
5355                value, first_tick_only
5356            ),
5357            HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
5358            HydroNode::Tee { inner, .. } => {
5359                format!("Tee({})", inner.0.borrow().print_root())
5360            }
5361            HydroNode::Reference { inner, kind, .. } => {
5362                format!("Reference({:?}, {})", kind, inner.0.borrow().print_root())
5363            }
5364            HydroNode::Partition { f, is_true, .. } => {
5365                format!("Partition({:?}, is_true={})", f, is_true)
5366            }
5367            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
5368            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
5369            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
5370            HydroNode::Batch { .. } => "Batch()".to_owned(),
5371            HydroNode::Chain { first, second, .. } => {
5372                format!("Chain({}, {})", first.print_root(), second.print_root())
5373            }
5374            HydroNode::MergeOrdered { first, second, .. } => {
5375                format!(
5376                    "MergeOrdered({}, {})",
5377                    first.print_root(),
5378                    second.print_root()
5379                )
5380            }
5381            HydroNode::ChainFirst { first, second, .. } => {
5382                format!(
5383                    "ChainFirst({}, {})",
5384                    first.print_root(),
5385                    second.print_root()
5386                )
5387            }
5388            HydroNode::CrossProduct { left, right, .. } => {
5389                format!(
5390                    "CrossProduct({}, {})",
5391                    left.print_root(),
5392                    right.print_root()
5393                )
5394            }
5395            HydroNode::CrossSingleton { left, right, .. } => {
5396                format!(
5397                    "CrossSingleton({}, {})",
5398                    left.print_root(),
5399                    right.print_root()
5400                )
5401            }
5402            HydroNode::Join { left, right, .. } => {
5403                format!("Join({}, {})", left.print_root(), right.print_root())
5404            }
5405            HydroNode::JoinHalf { left, right, .. } => {
5406                format!("JoinHalf({}, {})", left.print_root(), right.print_root())
5407            }
5408            HydroNode::Difference { pos, neg, .. } => {
5409                format!("Difference({}, {})", pos.print_root(), neg.print_root())
5410            }
5411            HydroNode::AntiJoin { pos, neg, .. } => {
5412                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
5413            }
5414            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
5415            HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
5416            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
5417            HydroNode::Map { f, .. } => format!("Map({:?})", f),
5418            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
5419            HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
5420            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
5421            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
5422            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
5423            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
5424            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
5425            HydroNode::Unique { .. } => "Unique()".to_owned(),
5426            HydroNode::Sort { .. } => "Sort()".to_owned(),
5427            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
5428            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
5429            HydroNode::ScanAsyncBlocking { init, acc, .. } => {
5430                format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
5431            }
5432            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
5433            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
5434            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
5435            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
5436            HydroNode::Network { .. } => "Network()".to_owned(),
5437            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
5438            HydroNode::Counter { tag, duration, .. } => {
5439                format!("Counter({:?}, {:?})", tag, duration)
5440            }
5441        }
5442    }
5443}
5444
/// Allocates one port on each endpoint of a network edge and asks the deployment
/// backend `D` for the edge's sink and source expressions.
///
/// Returns `(sink, source, connect_fn)`:
/// - `sink` and `source` are the expressions produced by the matching
///   `D::*_sink_source` call.
/// - `connect_fn` is the callback returned by the matching `D::*_connect` call.
///
/// The backend methods are chosen by endpoint kind: process→process (`o2o`),
/// process→cluster (`o2m`), cluster→process (`m2o`), or cluster→cluster (`m2m`).
///
/// # Panics
///
/// Panics if an endpoint's process or cluster is missing from `processes` /
/// `clusters`, or if either endpoint is a `Tick` or `Atomic` location.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Look up the deployed node for a location key. Panic with a descriptive message
    // if the graph references a location that was never instantiated.
    let process_node = |key: LocationKey| {
        processes
            .get(key)
            .unwrap_or_else(|| {
                panic!("A process used in the graph was not instantiated: {}", key)
            })
            .clone()
    };
    let cluster_node = |key: LocationKey| {
        clusters
            .get(key)
            .unwrap_or_else(|| {
                panic!("A cluster used in the graph was not instantiated: {}", key)
            })
            .clone()
    };

    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let from_node = process_node(from);
            let to_node = process_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let from_node = process_node(from);
            let to_node = cluster_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let from_node = cluster_node(from);
            let to_node = process_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let from_node = cluster_node(from);
            let to_node = cluster_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Tick(_, _), _) | (_, LocationId::Tick(_, _)) => {
            panic!("cannot instantiate a network edge to or from a Tick location")
        }
        (LocationId::Atomic(_), _) | (_, LocationId::Atomic(_)) => {
            panic!("cannot instantiate a network edge to or from an Atomic location")
        }
    };
    (sink, source, connect_fn)
}
5586
5587#[cfg(test)]
5588mod serde_test;
5589
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins the in-memory size of `HydroNode` at 264 bytes, so a change in the
    /// enum's size shows up as a test failure. The expected value includes fields
    /// gated behind the `build` feature, so the test is ignored without that feature.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        assert_eq!(size_of::<HydroNode>(), 264);
    }

    /// Pins the in-memory size of `HydroRoot` at 136 bytes. As above, the expected
    /// value includes `build`-feature-gated fields.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        assert_eq!(size_of::<HydroRoot>(), 136);
    }

    /// An ordinary expression with no stageleft structure must pass through
    /// `simplify_q_macro` unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        // Test basic non-q! expression
        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        let result = simplify_q_macro(simple_expr.clone());
        assert_eq!(result, simple_expr);
    }

    /// Snapshot of `simplify_q_macro` applied to the expression produced by splicing
    /// a real `q!` closure.
    // NOTE(review): the snapshot name presumably derives from this test's name, and
    // its contents from the `q!` body. Renaming the test or editing the closure likely
    // requires re-accepting the snapshot; confirm against hydro_build_utils.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        // Test a simplified version of what a real stageleft call might look like
        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        // This should be processed by our visitor and simplified to q!(...)
        // since we detect the stageleft::runtime_support::fn_* pattern
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Snapshot of `simplify_q_macro` applied to a spliced `q!` block expression
    /// whose value is a `move` closure, i.e. the quoted code does not begin with `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        // Test a closure that does not start with a pipe
        let stageleft_call = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}