1use core::panic;
2use std::cell::{Cell, RefCell};
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23
24#[cfg(feature = "build")]
25use crate::compile::builder::ClockId;
26#[cfg(feature = "build")]
27use crate::compile::builder::StmtId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31#[cfg(feature = "build")]
32use crate::handoff_ref::handoff_ref_ident;
33use crate::location::dynamic::{ClusterConsistency, LocationId};
34use crate::location::{LocationKey, NetworkHint};
35
36pub mod backtrace;
37use backtrace::Backtrace;
38
/// An expression bundled with the reference nodes it uses.
pub struct ClosureExpr {
    /// The expression itself.
    pub(crate) expr: DebugExpr,
    /// Referenced nodes, each paired with a flag telling whether the reference is mutable.
    ///
    /// Every node is expected to be a `HydroNode::Reference`; the `Clone` impl and
    /// `emit_tokens` panic on any other variant.
    pub(crate) singleton_refs: Vec<(HydroNode, bool)>,
}
51
52impl Clone for ClosureExpr {
53 fn clone(&self) -> Self {
54 Self {
55 expr: self.expr.clone(),
56 singleton_refs: self
57 .singleton_refs
58 .iter()
59 .map(|(node, is_mut)| {
60 let HydroNode::Reference {
61 inner,
62 kind,
63 access_counter,
64 metadata,
65 } = node
66 else {
67 panic!("singleton_refs should only contain HydroNode::Reference");
68 };
69 (
70 HydroNode::Reference {
71 inner: SharedNode(Rc::clone(&inner.0)),
72 kind: *kind,
73 access_counter: access_counter.freeze(),
74 metadata: metadata.clone(),
75 },
76 *is_mut,
77 )
78 })
79 .collect(),
80 }
81 }
82}
83
84impl Hash for ClosureExpr {
85 fn hash<H: Hasher>(&self, state: &mut H) {
86 self.expr.hash(state);
87 }
91}
92
93impl serde::Serialize for ClosureExpr {
94 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
95 use serde::ser::SerializeStruct;
96 let mut s = serializer.serialize_struct("ClosureExpr", 2)?;
97 s.serialize_field("expr", &self.expr)?;
98 s.serialize_field(
99 "singleton_refs",
100 &SerializableSingletonRefs(&self.singleton_refs),
101 )?;
102 s.end()
103 }
104}
105
106struct SerializableSingletonRefs<'a>(&'a [(HydroNode, bool)]);
107
108impl serde::Serialize for SerializableSingletonRefs<'_> {
109 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
110 use serde::ser::SerializeSeq;
111 let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
112 for (node, is_mut) in self.0.iter() {
113 seq.serialize_element(&(node, is_mut))?;
114 }
115 seq.end()
116 }
117}
118
119impl Debug for ClosureExpr {
120 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121 Debug::fmt(&self.expr, f)
122 }
123}
124
125impl Display for ClosureExpr {
126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127 Display::fmt(&self.expr, f)
128 }
129}
130
131impl From<syn::Expr> for ClosureExpr {
132 fn from(expr: syn::Expr) -> Self {
133 Self {
134 expr: DebugExpr(Box::new(expr)),
135 singleton_refs: Vec::new(),
136 }
137 }
138}
139
140impl From<DebugExpr> for ClosureExpr {
141 fn from(expr: DebugExpr) -> Self {
142 Self {
143 expr,
144 singleton_refs: Vec::new(),
145 }
146 }
147}
148
149impl ClosureExpr {
150 pub fn new(expr: DebugExpr, singleton_refs: Vec<(HydroNode, bool)>) -> Self {
151 Self {
152 expr,
153 singleton_refs,
154 }
155 }
156
157 pub fn has_mut_ref(&self) -> bool {
158 self.singleton_refs.iter().any(|(_, is_mut)| *is_mut)
159 }
160
161 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> Self {
162 Self {
163 expr: self.expr.clone(),
164 singleton_refs: self
165 .singleton_refs
166 .iter()
167 .map(|(node, is_mut)| (node.deep_clone(seen_tees), *is_mut))
168 .collect(),
169 }
170 }
171
172 pub fn transform_children(
173 &mut self,
174 transform: &mut impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
175 seen_tees: &mut SeenSharedNodes,
176 ) {
177 for (ref_node, _is_mut) in self.singleton_refs.iter_mut() {
178 transform(ref_node, seen_tees);
179 }
180 }
181
182 #[cfg(feature = "build")]
185 pub fn emit_tokens(&self, ident_stack: &mut Vec<syn::Ident>) -> TokenStream {
186 if self.singleton_refs.is_empty() {
187 self.expr.0.to_token_stream()
188 } else {
189 assert!(
190 ident_stack.len() >= self.singleton_refs.len(),
191 "ident_stack has {} entries but expected at least {} for singleton_refs",
192 ident_stack.len(),
193 self.singleton_refs.len()
194 );
195 let ref_idents = ident_stack.drain(ident_stack.len() - self.singleton_refs.len()..);
196
197 let mut let_bindings = Vec::new();
198 for ((i, (ref_node, is_mut)), ref_ident) in
199 self.singleton_refs.iter().enumerate().zip(ref_idents)
200 {
201 let HydroNode::Reference { access_counter, .. } = ref_node else {
202 panic!("ClosureExpression expected references to `HydroNode::Reference`");
203 };
204 let group = access_counter.frozen_group();
205 let local_ident = handoff_ref_ident(i);
207 let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
208 let group_lit = proc_macro2::Literal::u32_unsuffixed(group);
209 let mut_token = is_mut.then(|| quote!(mut));
210 let binding = quote! {
211 let #local_ident = #hash {#group_lit} #mut_token #ref_ident;
212 };
213 let_bindings.push(binding);
214 }
215
216 let expr = &self.expr.0;
217 quote! {
218 {
219 #( #let_bindings )*
220 #expr
221 }
222 }
223 }
224 }
225}
226
/// Newtype around a boxed `syn::Expr`.
///
/// Its `Debug` impl prints the expression's token stream, and its `Display` impl prints
/// the result of `simplify_q_macro` wrapped in `q!(...)`.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
232
233impl serde::Serialize for DebugExpr {
234 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
235 serializer.serialize_str(&self.to_string())
236 }
237}
238
239impl From<syn::Expr> for DebugExpr {
240 fn from(expr: syn::Expr) -> Self {
241 Self(Box::new(expr))
242 }
243}
244
245impl Deref for DebugExpr {
246 type Target = syn::Expr;
247
248 fn deref(&self) -> &Self::Target {
249 &self.0
250 }
251}
252
253impl ToTokens for DebugExpr {
254 fn to_tokens(&self, tokens: &mut TokenStream) {
255 self.0.to_tokens(tokens);
256 }
257}
258
259impl Debug for DebugExpr {
260 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261 write!(f, "{}", self.0.to_token_stream())
262 }
263}
264
265impl Display for DebugExpr {
266 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267 let original = self.0.as_ref().clone();
268 let simplified = simplify_q_macro(original);
269
270 write!(f, "q!({})", quote::quote!(#simplified))
273 }
274}
275
276fn simplify_q_macro(expr: syn::Expr) -> syn::Expr {
278 if let syn::Expr::Call(ref call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
279 && is_stageleft_runtime_support_call(&path_expr.path)
281 && let syn::Expr::Block(b) = &call.args[0]
282 && b.block.stmts.len() == 3
283 && let Some(syn::Stmt::Expr(e, _)) = b.block.stmts.get(2)
284 {
286 let mut e = e.clone();
287 while let syn::Expr::Block(ref mut block) = e
288 && block.block.stmts.len() == 1
289 && let syn::Stmt::Expr(inner_e, _) = block.block.stmts.remove(0)
290 {
291 e = inner_e;
292 }
293
294 e
295 } else {
296 expr
297 }
298}
299
300fn is_stageleft_runtime_support_call(path: &syn::Path) -> bool {
301 if let Some(last_segment) = path.segments.last() {
303 let fn_name = last_segment.ident.to_string();
304 path.segments.len() > 2
305 && path.segments[0].ident == "stageleft"
306 && path.segments[1].ident == "runtime_support"
307 && fn_name.contains("_type_hint")
308 } else {
309 false
310 }
311}
312
/// Newtype around a boxed `syn::Type` whose `Debug` output and serialized form are the
/// type's token stream.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
318
319impl From<syn::Type> for DebugType {
320 fn from(t: syn::Type) -> Self {
321 Self(Box::new(t))
322 }
323}
324
325impl Deref for DebugType {
326 type Target = syn::Type;
327
328 fn deref(&self) -> &Self::Target {
329 &self.0
330 }
331}
332
333impl ToTokens for DebugType {
334 fn to_tokens(&self, tokens: &mut TokenStream) {
335 self.0.to_tokens(tokens);
336 }
337}
338
339impl Debug for DebugType {
340 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341 write!(f, "{}", self.0.to_token_stream())
342 }
343}
344
345impl serde::Serialize for DebugType {
346 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
347 serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
348 }
349}
350
351fn serialize_backtrace_as_span<S: serde::Serializer>(
352 backtrace: &Backtrace,
353 serializer: S,
354) -> Result<S::Ok, S::Error> {
355 match backtrace.format_span() {
356 Some(span) => serializer.serialize_some(&span),
357 None => serializer.serialize_none(),
358 }
359}
360
361fn serialize_ident<S: serde::Serializer>(
362 ident: &syn::Ident,
363 serializer: S,
364) -> Result<S::Ok, S::Error> {
365 serializer.serialize_str(&ident.to_string())
366}
367
/// Instantiation state attached to network-related IR items.
pub enum DebugInstantiate {
    /// Not yet instantiated; `compile_network` replaces this with `Finalized`.
    Building,
    /// Instantiated: holds the sink/source expressions and the connect callback.
    /// This variant can be neither cloned nor serialized (both panic).
    Finalized(Box<DebugInstantiateFinalized>),
}
372
373impl serde::Serialize for DebugInstantiate {
374 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
375 match self {
376 DebugInstantiate::Building => {
377 serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
378 }
379 DebugInstantiate::Finalized(_) => {
380 panic!(
381 "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
382 )
383 }
384 }
385 }
386}
387
/// Payload of `DebugInstantiate::Finalized`.
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Sink expression produced during `compile_network`.
    sink: syn::Expr,
    /// Source expression produced during `compile_network`.
    source: syn::Expr,
    /// Callback invoked once by `connect_network`, which `take`s it out of the option.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
400
401impl From<DebugInstantiateFinalized> for DebugInstantiate {
402 fn from(f: DebugInstantiateFinalized) -> Self {
403 Self::Finalized(Box::new(f))
404 }
405}
406
407impl Debug for DebugInstantiate {
408 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
409 write!(f, "<network instantiate>")
410 }
411}
412
impl Hash for DebugInstantiate {
    /// Intentionally contributes nothing to the hash: every value hashes identically.
    fn hash<H: Hasher>(&self, _state: &mut H) {
    }
}
418
419impl Clone for DebugInstantiate {
420 fn clone(&self) -> Self {
421 match self {
422 DebugInstantiate::Building => DebugInstantiate::Building,
423 DebugInstantiate::Finalized(_) => {
424 panic!("DebugInstantiate::Finalized should not be cloned")
425 }
426 }
427 }
428}
429
/// Resolution state of a `HydroSource::ClusterMembers` source.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum ClusterMembersState {
    /// Not yet resolved; `compile_network` replaces this with one of the other variants.
    Uninit,
    /// Set by `compile_network` the first time a given (location, cluster key) pair is
    /// seen; holds the spliced membership-stream expression.
    Stream(DebugExpr),
    /// Set by `compile_network` when the same (location, cluster key) pair was already
    /// seen; holds the location the source lives at and the target cluster location.
    Tee(LocationId, LocationId),
}
450
/// The kinds of source a `HydroNode::Source` can carry.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum HydroSource {
    /// Source described by an expression (presumably one evaluating to a stream — confirm).
    Stream(DebugExpr),
    ExternalNetwork(),
    /// Source described by an expression (presumably one evaluating to an iterator — confirm).
    Iter(DebugExpr),
    Spin(),
    /// Membership of the given cluster location, together with its resolution state.
    ClusterMembers(LocationId, ClusterMembersState),
    /// Named embedded input; `compile_network` requires a `Stream` collection kind.
    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
    /// Named embedded input; `compile_network` requires a `Singleton` collection kind.
    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
}
462
/// Backend hooks used while emitting DFIR for the IR.
///
/// The `SecondaryMap<LocationKey, FlatGraphBuilder>` implementation in this file shows
/// the simplest behavior for each hook; other implementors may do more.
#[cfg(feature = "build")]
pub trait DfirBuilder {
    /// Backend flag; the `SecondaryMap` implementation returns `false`.
    fn singleton_intermediates(&self) -> bool;

    /// Returns the graph builder associated with `location`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits the statement(s) connecting `in_ident` to `out_ident` for a batch operation.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
        fold_hooked_idents: &HashSet<String>,
    );
    /// Emits the statement(s) connecting `in_ident` to `out_ident` when yielding out of a tick.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits the statement(s) connecting `in_ident` to `out_ident` at the start of an atomic section.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the statement(s) connecting `in_ident` to `out_ident` at the end of an atomic section.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits the statement(s) connecting `in_ident` to `out_ident` for a non-determinism observation.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits the statement(s) merging `first_ident` and `second_ident` into `out_ident`.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    );

    /// Emits the sending half at `from` (feeding `input_ident` into `sink`) and the
    /// receiving half at `to` (defining `out_ident` from `source`), with optional
    /// serialize/deserialize mapping stages.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Emits a source at `on` defining `out_ident` from `source_expr`, with an optional
    /// deserialize mapping stage.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// Emits a sink at `on` feeding `input_ident` into `sink_expr`, with an optional
    /// serialize mapping stage.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// Optionally emits a hook for `in_ident` and returns an identifier for it; the
    /// `SecondaryMap` implementation emits nothing and returns `None`.
    fn emit_fold_hook(
        &mut self,
        location: &LocationId,
        in_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident>;

    /// Emits the statement(s) connecting `in_ident` to `out_ident` for a consistency assertion.
    fn assert_is_consistent(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    );

    /// Emits the statement(s) connecting `in_ident` to `out_ident` for a mutable observation.
    fn observe_for_mut(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        op_meta: &HydroIrOpMetadata,
    );
}
603
/// Appends the pass-through statement `#out_ident = #in_ident;` to `builder`.
#[cfg(feature = "build")]
fn add_passthrough_stmt(
    builder: &mut FlatGraphBuilder,
    in_ident: &syn::Ident,
    out_ident: &syn::Ident,
) {
    builder.add_dfir(
        parse_quote! {
            #out_ident = #in_ident;
        },
        None,
        None,
    );
}

/// Plain DFIR emission into one `FlatGraphBuilder` per root location.
///
/// Most hooks simply alias the output identifier to the input identifier.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Returns the builder stored under the root key of `location`, creating a default
    /// one if the slot is vacant. Panics if the key is no longer valid.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_key = location.root().key();
        let slot = self.entry(root_key).expect("location was removed");
        slot.or_default()
    }

    /// Bounded singleton-like inputs get a `persist::<'static>()` stage (and must be at
    /// the top level); everything else is a plain pass-through.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
        _fold_hooked_idents: &HashSet<String>,
    ) {
        let builder = self.get_dfir_mut(in_location.root());

        let needs_persist = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );
        if !needs_persist {
            add_passthrough_stmt(builder, &in_ident, out_ident);
            return;
        }

        assert!(in_location.is_top_level());
        builder.add_dfir(
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            },
            None,
            None,
        );
    }

    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        add_passthrough_stmt(self.get_dfir_mut(in_location.root()), &in_ident, out_ident);
    }

    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        add_passthrough_stmt(self.get_dfir_mut(in_location.root()), &in_ident, out_ident);
    }

    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        add_passthrough_stmt(self.get_dfir_mut(in_location.root()), &in_ident, out_ident);
    }

    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        add_passthrough_stmt(self.get_dfir_mut(location), &in_ident, out_ident);
    }

    /// Emits a `union()` fed by `first_ident` on port 0 and `second_ident` on port 1.
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    ) {
        let graph = self.get_dfir_mut(location);
        graph.add_dfir(
            parse_quote! {
                #out_ident = union();
                #first_ident -> [0]#out_ident;
                #second_ident -> [1]#out_ident;
            },
            None,
            operator_tag,
        );
    }

    /// Emits the `dest_sink` half on `from` (tagged `send<tag_id>`) and the
    /// `source_stream` half on `to` (tagged `recv<tag_id>`).
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        let send_tag = format!("send{}", tag_id);
        let recv_tag = format!("recv{}", tag_id);

        let sender_builder = self.get_dfir_mut(from);
        match serialize {
            Some(serialize_pipeline) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }

        let receiver_builder = self.get_dfir_mut(to);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Emits a `source_stream` on `on`, tagged `recv<tag_id>`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        let recv_tag = format!("recv{}", tag_id);
        let receiver_builder = self.get_dfir_mut(on);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Emits a `dest_sink` on `on`, tagged `send<tag_id>`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        let send_tag = format!("send{}", tag_id);
        let sender_builder = self.get_dfir_mut(on);
        match serialize {
            Some(serialize_fn) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }
    }

    /// This backend emits no fold hooks.
    fn emit_fold_hook(
        &mut self,
        _location: &LocationId,
        _in_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident> {
        None
    }

    fn assert_is_consistent(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    ) {
        add_passthrough_stmt(self.get_dfir_mut(location), &in_ident, out_ident);
    }

    fn observe_for_mut(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _op_meta: &HydroIrOpMetadata,
    ) {
        add_passthrough_stmt(self.get_dfir_mut(location), &in_ident, out_ident);
    }
}
905
/// Either a DFIR backend to emit into, or a pair of callbacks to invoke instead.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
    N: FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
{
    /// Borrowed backend implementing `DfirBuilder`.
    Builders(&'a mut dyn DfirBuilder),
    /// Callbacks: `L` receives roots, `N` receives nodes, each with the statement counter.
    Callback(L, N),
}
915
/// Root items of the IR. Every variant carries an `input` node and operator metadata.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroRoot {
    /// Applies `f` to the input; the nodes referenced by `f` are traversed before `input`.
    ForEach {
        f: ClosureExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends the input to an external location; `instantiate_fn` is finalized by
    /// `compile_network` and its connect callback is run by `connect_network`.
    SendExternal {
        to_external_key: LocationKey,
        to_port_id: ExternalPortId,
        to_many: bool,
        unpaired: bool,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Feeds the input into the sink described by the `sink` expression.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Feeds the input into the cycle identified by `cycle_id`.
    CycleSink {
        cycle_id: CycleId,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Named embedded output; `compile_network` requires the input to be a `Stream`
    /// located on a process or cluster.
    EmbeddedOutput {
        #[serde(serialize_with = "serialize_ident")]
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Root that only carries an input (presumably keeps it alive without consuming it — confirm).
    Null {
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
957
958impl HydroRoot {
959 #[cfg(feature = "build")]
960 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
961 pub fn compile_network<'a, D>(
962 &mut self,
963 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
964 seen_tees: &mut SeenSharedNodes,
965 seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
966 processes: &SparseSecondaryMap<LocationKey, D::Process>,
967 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
968 externals: &SparseSecondaryMap<LocationKey, D::External>,
969 env: &mut D::InstantiateEnv,
970 ) where
971 D: Deploy<'a>,
972 {
973 let refcell_extra_stmts = RefCell::new(extra_stmts);
974 let refcell_env = RefCell::new(env);
975 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
976 self.transform_bottom_up(
977 &mut |l| {
978 if let HydroRoot::SendExternal {
979 input,
980 to_external_key,
981 to_port_id,
982 to_many,
983 unpaired,
984 instantiate_fn,
985 ..
986 } = l
987 {
988 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
989 DebugInstantiate::Building => {
990 let to_node = externals
991 .get(*to_external_key)
992 .unwrap_or_else(|| {
993 panic!("A external used in the graph was not instantiated: {}", to_external_key)
994 })
995 .clone();
996
997 match input.metadata().location_id.root() {
998 &LocationId::Process(process_key) => {
999 if *to_many {
1000 (
1001 (
1002 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
1003 parse_quote!(DUMMY),
1004 ),
1005 Box::new(|| {}) as Box<dyn FnOnce()>,
1006 )
1007 } else {
1008 let from_node = processes
1009 .get(process_key)
1010 .unwrap_or_else(|| {
1011 panic!("A process used in the graph was not instantiated: {}", process_key)
1012 })
1013 .clone();
1014
1015 let sink_port = from_node.next_port();
1016 let source_port = to_node.next_port();
1017
1018 if *unpaired {
1019 use stageleft::quote_type;
1020 use tokio_util::codec::LengthDelimitedCodec;
1021
1022 to_node.register(*to_port_id, source_port.clone());
1023
1024 let _ = D::e2o_source(
1025 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1026 &to_node, &source_port,
1027 &from_node, &sink_port,
1028 "e_type::<LengthDelimitedCodec>(),
1029 format!("{}_{}", *to_external_key, *to_port_id)
1030 );
1031 }
1032
1033 (
1034 (
1035 D::o2e_sink(
1036 &from_node,
1037 &sink_port,
1038 &to_node,
1039 &source_port,
1040 format!("{}_{}", *to_external_key, *to_port_id)
1041 ),
1042 parse_quote!(DUMMY),
1043 ),
1044 if *unpaired {
1045 D::e2o_connect(
1046 &to_node,
1047 &source_port,
1048 &from_node,
1049 &sink_port,
1050 *to_many,
1051 NetworkHint::Auto,
1052 )
1053 } else {
1054 Box::new(|| {}) as Box<dyn FnOnce()>
1055 },
1056 )
1057 }
1058 }
1059 LocationId::Cluster(cluster_key) => {
1060 let from_node = clusters
1061 .get(*cluster_key)
1062 .unwrap_or_else(|| {
1063 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1064 })
1065 .clone();
1066
1067 let sink_port = from_node.next_port();
1068 let source_port = to_node.next_port();
1069
1070 if *unpaired {
1071 to_node.register(*to_port_id, source_port.clone());
1072 }
1073
1074 (
1075 (
1076 D::m2e_sink(
1077 &from_node,
1078 &sink_port,
1079 &to_node,
1080 &source_port,
1081 format!("{}_{}", *to_external_key, *to_port_id)
1082 ),
1083 parse_quote!(DUMMY),
1084 ),
1085 Box::new(|| {}) as Box<dyn FnOnce()>,
1086 )
1087 }
1088 _ => panic!()
1089 }
1090 },
1091
1092 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1093 };
1094
1095 *instantiate_fn = DebugInstantiateFinalized {
1096 sink: sink_expr,
1097 source: source_expr,
1098 connect_fn: Some(connect_fn),
1099 }
1100 .into();
1101 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
1102 let element_type = match &input.metadata().collection_kind {
1103 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1104 _ => panic!("Embedded output must have Stream collection kind"),
1105 };
1106 let location_key = match input.metadata().location_id.root() {
1107 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1108 _ => panic!("Embedded output must be on a process or cluster"),
1109 };
1110 D::register_embedded_output(
1111 &mut refcell_env.borrow_mut(),
1112 location_key,
1113 ident,
1114 &element_type,
1115 );
1116 }
1117 },
1118 &mut |n| {
1119 if let HydroNode::Network {
1120 name,
1121 networking_info,
1122 input,
1123 instantiate_fn,
1124 metadata,
1125 ..
1126 } = n
1127 {
1128 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
1129 DebugInstantiate::Building => instantiate_network::<D>(
1130 &mut refcell_env.borrow_mut(),
1131 input.metadata().location_id.root(),
1132 metadata.location_id.root(),
1133 processes,
1134 clusters,
1135 name.as_deref(),
1136 networking_info,
1137 ),
1138
1139 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1140 };
1141
1142 *instantiate_fn = DebugInstantiateFinalized {
1143 sink: sink_expr,
1144 source: source_expr,
1145 connect_fn: Some(connect_fn),
1146 }
1147 .into();
1148 } else if let HydroNode::ExternalInput {
1149 from_external_key,
1150 from_port_id,
1151 from_many,
1152 codec_type,
1153 port_hint,
1154 instantiate_fn,
1155 metadata,
1156 ..
1157 } = n
1158 {
1159 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1160 DebugInstantiate::Building => {
1161 let from_node = externals
1162 .get(*from_external_key)
1163 .unwrap_or_else(|| {
1164 panic!(
1165 "A external used in the graph was not instantiated: {}",
1166 from_external_key,
1167 )
1168 })
1169 .clone();
1170
1171 match metadata.location_id.root() {
1172 &LocationId::Process(process_key) => {
1173 let to_node = processes
1174 .get(process_key)
1175 .unwrap_or_else(|| {
1176 panic!("A process used in the graph was not instantiated: {}", process_key)
1177 })
1178 .clone();
1179
1180 let sink_port = from_node.next_port();
1181 let source_port = to_node.next_port();
1182
1183 from_node.register(*from_port_id, sink_port.clone());
1184
1185 (
1186 (
1187 parse_quote!(DUMMY),
1188 if *from_many {
1189 D::e2o_many_source(
1190 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1191 &to_node, &source_port,
1192 codec_type.0.as_ref(),
1193 format!("{}_{}", *from_external_key, *from_port_id)
1194 )
1195 } else {
1196 D::e2o_source(
1197 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1198 &from_node, &sink_port,
1199 &to_node, &source_port,
1200 codec_type.0.as_ref(),
1201 format!("{}_{}", *from_external_key, *from_port_id)
1202 )
1203 },
1204 ),
1205 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1206 )
1207 }
1208 LocationId::Cluster(cluster_key) => {
1209 let to_node = clusters
1210 .get(*cluster_key)
1211 .unwrap_or_else(|| {
1212 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1213 })
1214 .clone();
1215
1216 let sink_port = from_node.next_port();
1217 let source_port = to_node.next_port();
1218
1219 from_node.register(*from_port_id, sink_port.clone());
1220
1221 (
1222 (
1223 parse_quote!(DUMMY),
1224 D::e2m_source(
1225 refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1226 &from_node, &sink_port,
1227 &to_node, &source_port,
1228 codec_type.0.as_ref(),
1229 format!("{}_{}", *from_external_key, *from_port_id)
1230 ),
1231 ),
1232 D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1233 )
1234 }
1235 _ => panic!()
1236 }
1237 },
1238
1239 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1240 };
1241
1242 *instantiate_fn = DebugInstantiateFinalized {
1243 sink: sink_expr,
1244 source: source_expr,
1245 connect_fn: Some(connect_fn),
1246 }
1247 .into();
1248 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1249 let element_type = match &metadata.collection_kind {
1250 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1251 _ => panic!("Embedded source must have Stream collection kind"),
1252 };
1253 let location_key = match metadata.location_id.root() {
1254 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1255 _ => panic!("Embedded source must be on a process or cluster"),
1256 };
1257 D::register_embedded_stream_input(
1258 &mut refcell_env.borrow_mut(),
1259 location_key,
1260 ident,
1261 &element_type,
1262 );
1263 } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1264 let element_type = match &metadata.collection_kind {
1265 CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1266 _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1267 };
1268 let location_key = match metadata.location_id.root() {
1269 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1270 _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1271 };
1272 D::register_embedded_singleton_input(
1273 &mut refcell_env.borrow_mut(),
1274 location_key,
1275 ident,
1276 &element_type,
1277 );
1278 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1279 match state {
1280 ClusterMembersState::Uninit => {
1281 let at_location = metadata.location_id.root().clone();
1282 let key = (at_location.clone(), location_id.key());
1283 if refcell_seen_cluster_members.borrow_mut().insert(key) {
1284 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1286 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1287 &(),
1288 );
1289 *state = ClusterMembersState::Stream(expr.into());
1290 } else {
1291 *state = ClusterMembersState::Tee(at_location, location_id.clone());
1293 }
1294 }
1295 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1296 panic!("cluster members already finalized");
1297 }
1298 }
1299 }
1300 },
1301 seen_tees,
1302 false,
1303 );
1304 }
1305
1306 pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1307 self.transform_bottom_up(
1308 &mut |l| {
1309 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1310 match instantiate_fn {
1311 DebugInstantiate::Building => panic!("network not built"),
1312
1313 DebugInstantiate::Finalized(finalized) => {
1314 (finalized.connect_fn.take().unwrap())();
1315 }
1316 }
1317 }
1318 },
1319 &mut |n| {
1320 if let HydroNode::Network { instantiate_fn, .. }
1321 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1322 {
1323 match instantiate_fn {
1324 DebugInstantiate::Building => panic!("network not built"),
1325
1326 DebugInstantiate::Finalized(finalized) => {
1327 (finalized.connect_fn.take().unwrap())();
1328 }
1329 }
1330 }
1331 },
1332 seen_tees,
1333 false,
1334 );
1335 }
1336
1337 pub fn transform_bottom_up(
1338 &mut self,
1339 transform_root: &mut impl FnMut(&mut HydroRoot),
1340 transform_node: &mut impl FnMut(&mut HydroNode),
1341 seen_tees: &mut SeenSharedNodes,
1342 check_well_formed: bool,
1343 ) {
1344 self.transform_children(
1345 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1346 seen_tees,
1347 );
1348
1349 transform_root(self);
1350 }
1351
1352 pub fn transform_children(
1353 &mut self,
1354 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1355 seen_tees: &mut SeenSharedNodes,
1356 ) {
1357 match self {
1358 HydroRoot::ForEach { f, input, .. } => {
1359 f.transform_children(&mut transform, seen_tees);
1360 transform(input, seen_tees);
1361 }
1362 HydroRoot::SendExternal { input, .. }
1363 | HydroRoot::DestSink { input, .. }
1364 | HydroRoot::CycleSink { input, .. }
1365 | HydroRoot::EmbeddedOutput { input, .. }
1366 | HydroRoot::Null { input, .. } => {
1367 transform(input, seen_tees);
1368 }
1369 }
1370 }
1371
1372 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1373 match self {
1374 HydroRoot::ForEach {
1375 f,
1376 input,
1377 op_metadata,
1378 } => HydroRoot::ForEach {
1379 f: f.deep_clone(seen_tees),
1380 input: Box::new(input.deep_clone(seen_tees)),
1381 op_metadata: op_metadata.clone(),
1382 },
1383 HydroRoot::SendExternal {
1384 to_external_key,
1385 to_port_id,
1386 to_many,
1387 unpaired,
1388 serialize_fn,
1389 instantiate_fn,
1390 input,
1391 op_metadata,
1392 } => HydroRoot::SendExternal {
1393 to_external_key: *to_external_key,
1394 to_port_id: *to_port_id,
1395 to_many: *to_many,
1396 unpaired: *unpaired,
1397 serialize_fn: serialize_fn.clone(),
1398 instantiate_fn: instantiate_fn.clone(),
1399 input: Box::new(input.deep_clone(seen_tees)),
1400 op_metadata: op_metadata.clone(),
1401 },
1402 HydroRoot::DestSink {
1403 sink,
1404 input,
1405 op_metadata,
1406 } => HydroRoot::DestSink {
1407 sink: sink.clone(),
1408 input: Box::new(input.deep_clone(seen_tees)),
1409 op_metadata: op_metadata.clone(),
1410 },
1411 HydroRoot::CycleSink {
1412 cycle_id,
1413 input,
1414 op_metadata,
1415 } => HydroRoot::CycleSink {
1416 cycle_id: *cycle_id,
1417 input: Box::new(input.deep_clone(seen_tees)),
1418 op_metadata: op_metadata.clone(),
1419 },
1420 HydroRoot::EmbeddedOutput {
1421 ident,
1422 input,
1423 op_metadata,
1424 } => HydroRoot::EmbeddedOutput {
1425 ident: ident.clone(),
1426 input: Box::new(input.deep_clone(seen_tees)),
1427 op_metadata: op_metadata.clone(),
1428 },
1429 HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1430 input: Box::new(input.deep_clone(seen_tees)),
1431 op_metadata: op_metadata.clone(),
1432 },
1433 }
1434 }
1435
1436 #[cfg(feature = "build")]
1437 pub fn emit(
1438 &mut self,
1439 graph_builders: &mut dyn DfirBuilder,
1440 seen_tees: &mut SeenSharedNodes,
1441 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1442 next_stmt_id: &mut crate::Counter<StmtId>,
1443 fold_hooked_idents: &mut HashSet<String>,
1444 ) {
1445 self.emit_core(
1446 &mut BuildersOrCallback::<
1447 fn(&mut HydroRoot, &mut crate::Counter<StmtId>),
1448 fn(&mut HydroNode, &mut crate::Counter<StmtId>),
1449 >::Builders(graph_builders),
1450 seen_tees,
1451 built_tees,
1452 next_stmt_id,
1453 fold_hooked_idents,
1454 );
1455 }
1456
1457 #[cfg(feature = "build")]
1458 pub fn emit_core(
1459 &mut self,
1460 builders_or_callback: &mut BuildersOrCallback<
1461 impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
1462 impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
1463 >,
1464 seen_tees: &mut SeenSharedNodes,
1465 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1466 next_stmt_id: &mut crate::Counter<StmtId>,
1467 fold_hooked_idents: &mut HashSet<String>,
1468 ) {
1469 match self {
1470 HydroRoot::ForEach { f, input, .. } => {
1471 let input_ident = input.emit_core(
1472 builders_or_callback,
1473 seen_tees,
1474 built_tees,
1475 next_stmt_id,
1476 fold_hooked_idents,
1477 );
1478
1479 let stmt_id = next_stmt_id.get_and_increment();
1480
1481 match builders_or_callback {
1482 BuildersOrCallback::Builders(graph_builders) => {
1483 let mut ident_stack: Vec<syn::Ident> = Vec::new();
1484
1485 for (ref_node, _is_mut) in f.singleton_refs.iter() {
1487 let HydroNode::Reference { inner, .. } = ref_node else {
1488 panic!("singleton_refs should only contain HydroNode::Reference");
1489 };
1490 let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
1491 let idents = built_tees.get(&ptr).expect(
1492 "ForEach singleton ref not found in built_tees — ref node was not emitted",
1493 );
1494 ident_stack.push(idents[0].clone());
1495 }
1496
1497 let f_tokens = f.emit_tokens(&mut ident_stack);
1498
1499 graph_builders
1500 .get_dfir_mut(&input.metadata().location_id)
1501 .add_dfir(
1502 parse_quote! {
1503 #input_ident -> for_each(#f_tokens);
1504 },
1505 None,
1506 Some(&stmt_id.to_string()),
1507 );
1508 }
1509 BuildersOrCallback::Callback(leaf_callback, _) => {
1510 leaf_callback(self, next_stmt_id);
1511 }
1512 }
1513 }
1514
1515 HydroRoot::SendExternal {
1516 serialize_fn,
1517 instantiate_fn,
1518 input,
1519 ..
1520 } => {
1521 let input_ident = input.emit_core(
1522 builders_or_callback,
1523 seen_tees,
1524 built_tees,
1525 next_stmt_id,
1526 fold_hooked_idents,
1527 );
1528
1529 let stmt_id = next_stmt_id.get_and_increment();
1530
1531 match builders_or_callback {
1532 BuildersOrCallback::Builders(graph_builders) => {
1533 let (sink_expr, _) = match instantiate_fn {
1534 DebugInstantiate::Building => (
1535 syn::parse_quote!(DUMMY_SINK),
1536 syn::parse_quote!(DUMMY_SOURCE),
1537 ),
1538
1539 DebugInstantiate::Finalized(finalized) => {
1540 (finalized.sink.clone(), finalized.source.clone())
1541 }
1542 };
1543
1544 graph_builders.create_external_output(
1545 &input.metadata().location_id,
1546 sink_expr,
1547 &input_ident,
1548 serialize_fn.as_ref(),
1549 stmt_id,
1550 );
1551 }
1552 BuildersOrCallback::Callback(leaf_callback, _) => {
1553 leaf_callback(self, next_stmt_id);
1554 }
1555 }
1556 }
1557
1558 HydroRoot::DestSink { sink, input, .. } => {
1559 let input_ident = input.emit_core(
1560 builders_or_callback,
1561 seen_tees,
1562 built_tees,
1563 next_stmt_id,
1564 fold_hooked_idents,
1565 );
1566
1567 let stmt_id = next_stmt_id.get_and_increment();
1568
1569 match builders_or_callback {
1570 BuildersOrCallback::Builders(graph_builders) => {
1571 graph_builders
1572 .get_dfir_mut(&input.metadata().location_id)
1573 .add_dfir(
1574 parse_quote! {
1575 #input_ident -> dest_sink(#sink);
1576 },
1577 None,
1578 Some(&stmt_id.to_string()),
1579 );
1580 }
1581 BuildersOrCallback::Callback(leaf_callback, _) => {
1582 leaf_callback(self, next_stmt_id);
1583 }
1584 }
1585 }
1586
1587 HydroRoot::CycleSink {
1588 cycle_id, input, ..
1589 } => {
1590 let input_ident = input.emit_core(
1591 builders_or_callback,
1592 seen_tees,
1593 built_tees,
1594 next_stmt_id,
1595 fold_hooked_idents,
1596 );
1597
1598 match builders_or_callback {
1599 BuildersOrCallback::Builders(graph_builders) => {
1600 let elem_type: syn::Type = match &input.metadata().collection_kind {
1601 CollectionKind::KeyedSingleton {
1602 key_type,
1603 value_type,
1604 ..
1605 }
1606 | CollectionKind::KeyedStream {
1607 key_type,
1608 value_type,
1609 ..
1610 } => {
1611 parse_quote!((#key_type, #value_type))
1612 }
1613 CollectionKind::Stream { element_type, .. }
1614 | CollectionKind::Singleton { element_type, .. }
1615 | CollectionKind::Optional { element_type, .. } => {
1616 parse_quote!(#element_type)
1617 }
1618 };
1619
1620 let cycle_id_ident = cycle_id.as_ident();
1621 graph_builders
1622 .get_dfir_mut(&input.metadata().location_id)
1623 .add_dfir(
1624 parse_quote! {
1625 #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1626 },
1627 None,
1628 None,
1629 );
1630 }
1631 BuildersOrCallback::Callback(_, _) => {}
1633 }
1634 }
1635
1636 HydroRoot::EmbeddedOutput { ident, input, .. } => {
1637 let input_ident = input.emit_core(
1638 builders_or_callback,
1639 seen_tees,
1640 built_tees,
1641 next_stmt_id,
1642 fold_hooked_idents,
1643 );
1644
1645 let stmt_id = next_stmt_id.get_and_increment();
1646
1647 match builders_or_callback {
1648 BuildersOrCallback::Builders(graph_builders) => {
1649 graph_builders
1650 .get_dfir_mut(&input.metadata().location_id)
1651 .add_dfir(
1652 parse_quote! {
1653 #input_ident -> for_each(&mut #ident);
1654 },
1655 None,
1656 Some(&stmt_id.to_string()),
1657 );
1658 }
1659 BuildersOrCallback::Callback(leaf_callback, _) => {
1660 leaf_callback(self, next_stmt_id);
1661 }
1662 }
1663 }
1664
1665 HydroRoot::Null { input, .. } => {
1666 let input_ident = input.emit_core(
1667 builders_or_callback,
1668 seen_tees,
1669 built_tees,
1670 next_stmt_id,
1671 fold_hooked_idents,
1672 );
1673
1674 let stmt_id = next_stmt_id.get_and_increment();
1675
1676 match builders_or_callback {
1677 BuildersOrCallback::Builders(graph_builders) => {
1678 graph_builders
1679 .get_dfir_mut(&input.metadata().location_id)
1680 .add_dfir(
1681 parse_quote! {
1682 #input_ident -> for_each(|_| {});
1683 },
1684 None,
1685 Some(&stmt_id.to_string()),
1686 );
1687 }
1688 BuildersOrCallback::Callback(leaf_callback, _) => {
1689 leaf_callback(self, next_stmt_id);
1690 }
1691 }
1692 }
1693 }
1694 }
1695
1696 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1697 match self {
1698 HydroRoot::ForEach { op_metadata, .. }
1699 | HydroRoot::SendExternal { op_metadata, .. }
1700 | HydroRoot::DestSink { op_metadata, .. }
1701 | HydroRoot::CycleSink { op_metadata, .. }
1702 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1703 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1704 }
1705 }
1706
1707 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1708 match self {
1709 HydroRoot::ForEach { op_metadata, .. }
1710 | HydroRoot::SendExternal { op_metadata, .. }
1711 | HydroRoot::DestSink { op_metadata, .. }
1712 | HydroRoot::CycleSink { op_metadata, .. }
1713 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1714 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1715 }
1716 }
1717
1718 pub fn input(&self) -> &HydroNode {
1719 match self {
1720 HydroRoot::ForEach { input, .. }
1721 | HydroRoot::SendExternal { input, .. }
1722 | HydroRoot::DestSink { input, .. }
1723 | HydroRoot::CycleSink { input, .. }
1724 | HydroRoot::EmbeddedOutput { input, .. }
1725 | HydroRoot::Null { input, .. } => input,
1726 }
1727 }
1728
1729 pub fn input_metadata(&self) -> &HydroIrMetadata {
1730 self.input().metadata()
1731 }
1732
1733 pub fn print_root(&self) -> String {
1734 match self {
1735 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1736 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1737 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1738 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1739 HydroRoot::EmbeddedOutput { ident, .. } => {
1740 format!("EmbeddedOutput({})", ident)
1741 }
1742 HydroRoot::Null { .. } => "Null".to_owned(),
1743 }
1744 }
1745
1746 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1747 match self {
1748 HydroRoot::ForEach { f, .. } => {
1749 transform(&mut f.expr);
1750 }
1751 HydroRoot::DestSink { sink, .. } => {
1752 transform(sink);
1753 }
1754 HydroRoot::SendExternal { .. }
1755 | HydroRoot::CycleSink { .. }
1756 | HydroRoot::EmbeddedOutput { .. }
1757 | HydroRoot::Null { .. } => {}
1758 }
1759 }
1760}
1761
/// Returns the clock id of the `Tick` that `loc` denotes, looking through any
/// number of `Atomic` wrappers. Any other location yields `None`.
#[cfg(feature = "build")]
fn tick_of(loc: &LocationId) -> Option<ClockId> {
    let mut current = loc;
    loop {
        match current {
            LocationId::Tick(id, _) => return Some(*id),
            // Peel one `Atomic` layer and keep looking.
            LocationId::Atomic(inner) => current = &**inner,
            _ => return None,
        }
    }
}
1770
/// Rewrites every `Tick` clock id inside `loc` to its representative in the
/// union-find map `uf`, descending through nested `Tick` and `Atomic` layers.
/// `Process` and `Cluster` locations are left as they are.
#[cfg(feature = "build")]
fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
    match loc {
        LocationId::Process(_) | LocationId::Cluster(_) => {}
        LocationId::Atomic(inner) => remap_location(inner, uf),
        LocationId::Tick(id, inner) => {
            let representative = uf_find(uf, *id);
            *id = representative;
            remap_location(inner, uf);
        }
    }
}
1784
/// Union-find lookup: returns the representative of `x`.
///
/// An id with no entry in `parent` (or one that maps to itself) is its own
/// representative. Every id visited on the way is re-pointed directly at the
/// representative before returning.
#[cfg(feature = "build")]
fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
    // Pass 1: follow parent links until an id without a distinct parent is reached.
    let mut root = x;
    while let Some(&up) = parent.get(&root) {
        if up == root {
            break;
        }
        root = up;
    }

    // Pass 2: walk the same path again, pointing each id straight at the root.
    let mut cursor = x;
    while cursor != root {
        cursor = parent.insert(cursor, root).unwrap_or(root);
    }

    root
}
1795
/// Union-find merge: makes the representative of `b` the parent of the
/// representative of `a`. Does nothing when both already share a representative.
#[cfg(feature = "build")]
fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
    let root_a = uf_find(parent, a);
    let root_b = uf_find(parent, b);
    if root_a == root_b {
        return;
    }
    parent.insert(root_a, root_b);
}
1804
/// Merges clock ids that are linked through certain operators, then rewrites all
/// node locations to use one representative id per merged group.
///
/// Pass 1 unions the tick of each input with the tick of the node itself for
/// `Batch`, `YieldConcat`, `Chain`, `ChainFirst` and `MergeOrdered` (only when both
/// sides resolve to a tick via `tick_of`). Pass 2 remaps the `location_id` in every
/// node's metadata through the resulting union-find map.
#[cfg(feature = "build")]
pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
    // Unions the ticks of two locations when both of them have one.
    fn link(uf: &mut HashMap<ClockId, ClockId>, input: &LocationId, output: &LocationId) {
        if let (Some(a), Some(b)) = (tick_of(input), tick_of(output)) {
            uf_union(uf, a, b);
        }
    }

    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();

    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| match node {
            HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } => {
                link(&mut uf, &inner.metadata().location_id, &metadata.location_id);
            }
            HydroNode::Chain {
                first,
                second,
                metadata,
            }
            | HydroNode::ChainFirst {
                first,
                second,
                metadata,
            }
            | HydroNode::MergeOrdered {
                first,
                second,
                metadata,
            } => {
                link(&mut uf, &first.metadata().location_id, &metadata.location_id);
                link(&mut uf, &second.metadata().location_id, &metadata.location_id);
            }
            _ => {}
        },
        false,
    );

    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| {
            let location = &mut node.metadata_mut().location_id;
            remap_location(location, &mut uf);
        },
        false,
    );
}
1868
/// Emits every root in `ir` and returns the per-location graph builders that were
/// filled in along the way.
///
/// All roots share one set of traversal state (seen/built shared nodes, statement
/// id counter, fold-hook identifiers), created fresh for this call.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders = SecondaryMap::new();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = crate::Counter::<StmtId>::default();
    let mut fold_hooked_idents = HashSet::new();

    ir.iter_mut().for_each(|root| {
        root.emit(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    });

    builders
}
1887
/// Walks `ir` in emission order without building any DFIR, invoking
/// `transform_root` / `transform_node` through `emit_core`'s callback mode.
///
/// Both callbacks also receive the statement id counter that the traversal
/// maintains.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
    transform_node: impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
) {
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = crate::Counter::<StmtId>::default();
    let mut fold_hooked_idents = HashSet::new();
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);

    for root in ir.iter_mut() {
        root.emit_core(
            &mut callback,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    }
}
1909
1910pub fn transform_bottom_up(
1911 ir: &mut [HydroRoot],
1912 transform_root: &mut impl FnMut(&mut HydroRoot),
1913 transform_node: &mut impl FnMut(&mut HydroNode),
1914 check_well_formed: bool,
1915) {
1916 let mut seen_tees = HashMap::new();
1917 ir.iter_mut().for_each(|leaf| {
1918 leaf.transform_bottom_up(
1919 transform_root,
1920 transform_node,
1921 &mut seen_tees,
1922 check_well_formed,
1923 );
1924 });
1925}
1926
1927pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1928 let mut seen_tees = HashMap::new();
1929 ir.iter()
1930 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1931 .collect()
1932}
1933
// Dedup state for shared nodes: `None` when no dedup scope is active, otherwise the
// next id to hand out together with a map from shared-node pointer to the id that
// was already assigned to it.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // Read and updated by `Debug for SharedNode`; installed by `dbg_dedup_tee`.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
    // Read and updated by `Serialize for SharedNode`; installed by
    // `SerializedSharedGuard`.
    static SERIALIZED_SHARED: PrintedTees
        = const { RefCell::new(None) };
}
1943
1944pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1945 PRINTED_TEES.with(|printed_tees| {
1946 let mut printed_tees_mut = printed_tees.borrow_mut();
1947 *printed_tees_mut = Some((0, HashMap::new()));
1948 drop(printed_tees_mut);
1949
1950 let ret = f();
1951
1952 let mut printed_tees_mut = printed_tees.borrow_mut();
1953 *printed_tees_mut = None;
1954
1955 ret
1956 })
1957}
1958
1959pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1964 let _guard = SerializedSharedGuard::enter();
1965 f()
1966}
1967
/// Scope guard for `SERIALIZED_SHARED`: created by `enter`, and on drop writes
/// `previous` back into the thread-local.
struct SerializedSharedGuard {
    /// The thread-local's contents at the moment the scope was entered.
    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
}
1973
1974impl SerializedSharedGuard {
1975 fn enter() -> Self {
1976 let previous = SERIALIZED_SHARED.with(|cell| {
1977 let mut guard = cell.borrow_mut();
1978 guard.replace((0, HashMap::new()))
1979 });
1980 Self { previous }
1981 }
1982}
1983
1984impl Drop for SerializedSharedGuard {
1985 fn drop(&mut self) {
1986 SERIALIZED_SHARED.with(|cell| {
1987 *cell.borrow_mut() = self.previous.take();
1988 });
1989 }
1990}
1991
/// A reference-counted, interior-mutable handle to a [`HydroNode`]. Its `Debug` and
/// `Serialize` impls identify the node by pointer so that repeated occurrences can
/// be deduplicated.
pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1993
1994impl serde::Serialize for SharedNode {
1995 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2006 SERIALIZED_SHARED.with(|cell| {
2007 let mut guard = cell.borrow_mut();
2008 let state = guard.as_mut().ok_or_else(|| {
2010 serde::ser::Error::custom(
2011 "SharedNode serialization requires an active serialize_dedup_shared scope",
2012 )
2013 })?;
2014 let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
2015
2016 if let Some(&id) = state.1.get(&ptr) {
2017 drop(guard);
2018 use serde::ser::SerializeMap;
2019 let mut map = serializer.serialize_map(Some(1))?;
2020 map.serialize_entry("$shared_ref", &id)?;
2021 map.end()
2022 } else {
2023 let id = state.0;
2024 state.0 += 1;
2025 state.1.insert(ptr, id);
2026 drop(guard);
2027
2028 use serde::ser::SerializeMap;
2029 let mut map = serializer.serialize_map(Some(2))?;
2030 map.serialize_entry("$shared", &id)?;
2031 map.serialize_entry("node", &*self.0.borrow())?;
2032 map.end()
2033 }
2034 })
2035 }
2036}
2037
2038impl SharedNode {
2039 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
2040 Rc::as_ptr(&self.0)
2041 }
2042}
2043
2044impl Debug for SharedNode {
2045 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2046 PRINTED_TEES.with(|printed_tees| {
2047 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
2048 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
2049
2050 if let Some(printed_tees_mut) = printed_tees_mut {
2051 if let Some(existing) = printed_tees_mut
2052 .1
2053 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
2054 {
2055 write!(f, "<shared {}>", existing)
2056 } else {
2057 let next_id = printed_tees_mut.0;
2058 printed_tees_mut.0 += 1;
2059 printed_tees_mut
2060 .1
2061 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
2062 drop(printed_tees_mut_borrow);
2063 write!(f, "<shared {}>: ", next_id)?;
2064 Debug::fmt(&self.0.borrow(), f)
2065 }
2066 } else {
2067 drop(printed_tees_mut_borrow);
2068 write!(f, "<shared>: ")?;
2069 Debug::fmt(&self.0.borrow(), f)
2070 }
2071 })
2072 }
2073}
2074
2075impl Hash for SharedNode {
2076 fn hash<H: Hasher>(&self, state: &mut H) {
2077 self.0.borrow_mut().hash(state);
2078 }
2079}
2080
/// A group counter that is either still live (`Counting`) or a fixed snapshot
/// (`Frozen`).
#[derive(Debug)]
pub enum AccessCounter {
    Counting(Cell<u32>),
    Frozen(u32),
}

impl AccessCounter {
    /// Creates a live counter starting at 0.
    pub fn new() -> Self {
        AccessCounter::Counting(Cell::new(0))
    }

    /// Returns the frozen group number for the next access.
    ///
    /// A non-mutable access gets the current count and leaves it unchanged. A
    /// mutable access gets `count + 1` and advances the count to `count + 2`.
    ///
    /// # Panics
    /// Panics if called on a `Frozen` counter.
    pub fn next_group(&self, is_mut: bool) -> Self {
        match self {
            AccessCounter::Counting(cell) => {
                let current = cell.get();
                if !is_mut {
                    return AccessCounter::Frozen(current);
                }
                cell.set(current + 2);
                AccessCounter::Frozen(current + 1)
            }
            AccessCounter::Frozen(_) => panic!("Cannot count on `AccessCounter::Frozen`"),
        }
    }

    /// Returns a `Frozen` snapshot of the current count (or of the stored value if
    /// already frozen).
    pub fn freeze(&self) -> Self {
        let snapshot = match self {
            AccessCounter::Frozen(value) => *value,
            AccessCounter::Counting(cell) => cell.get(),
        };
        AccessCounter::Frozen(snapshot)
    }

    /// Returns the stored group number.
    ///
    /// # Panics
    /// Panics if the counter is not `Frozen`.
    pub fn frozen_group(&self) -> u32 {
        match self {
            AccessCounter::Frozen(value) => *value,
            AccessCounter::Counting(_) => panic!("`AccessCounter` not frozen"),
        }
    }
}
2128
2129impl Default for AccessCounter {
2130 fn default() -> Self {
2131 Self::new()
2132 }
2133}
2134
impl Hash for AccessCounter {
    // Deliberately feeds nothing into the hasher, so the counter's value never
    // influences the hash of a containing node.
    fn hash<H: Hasher>(&self, _state: &mut H) {
    }
}
2140
2141impl serde::Serialize for AccessCounter {
2142 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2143 let count = match self {
2144 AccessCounter::Counting(count) => count.get(),
2145 AccessCounter::Frozen(count) => *count,
2146 };
2147 count.serialize(serializer)
2148 }
2149}
2150
/// Boundedness marker stored in the `bound` field of the `Stream`, `Optional` and
/// `KeyedStream` variants of [`CollectionKind`].
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    /// The value that `CollectionKind::is_bounded` looks for.
    Bounded,
}
2156
/// Ordering marker stored in `Stream::order` and `KeyedStream::value_order` of
/// [`CollectionKind`].
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    /// The ordering that `CollectionKind::is_strict` requires and
    /// `CollectionKind::strict_kind` sets.
    TotalOrder,
}
2162
/// Retry marker stored in `Stream::retry` and `KeyedStream::value_retry` of
/// [`CollectionKind`].
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    /// The retry mode that `CollectionKind::is_strict` requires and
    /// `CollectionKind::strict_kind` sets.
    ExactlyOnce,
}
2168
/// Boundedness marker stored in `CollectionKind::KeyedSingleton::bound`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    MonotonicKeys,
    MonotonicValue,
    BoundedValue,
    /// The only variant that `CollectionKind::is_bounded` treats as bounded.
    Bounded,
}
2177
/// Boundedness marker stored in `CollectionKind::Singleton::bound`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum SingletonBoundKind {
    Unbounded,
    Monotonic,
    /// The only variant that `CollectionKind::is_bounded` treats as bounded.
    Bounded,
}
2184
/// Describes the kind of collection an IR node produces: its shape (stream,
/// singleton, optional, or a keyed form), its boundedness, and the Rust type(s) of
/// its contents. Stream-shaped kinds additionally record ordering and retry
/// markers.
#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
pub enum CollectionKind {
    /// Unkeyed stream of `element_type` values.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// Singleton holding an `element_type` value.
    Singleton {
        bound: SingletonBoundKind,
        element_type: DebugType,
    },
    /// Optional holding an `element_type` value.
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// Keyed stream; ordering and retry markers apply to the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// Keyed singleton with separate key and value types.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
2214
2215impl CollectionKind {
2216 pub fn is_bounded(&self) -> bool {
2217 matches!(
2218 self,
2219 CollectionKind::Stream {
2220 bound: BoundKind::Bounded,
2221 ..
2222 } | CollectionKind::Singleton {
2223 bound: SingletonBoundKind::Bounded,
2224 ..
2225 } | CollectionKind::Optional {
2226 bound: BoundKind::Bounded,
2227 ..
2228 } | CollectionKind::KeyedStream {
2229 bound: BoundKind::Bounded,
2230 ..
2231 } | CollectionKind::KeyedSingleton {
2232 bound: KeyedSingletonBoundKind::Bounded,
2233 ..
2234 }
2235 )
2236 }
2237
2238 pub fn is_strict(&self) -> bool {
2241 match self {
2242 CollectionKind::Stream { order, retry, .. } => {
2243 *order == StreamOrder::TotalOrder && *retry == StreamRetry::ExactlyOnce
2244 }
2245 CollectionKind::KeyedStream {
2246 value_order,
2247 value_retry,
2248 ..
2249 } => {
2250 *value_order == StreamOrder::TotalOrder && *value_retry == StreamRetry::ExactlyOnce
2251 }
2252 CollectionKind::Singleton { .. }
2255 | CollectionKind::Optional { .. }
2256 | CollectionKind::KeyedSingleton { .. } => true,
2257 }
2258 }
2259
2260 pub fn strict_kind(&self) -> CollectionKind {
2262 match self {
2263 CollectionKind::Stream {
2264 bound,
2265 element_type,
2266 ..
2267 } => CollectionKind::Stream {
2268 bound: bound.clone(),
2269 order: StreamOrder::TotalOrder,
2270 retry: StreamRetry::ExactlyOnce,
2271 element_type: element_type.clone(),
2272 },
2273 CollectionKind::KeyedStream {
2274 bound,
2275 key_type,
2276 value_type,
2277 ..
2278 } => CollectionKind::KeyedStream {
2279 bound: bound.clone(),
2280 value_order: StreamOrder::TotalOrder,
2281 value_retry: StreamRetry::ExactlyOnce,
2282 key_type: key_type.clone(),
2283 value_type: value_type.clone(),
2284 },
2285 other => other.clone(),
2286 }
2287 }
2288}
2289
/// Metadata attached to every IR node.
///
/// Note that the `Hash` and `PartialEq` impls for this type ignore all fields, and
/// the `Debug` impl prints only `location_id` and `collection_kind`.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrMetadata {
    /// Where the node lives.
    pub location_id: LocationId,
    /// Kind of collection the node produces.
    pub collection_kind: CollectionKind,
    pub consistency: Option<ClusterConsistency>,
    pub cardinality: Option<usize>,
    pub tag: Option<String>,
    /// Per-operator metadata (see `HydroIrOpMetadata`).
    pub op: HydroIrOpMetadata,
}
2299
impl Hash for HydroIrMetadata {
    // Contributes nothing to the hash: metadata never affects the hash of the node
    // that carries it.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
2304
impl PartialEq for HydroIrMetadata {
    // Any two metadata values compare equal, regardless of their contents; this
    // matches the `Hash` impl, which likewise ignores every field.
    fn eq(&self, _: &Self) -> bool {
        true
    }
}

impl Eq for HydroIrMetadata {}
2312
2313impl Debug for HydroIrMetadata {
2314 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2315 f.debug_struct("HydroIrMetadata")
2316 .field("location_id", &self.location_id)
2317 .field("collection_kind", &self.collection_kind)
2318 .finish()
2319 }
2320}
2321
/// Metadata about an operator itself, as opposed to the collection it produces.
///
/// The `Hash` impl ignores all fields and the `Debug` impl prints only the type
/// name.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was created; serialized under the key
    /// `span` via `serialize_backtrace_as_span`.
    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
    pub backtrace: Backtrace,
    pub cpu_usage: Option<f64>,
    pub network_recv_cpu_usage: Option<f64>,
    pub id: Option<usize>,
}
2332
impl HydroIrOpMetadata {
    /// Creates metadata with a freshly captured backtrace and all optional fields
    /// set to `None`.
    ///
    /// Delegates to `new_with_skip(1)`; the skip count presumably exists to drop
    /// this function's own frame from the captured backtrace (TODO confirm against
    /// `Backtrace::get_backtrace`).
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        Self::new_with_skip(1)
    }

    /// Creates metadata whose backtrace is captured with `2 + skip_count` passed to
    /// `Backtrace::get_backtrace`.
    ///
    /// NOTE(review): the constant depends on the exact call depth between the
    /// caller and `get_backtrace`; do not inline or wrap these calls without
    /// re-checking it.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
2351
2352impl Debug for HydroIrOpMetadata {
2353 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2354 f.debug_struct("HydroIrOpMetadata").finish()
2355 }
2356}
2357
impl Hash for HydroIrOpMetadata {
    // Contributes nothing to the hash: operator metadata never affects the hash of
    // the node or root that carries it.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
2361
/// A node in the Hydro IR tree.
///
/// Every variant except `Placeholder` carries a `metadata: HydroIrMetadata`.
/// Upstream operators are held as owned `Box<HydroNode>` children, except for
/// `Tee`, `Reference` and `Partition`, whose upstream is a `SharedNode` and may
/// therefore be referenced from several places in the tree.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroNode {
    /// Variant with no payload. Presumably a temporary stand-in while a tree is
    /// being rewritten — TODO confirm against callers.
    Placeholder,

    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool, metadata: HydroIrMetadata,
    },

    /// Leaf node fed by a `HydroSource`.
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// Leaf node defined by a single `value` expression.
    SingletonSource {
        value: DebugExpr,
        first_tick_only: bool,
        metadata: HydroIrMetadata,
    },

    /// Leaf node identified by a `CycleId`; `HydroRoot::CycleSink` carries the same
    /// kind of id.
    CycleSource {
        cycle_id: CycleId,
        metadata: HydroIrMetadata,
    },

    /// A consumer of a shared upstream node.
    Tee {
        inner: SharedNode,
        metadata: HydroIrMetadata,
    },

    /// A reference to a shared upstream node, tagged with a handoff-ref kind and an
    /// access counter. `ClosureExpr::singleton_refs` holds nodes of this variant.
    Reference {
        inner: SharedNode,
        kind: crate::handoff_ref::HandoffRefKind,
        access_counter: AccessCounter,
        metadata: HydroIrMetadata,
    },

    /// A consumer of a shared upstream node that carries a closure `f` and an
    /// `is_true` flag.
    Partition {
        inner: SharedNode,
        f: ClosureExpr,
        is_true: bool,
        metadata: HydroIrMetadata,
    },

    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // Two-input operators follow: each owns both of its upstream nodes.
    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    MergeOrdered {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    JoinHalf {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesBlocking {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // Single-input operators parameterized by one closure `f`.
    Map {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMapStreamBlocking {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    // Single-input operators parameterized by an `init` and an `acc` closure.
    Fold {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ScanAsyncBlocking {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Like `ReduceKeyed`, with an additional `watermark` upstream node.
    ReduceKeyedWatermark {
        f: ClosureExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// A network hop. `instantiate_fn` starts out as `DebugInstantiate::Building`
    /// and must be `Finalized` before `HydroRoot::connect_network` runs.
    Network {
        name: Option<String>,
        networking_info: crate::networking::NetworkingInfo,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Leaf node receiving from an external location. Like `Network`, its
    /// `instantiate_fn` is connected by `HydroRoot::connect_network`.
    ExternalInput {
        from_external_key: LocationKey,
        from_port_id: ExternalPortId,
        from_many: bool,
        codec_type: DebugType,
        // Excluded from serialized output.
        #[serde(skip)]
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AssertIsConsistent {
        inner: Box<HydroNode>,
        trusted: bool,
        metadata: HydroIrMetadata,
    },

    UnboundSingleton {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
2657
/// Memo table used while traversing or cloning the IR: keyed by the address of a shared
/// node's original `RefCell`, it stores the replacement cell created for that node, so a node
/// reachable through several `Tee`/`Reference`/`Partition` parents is only processed once.
pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Map keyed by the address of a shared node's `RefCell`, storing a [`LocationId`] for it.
// NOTE(review): presumably the location recorded for that shared node — confirm against users.
pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2660
#[cfg(feature = "build")]
/// Optionally routes an operator's input through an extra "observe" statement.
///
/// The extra statement is only introduced when `f.has_mut_ref()` is true and `in_kind` is not
/// strict. In that case a fresh statement id is always consumed from `next_stmt_id`, and, when
/// running in `Builders` mode, `observe_for_mut` is emitted from `in_ident` into the new ident.
///
/// Returns the ident the operator should read from: the new `stream_<id>` ident when an observe
/// statement was allocated, otherwise `in_ident` unchanged.
fn maybe_observe_for_mut(
    f: &ClosureExpr,
    in_ident: syn::Ident,
    in_location: &LocationId,
    in_kind: &CollectionKind,
    op_meta: &HydroIrOpMetadata,
    builders_or_callback: &mut BuildersOrCallback<
        impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
        impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
    >,
    next_stmt_id: &mut crate::Counter<StmtId>,
) -> syn::Ident {
    // Nothing to do unless the closure holds a mutable reference and the input is non-strict.
    let needs_observe = f.has_mut_ref() && !in_kind.is_strict();
    if !needs_observe {
        return in_ident;
    }

    // The id is consumed in both modes; only `Builders` mode actually emits the statement.
    let observe_stmt_id = next_stmt_id.get_and_increment();
    let observe_ident = syn::Ident::new(&format!("stream_{observe_stmt_id}"), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            graph_builders.observe_for_mut(in_location, in_ident, in_kind, &observe_ident, op_meta);
        }
        BuildersOrCallback::Callback(..) => {}
    }

    observe_ident
}
2689
2690impl HydroNode {
2691 pub fn transform_bottom_up(
2692 &mut self,
2693 transform: &mut impl FnMut(&mut HydroNode),
2694 seen_tees: &mut SeenSharedNodes,
2695 check_well_formed: bool,
2696 ) {
2697 self.transform_children(
2698 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2699 seen_tees,
2700 );
2701
2702 transform(self);
2703
2704 let self_location = self.metadata().location_id.root();
2705
2706 if check_well_formed {
2707 match &*self {
2708 HydroNode::Network { .. } => {}
2709 _ => {
2710 self.input_metadata().iter().for_each(|i| {
2711 if i.location_id.root() != self_location {
2712 panic!(
2713 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2714 i,
2715 i.location_id.root(),
2716 self,
2717 self_location
2718 )
2719 }
2720 });
2721 }
2722 }
2723 }
2724 }
2725
2726 #[inline(always)]
2727 pub fn transform_children(
2728 &mut self,
2729 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2730 seen_tees: &mut SeenSharedNodes,
2731 ) {
2732 match self {
2733 HydroNode::Placeholder => {
2734 panic!();
2735 }
2736
2737 HydroNode::Source { .. }
2738 | HydroNode::SingletonSource { .. }
2739 | HydroNode::CycleSource { .. }
2740 | HydroNode::ExternalInput { .. } => {}
2741
2742 HydroNode::Tee { inner, .. } | HydroNode::Reference { inner, .. } => {
2743 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2744 *inner = SharedNode(transformed.clone());
2745 } else {
2746 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2747 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2748 let mut orig = inner.0.replace(HydroNode::Placeholder);
2749 transform(&mut orig, seen_tees);
2750 *transformed_cell.borrow_mut() = orig;
2751 *inner = SharedNode(transformed_cell);
2752 }
2753 }
2754
2755 HydroNode::Partition { inner, f, .. } => {
2756 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2757 *inner = SharedNode(transformed.clone());
2758 } else {
2759 f.transform_children(&mut transform, seen_tees);
2760 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2761 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2762 let mut orig = inner.0.replace(HydroNode::Placeholder);
2763 transform(&mut orig, seen_tees);
2764 *transformed_cell.borrow_mut() = orig;
2765 *inner = SharedNode(transformed_cell);
2766 }
2767 }
2768
2769 HydroNode::Cast { inner, .. }
2770 | HydroNode::ObserveNonDet { inner, .. }
2771 | HydroNode::BeginAtomic { inner, .. }
2772 | HydroNode::EndAtomic { inner, .. }
2773 | HydroNode::Batch { inner, .. }
2774 | HydroNode::YieldConcat { inner, .. }
2775 | HydroNode::UnboundSingleton { inner, .. }
2776 | HydroNode::AssertIsConsistent { inner, .. } => {
2777 transform(inner.as_mut(), seen_tees);
2778 }
2779
2780 HydroNode::Chain { first, second, .. } => {
2781 transform(first.as_mut(), seen_tees);
2782 transform(second.as_mut(), seen_tees);
2783 }
2784
2785 HydroNode::MergeOrdered { first, second, .. } => {
2786 transform(first.as_mut(), seen_tees);
2787 transform(second.as_mut(), seen_tees);
2788 }
2789
2790 HydroNode::ChainFirst { first, second, .. } => {
2791 transform(first.as_mut(), seen_tees);
2792 transform(second.as_mut(), seen_tees);
2793 }
2794
2795 HydroNode::CrossSingleton { left, right, .. }
2796 | HydroNode::CrossProduct { left, right, .. }
2797 | HydroNode::Join { left, right, .. }
2798 | HydroNode::JoinHalf { left, right, .. } => {
2799 transform(left.as_mut(), seen_tees);
2800 transform(right.as_mut(), seen_tees);
2801 }
2802
2803 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2804 transform(pos.as_mut(), seen_tees);
2805 transform(neg.as_mut(), seen_tees);
2806 }
2807
2808 HydroNode::Map { f, input, .. } => {
2809 f.transform_children(&mut transform, seen_tees);
2810 transform(input.as_mut(), seen_tees);
2811 }
2812 HydroNode::FlatMap { f, input, .. }
2813 | HydroNode::FlatMapStreamBlocking { f, input, .. }
2814 | HydroNode::Filter { f, input, .. }
2815 | HydroNode::FilterMap { f, input, .. }
2816 | HydroNode::Inspect { f, input, .. }
2817 | HydroNode::Reduce { f, input, .. }
2818 | HydroNode::ReduceKeyed { f, input, .. } => {
2819 f.transform_children(&mut transform, seen_tees);
2820 transform(input.as_mut(), seen_tees);
2821 }
2822 HydroNode::ReduceKeyedWatermark {
2823 f,
2824 input,
2825 watermark,
2826 ..
2827 } => {
2828 f.transform_children(&mut transform, seen_tees);
2829 transform(input.as_mut(), seen_tees);
2830 transform(watermark.as_mut(), seen_tees);
2831 }
2832 HydroNode::Fold {
2833 init, acc, input, ..
2834 }
2835 | HydroNode::Scan {
2836 init, acc, input, ..
2837 }
2838 | HydroNode::ScanAsyncBlocking {
2839 init, acc, input, ..
2840 }
2841 | HydroNode::FoldKeyed {
2842 init, acc, input, ..
2843 } => {
2844 init.transform_children(&mut transform, seen_tees);
2845 acc.transform_children(&mut transform, seen_tees);
2846 transform(input.as_mut(), seen_tees);
2847 }
2848 HydroNode::ResolveFutures { input, .. }
2849 | HydroNode::ResolveFuturesBlocking { input, .. }
2850 | HydroNode::ResolveFuturesOrdered { input, .. }
2851 | HydroNode::Sort { input, .. }
2852 | HydroNode::DeferTick { input, .. }
2853 | HydroNode::Enumerate { input, .. }
2854 | HydroNode::Unique { input, .. }
2855 | HydroNode::Network { input, .. }
2856 | HydroNode::Counter { input, .. } => {
2857 transform(input.as_mut(), seen_tees);
2858 }
2859 }
2860 }
2861
2862 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2863 match self {
2864 HydroNode::Placeholder => HydroNode::Placeholder,
2865 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2866 inner: Box::new(inner.deep_clone(seen_tees)),
2867 metadata: metadata.clone(),
2868 },
2869 HydroNode::UnboundSingleton { inner, metadata } => HydroNode::UnboundSingleton {
2870 inner: Box::new(inner.deep_clone(seen_tees)),
2871 metadata: metadata.clone(),
2872 },
2873 HydroNode::ObserveNonDet {
2874 inner,
2875 trusted,
2876 metadata,
2877 } => HydroNode::ObserveNonDet {
2878 inner: Box::new(inner.deep_clone(seen_tees)),
2879 trusted: *trusted,
2880 metadata: metadata.clone(),
2881 },
2882 HydroNode::AssertIsConsistent {
2883 inner,
2884 trusted,
2885 metadata,
2886 } => HydroNode::AssertIsConsistent {
2887 inner: Box::new(inner.deep_clone(seen_tees)),
2888 trusted: *trusted,
2889 metadata: metadata.clone(),
2890 },
2891 HydroNode::Source { source, metadata } => HydroNode::Source {
2892 source: source.clone(),
2893 metadata: metadata.clone(),
2894 },
2895 HydroNode::SingletonSource {
2896 value,
2897 first_tick_only,
2898 metadata,
2899 } => HydroNode::SingletonSource {
2900 value: value.clone(),
2901 first_tick_only: *first_tick_only,
2902 metadata: metadata.clone(),
2903 },
2904 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2905 cycle_id: *cycle_id,
2906 metadata: metadata.clone(),
2907 },
2908 HydroNode::Tee { inner, metadata }
2909 | HydroNode::Reference {
2910 inner, metadata, ..
2911 } => {
2912 let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2913 SharedNode(transformed.clone())
2914 } else {
2915 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2916 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2917 let cloned = inner.0.borrow().deep_clone(seen_tees);
2918 *new_rc.borrow_mut() = cloned;
2919 SharedNode(new_rc)
2920 };
2921 if let HydroNode::Reference {
2922 kind,
2923 access_counter,
2924 ..
2925 } = self
2926 {
2927 HydroNode::Reference {
2928 inner: cloned_inner,
2929 kind: *kind,
2930 access_counter: access_counter.freeze(),
2931 metadata: metadata.clone(),
2932 }
2933 } else {
2934 HydroNode::Tee {
2935 inner: cloned_inner,
2936 metadata: metadata.clone(),
2937 }
2938 }
2939 }
2940 HydroNode::Partition {
2941 inner,
2942 f,
2943 is_true,
2944 metadata,
2945 } => {
2946 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2947 HydroNode::Partition {
2948 inner: SharedNode(transformed.clone()),
2949 f: f.deep_clone(seen_tees),
2950 is_true: *is_true,
2951 metadata: metadata.clone(),
2952 }
2953 } else {
2954 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2955 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2956 let cloned = inner.0.borrow().deep_clone(seen_tees);
2957 *new_rc.borrow_mut() = cloned;
2958 HydroNode::Partition {
2959 inner: SharedNode(new_rc),
2960 f: f.deep_clone(seen_tees),
2961 is_true: *is_true,
2962 metadata: metadata.clone(),
2963 }
2964 }
2965 }
2966 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2967 inner: Box::new(inner.deep_clone(seen_tees)),
2968 metadata: metadata.clone(),
2969 },
2970 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2971 inner: Box::new(inner.deep_clone(seen_tees)),
2972 metadata: metadata.clone(),
2973 },
2974 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2975 inner: Box::new(inner.deep_clone(seen_tees)),
2976 metadata: metadata.clone(),
2977 },
2978 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2979 inner: Box::new(inner.deep_clone(seen_tees)),
2980 metadata: metadata.clone(),
2981 },
2982 HydroNode::Chain {
2983 first,
2984 second,
2985 metadata,
2986 } => HydroNode::Chain {
2987 first: Box::new(first.deep_clone(seen_tees)),
2988 second: Box::new(second.deep_clone(seen_tees)),
2989 metadata: metadata.clone(),
2990 },
2991 HydroNode::MergeOrdered {
2992 first,
2993 second,
2994 metadata,
2995 } => HydroNode::MergeOrdered {
2996 first: Box::new(first.deep_clone(seen_tees)),
2997 second: Box::new(second.deep_clone(seen_tees)),
2998 metadata: metadata.clone(),
2999 },
3000 HydroNode::ChainFirst {
3001 first,
3002 second,
3003 metadata,
3004 } => HydroNode::ChainFirst {
3005 first: Box::new(first.deep_clone(seen_tees)),
3006 second: Box::new(second.deep_clone(seen_tees)),
3007 metadata: metadata.clone(),
3008 },
3009 HydroNode::CrossProduct {
3010 left,
3011 right,
3012 metadata,
3013 } => HydroNode::CrossProduct {
3014 left: Box::new(left.deep_clone(seen_tees)),
3015 right: Box::new(right.deep_clone(seen_tees)),
3016 metadata: metadata.clone(),
3017 },
3018 HydroNode::CrossSingleton {
3019 left,
3020 right,
3021 metadata,
3022 } => HydroNode::CrossSingleton {
3023 left: Box::new(left.deep_clone(seen_tees)),
3024 right: Box::new(right.deep_clone(seen_tees)),
3025 metadata: metadata.clone(),
3026 },
3027 HydroNode::Join {
3028 left,
3029 right,
3030 metadata,
3031 } => HydroNode::Join {
3032 left: Box::new(left.deep_clone(seen_tees)),
3033 right: Box::new(right.deep_clone(seen_tees)),
3034 metadata: metadata.clone(),
3035 },
3036 HydroNode::JoinHalf {
3037 left,
3038 right,
3039 metadata,
3040 } => HydroNode::JoinHalf {
3041 left: Box::new(left.deep_clone(seen_tees)),
3042 right: Box::new(right.deep_clone(seen_tees)),
3043 metadata: metadata.clone(),
3044 },
3045 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
3046 pos: Box::new(pos.deep_clone(seen_tees)),
3047 neg: Box::new(neg.deep_clone(seen_tees)),
3048 metadata: metadata.clone(),
3049 },
3050 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
3051 pos: Box::new(pos.deep_clone(seen_tees)),
3052 neg: Box::new(neg.deep_clone(seen_tees)),
3053 metadata: metadata.clone(),
3054 },
3055 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
3056 input: Box::new(input.deep_clone(seen_tees)),
3057 metadata: metadata.clone(),
3058 },
3059 HydroNode::ResolveFuturesBlocking { input, metadata } => {
3060 HydroNode::ResolveFuturesBlocking {
3061 input: Box::new(input.deep_clone(seen_tees)),
3062 metadata: metadata.clone(),
3063 }
3064 }
3065 HydroNode::ResolveFuturesOrdered { input, metadata } => {
3066 HydroNode::ResolveFuturesOrdered {
3067 input: Box::new(input.deep_clone(seen_tees)),
3068 metadata: metadata.clone(),
3069 }
3070 }
3071 HydroNode::Map { f, input, metadata } => HydroNode::Map {
3072 f: f.deep_clone(seen_tees),
3073 input: Box::new(input.deep_clone(seen_tees)),
3074 metadata: metadata.clone(),
3075 },
3076 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
3077 f: f.deep_clone(seen_tees),
3078 input: Box::new(input.deep_clone(seen_tees)),
3079 metadata: metadata.clone(),
3080 },
3081 HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
3082 HydroNode::FlatMapStreamBlocking {
3083 f: f.deep_clone(seen_tees),
3084 input: Box::new(input.deep_clone(seen_tees)),
3085 metadata: metadata.clone(),
3086 }
3087 }
3088 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
3089 f: f.deep_clone(seen_tees),
3090 input: Box::new(input.deep_clone(seen_tees)),
3091 metadata: metadata.clone(),
3092 },
3093 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
3094 f: f.deep_clone(seen_tees),
3095 input: Box::new(input.deep_clone(seen_tees)),
3096 metadata: metadata.clone(),
3097 },
3098 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
3099 input: Box::new(input.deep_clone(seen_tees)),
3100 metadata: metadata.clone(),
3101 },
3102 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
3103 input: Box::new(input.deep_clone(seen_tees)),
3104 metadata: metadata.clone(),
3105 },
3106 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
3107 f: f.deep_clone(seen_tees),
3108 input: Box::new(input.deep_clone(seen_tees)),
3109 metadata: metadata.clone(),
3110 },
3111 HydroNode::Unique { input, metadata } => HydroNode::Unique {
3112 input: Box::new(input.deep_clone(seen_tees)),
3113 metadata: metadata.clone(),
3114 },
3115 HydroNode::Sort { input, metadata } => HydroNode::Sort {
3116 input: Box::new(input.deep_clone(seen_tees)),
3117 metadata: metadata.clone(),
3118 },
3119 HydroNode::Fold {
3120 init,
3121 acc,
3122 input,
3123 metadata,
3124 } => HydroNode::Fold {
3125 init: init.deep_clone(seen_tees),
3126 acc: acc.deep_clone(seen_tees),
3127 input: Box::new(input.deep_clone(seen_tees)),
3128 metadata: metadata.clone(),
3129 },
3130 HydroNode::Scan {
3131 init,
3132 acc,
3133 input,
3134 metadata,
3135 } => HydroNode::Scan {
3136 init: init.deep_clone(seen_tees),
3137 acc: acc.deep_clone(seen_tees),
3138 input: Box::new(input.deep_clone(seen_tees)),
3139 metadata: metadata.clone(),
3140 },
3141 HydroNode::ScanAsyncBlocking {
3142 init,
3143 acc,
3144 input,
3145 metadata,
3146 } => HydroNode::ScanAsyncBlocking {
3147 init: init.deep_clone(seen_tees),
3148 acc: acc.deep_clone(seen_tees),
3149 input: Box::new(input.deep_clone(seen_tees)),
3150 metadata: metadata.clone(),
3151 },
3152 HydroNode::FoldKeyed {
3153 init,
3154 acc,
3155 input,
3156 metadata,
3157 } => HydroNode::FoldKeyed {
3158 init: init.deep_clone(seen_tees),
3159 acc: acc.deep_clone(seen_tees),
3160 input: Box::new(input.deep_clone(seen_tees)),
3161 metadata: metadata.clone(),
3162 },
3163 HydroNode::ReduceKeyedWatermark {
3164 f,
3165 input,
3166 watermark,
3167 metadata,
3168 } => HydroNode::ReduceKeyedWatermark {
3169 f: f.deep_clone(seen_tees),
3170 input: Box::new(input.deep_clone(seen_tees)),
3171 watermark: Box::new(watermark.deep_clone(seen_tees)),
3172 metadata: metadata.clone(),
3173 },
3174 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
3175 f: f.deep_clone(seen_tees),
3176 input: Box::new(input.deep_clone(seen_tees)),
3177 metadata: metadata.clone(),
3178 },
3179 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
3180 f: f.deep_clone(seen_tees),
3181 input: Box::new(input.deep_clone(seen_tees)),
3182 metadata: metadata.clone(),
3183 },
3184 HydroNode::Network {
3185 name,
3186 networking_info,
3187 serialize_fn,
3188 instantiate_fn,
3189 deserialize_fn,
3190 input,
3191 metadata,
3192 } => HydroNode::Network {
3193 name: name.clone(),
3194 networking_info: networking_info.clone(),
3195 serialize_fn: serialize_fn.clone(),
3196 instantiate_fn: instantiate_fn.clone(),
3197 deserialize_fn: deserialize_fn.clone(),
3198 input: Box::new(input.deep_clone(seen_tees)),
3199 metadata: metadata.clone(),
3200 },
3201 HydroNode::ExternalInput {
3202 from_external_key,
3203 from_port_id,
3204 from_many,
3205 codec_type,
3206 port_hint,
3207 instantiate_fn,
3208 deserialize_fn,
3209 metadata,
3210 } => HydroNode::ExternalInput {
3211 from_external_key: *from_external_key,
3212 from_port_id: *from_port_id,
3213 from_many: *from_many,
3214 codec_type: codec_type.clone(),
3215 port_hint: *port_hint,
3216 instantiate_fn: instantiate_fn.clone(),
3217 deserialize_fn: deserialize_fn.clone(),
3218 metadata: metadata.clone(),
3219 },
3220 HydroNode::Counter {
3221 tag,
3222 duration,
3223 prefix,
3224 input,
3225 metadata,
3226 } => HydroNode::Counter {
3227 tag: tag.clone(),
3228 duration: duration.clone(),
3229 prefix: prefix.clone(),
3230 input: Box::new(input.deep_clone(seen_tees)),
3231 metadata: metadata.clone(),
3232 },
3233 }
3234 }
3235
3236 #[cfg(feature = "build")]
3237 pub fn emit_core(
3238 &mut self,
3239 builders_or_callback: &mut BuildersOrCallback<
3240 impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
3241 impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
3242 >,
3243 seen_tees: &mut SeenSharedNodes,
3244 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
3245 next_stmt_id: &mut crate::Counter<StmtId>,
3246 fold_hooked_idents: &mut HashSet<String>,
3247 ) -> syn::Ident {
3248 let mut ident_stack: Vec<syn::Ident> = Vec::new();
3249
3250 self.transform_bottom_up(
3251 &mut |node: &mut HydroNode| {
3252 let out_location = node.metadata().location_id.clone();
3253 match node {
3254 HydroNode::Placeholder => {
3255 panic!()
3256 }
3257
3258 HydroNode::Cast { .. } => {
3259 let _ = next_stmt_id.get_and_increment();
3262 match builders_or_callback {
3263 BuildersOrCallback::Builders(_) => {}
3264 BuildersOrCallback::Callback(_, node_callback) => {
3265 node_callback(node, next_stmt_id);
3266 }
3267 }
3268 }
3270
3271 HydroNode::UnboundSingleton { .. } => {
3272 let inner_ident = ident_stack.pop().unwrap();
3273
3274 let stmt_id = next_stmt_id.get_and_increment();
3275 let out_ident =
3276 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3277
3278 match builders_or_callback {
3279 BuildersOrCallback::Builders(graph_builders) => {
3280 if graph_builders.singleton_intermediates() {
3281 let builder = graph_builders.get_dfir_mut(&out_location);
3282 builder.add_dfir(
3283 parse_quote! {
3284 #out_ident = #inner_ident;
3285 },
3286 None,
3287 None,
3288 );
3289 } else {
3290 let builder = graph_builders.get_dfir_mut(&out_location);
3291 builder.add_dfir(
3292 parse_quote! {
3293 #out_ident = #inner_ident -> persist::<'static>();
3294 },
3295 None,
3296 None,
3297 );
3298 }
3299 }
3300 BuildersOrCallback::Callback(_, node_callback) => {
3301 node_callback(node, next_stmt_id);
3302 }
3303 }
3304
3305 ident_stack.push(out_ident);
3306 }
3307
3308 HydroNode::AssertIsConsistent { inner, trusted, .. } => {
3309 let inner_ident = ident_stack.pop().unwrap();
3310
3311 let stmt_id = next_stmt_id.get_and_increment();
3312 let out_ident =
3313 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3314
3315 match builders_or_callback {
3316 BuildersOrCallback::Builders(graph_builders) => {
3317 graph_builders.assert_is_consistent(
3318 *trusted,
3319 &inner.metadata().location_id,
3320 inner_ident,
3321 &out_ident,
3322 );
3323 }
3324 BuildersOrCallback::Callback(_, node_callback) => {
3325 node_callback(node, next_stmt_id);
3326 }
3327 }
3328
3329 ident_stack.push(out_ident);
3330 }
3331
3332 HydroNode::ObserveNonDet {
3333 inner,
3334 trusted,
3335 metadata,
3336 ..
3337 } => {
3338 let inner_ident = ident_stack.pop().unwrap();
3339
3340 let stmt_id = next_stmt_id.get_and_increment();
3341 let observe_ident =
3342 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3343
3344 match builders_or_callback {
3345 BuildersOrCallback::Builders(graph_builders) => {
3346 graph_builders.observe_nondet(
3347 *trusted,
3348 &inner.metadata().location_id,
3349 inner_ident,
3350 &inner.metadata().collection_kind,
3351 &observe_ident,
3352 &metadata.collection_kind,
3353 &metadata.op,
3354 );
3355 }
3356 BuildersOrCallback::Callback(_, node_callback) => {
3357 node_callback(node, next_stmt_id);
3358 }
3359 }
3360
3361 ident_stack.push(observe_ident);
3362 }
3363
3364 HydroNode::Batch {
3365 inner, metadata, ..
3366 } => {
3367 let inner_ident = ident_stack.pop().unwrap();
3368
3369 let stmt_id = next_stmt_id.get_and_increment();
3370 let batch_ident =
3371 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3372
3373 match builders_or_callback {
3374 BuildersOrCallback::Builders(graph_builders) => {
3375 graph_builders.batch(
3376 inner_ident,
3377 &inner.metadata().location_id,
3378 &inner.metadata().collection_kind,
3379 &batch_ident,
3380 &out_location,
3381 &metadata.op,
3382 fold_hooked_idents,
3383 );
3384 }
3385 BuildersOrCallback::Callback(_, node_callback) => {
3386 node_callback(node, next_stmt_id);
3387 }
3388 }
3389
3390 ident_stack.push(batch_ident);
3391 }
3392
3393 HydroNode::YieldConcat { inner, .. } => {
3394 let inner_ident = ident_stack.pop().unwrap();
3395
3396 let stmt_id = next_stmt_id.get_and_increment();
3397 let yield_ident =
3398 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3399
3400 match builders_or_callback {
3401 BuildersOrCallback::Builders(graph_builders) => {
3402 graph_builders.yield_from_tick(
3403 inner_ident,
3404 &inner.metadata().location_id,
3405 &inner.metadata().collection_kind,
3406 &yield_ident,
3407 &out_location,
3408 );
3409 }
3410 BuildersOrCallback::Callback(_, node_callback) => {
3411 node_callback(node, next_stmt_id);
3412 }
3413 }
3414
3415 ident_stack.push(yield_ident);
3416 }
3417
3418 HydroNode::BeginAtomic { inner, metadata } => {
3419 let inner_ident = ident_stack.pop().unwrap();
3420
3421 let stmt_id = next_stmt_id.get_and_increment();
3422 let begin_ident =
3423 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3424
3425 match builders_or_callback {
3426 BuildersOrCallback::Builders(graph_builders) => {
3427 graph_builders.begin_atomic(
3428 inner_ident,
3429 &inner.metadata().location_id,
3430 &inner.metadata().collection_kind,
3431 &begin_ident,
3432 &out_location,
3433 &metadata.op,
3434 );
3435 }
3436 BuildersOrCallback::Callback(_, node_callback) => {
3437 node_callback(node, next_stmt_id);
3438 }
3439 }
3440
3441 ident_stack.push(begin_ident);
3442 }
3443
3444 HydroNode::EndAtomic { inner, .. } => {
3445 let inner_ident = ident_stack.pop().unwrap();
3446
3447 let stmt_id = next_stmt_id.get_and_increment();
3448 let end_ident =
3449 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3450
3451 match builders_or_callback {
3452 BuildersOrCallback::Builders(graph_builders) => {
3453 graph_builders.end_atomic(
3454 inner_ident,
3455 &inner.metadata().location_id,
3456 &inner.metadata().collection_kind,
3457 &end_ident,
3458 );
3459 }
3460 BuildersOrCallback::Callback(_, node_callback) => {
3461 node_callback(node, next_stmt_id);
3462 }
3463 }
3464
3465 ident_stack.push(end_ident);
3466 }
3467
3468 HydroNode::Source {
3469 source, metadata, ..
3470 } => {
3471 if let HydroSource::ExternalNetwork() = source {
3472 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3473 } else {
3474 let stmt_id = next_stmt_id.get_and_increment();
3475 let source_ident =
3476 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3477
3478 let source_stmt = match source {
3479 HydroSource::Stream(expr) => {
3480 debug_assert!(metadata.location_id.is_top_level());
3481 parse_quote! {
3482 #source_ident = source_stream(#expr);
3483 }
3484 }
3485
3486 HydroSource::ExternalNetwork() => {
3487 unreachable!()
3488 }
3489
3490 HydroSource::Iter(expr) => {
3491 if metadata.location_id.is_top_level() {
3492 parse_quote! {
3493 #source_ident = source_iter(#expr);
3494 }
3495 } else {
3496 parse_quote! {
3498 #source_ident = source_iter(#expr) -> persist::<'static>();
3499 }
3500 }
3501 }
3502
3503 HydroSource::Spin() => {
3504 debug_assert!(metadata.location_id.is_top_level());
3505 parse_quote! {
3506 #source_ident = spin();
3507 }
3508 }
3509
3510 HydroSource::ClusterMembers(target_loc, state) => {
3511 debug_assert!(metadata.location_id.is_top_level());
3512
3513 let members_tee_ident = syn::Ident::new(
3514 &format!(
3515 "__cluster_members_tee_{}_{}",
3516 metadata.location_id.root().key(),
3517 target_loc.key(),
3518 ),
3519 Span::call_site(),
3520 );
3521
3522 match state {
3523 ClusterMembersState::Stream(d) => {
3524 parse_quote! {
3525 #members_tee_ident = source_stream(#d) -> tee();
3526 #source_ident = #members_tee_ident;
3527 }
3528 },
3529 ClusterMembersState::Uninit => syn::parse_quote! {
3530 #source_ident = source_stream(DUMMY);
3531 },
3532 ClusterMembersState::Tee(..) => parse_quote! {
3533 #source_ident = #members_tee_ident;
3534 },
3535 }
3536 }
3537
3538 HydroSource::Embedded(ident) => {
3539 parse_quote! {
3540 #source_ident = source_stream(#ident);
3541 }
3542 }
3543
3544 HydroSource::EmbeddedSingleton(ident) => {
3545 parse_quote! {
3546 #source_ident = source_iter([#ident]);
3547 }
3548 }
3549 };
3550
3551 match builders_or_callback {
3552 BuildersOrCallback::Builders(graph_builders) => {
3553 let builder = graph_builders.get_dfir_mut(&out_location);
3554 builder.add_dfir(source_stmt, None, Some(&stmt_id.to_string()));
3555 }
3556 BuildersOrCallback::Callback(_, node_callback) => {
3557 node_callback(node, next_stmt_id);
3558 }
3559 }
3560
3561 ident_stack.push(source_ident);
3562 }
3563 }
3564
3565 HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3566 let stmt_id = next_stmt_id.get_and_increment();
3567 let source_ident =
3568 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3569
3570 match builders_or_callback {
3571 BuildersOrCallback::Builders(graph_builders) => {
3572 let builder = graph_builders.get_dfir_mut(&out_location);
3573
3574 if *first_tick_only {
3575 assert!(
3576 !metadata.location_id.is_top_level(),
3577 "first_tick_only SingletonSource must be inside a tick"
3578 );
3579 }
3580
3581 if *first_tick_only
3582 || (metadata.location_id.is_top_level()
3583 && metadata.collection_kind.is_bounded())
3584 {
3585 builder.add_dfir(
3586 parse_quote! {
3587 #source_ident = source_iter([#value]);
3588 },
3589 None,
3590 Some(&stmt_id.to_string()),
3591 );
3592 } else {
3593 builder.add_dfir(
3594 parse_quote! {
3595 #source_ident = source_iter([#value]) -> persist::<'static>();
3596 },
3597 None,
3598 Some(&stmt_id.to_string()),
3599 );
3600 }
3601 }
3602 BuildersOrCallback::Callback(_, node_callback) => {
3603 node_callback(node, next_stmt_id);
3604 }
3605 }
3606
3607 ident_stack.push(source_ident);
3608 }
3609
3610 HydroNode::CycleSource { cycle_id, .. } => {
3611 let ident = cycle_id.as_ident();
3612
3613 let _ = next_stmt_id.get_and_increment();
3615
3616 match builders_or_callback {
3617 BuildersOrCallback::Builders(_) => {}
3618 BuildersOrCallback::Callback(_, node_callback) => {
3619 node_callback(node, next_stmt_id);
3620 }
3621 }
3622
3623 ident_stack.push(ident);
3624 }
3625
3626 HydroNode::Tee { inner, .. } => {
3627 let stmt_id = next_stmt_id.get_and_increment();
3630
3631 let ret_ident = if let Some(built_idents) =
3632 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3633 {
3634 match builders_or_callback {
3635 BuildersOrCallback::Builders(_) => {}
3636 BuildersOrCallback::Callback(_, node_callback) => {
3637 node_callback(node, next_stmt_id);
3638 }
3639 }
3640
3641 built_idents[0].clone()
3642 } else {
3643 let inner_ident = ident_stack.pop().unwrap();
3646
3647 let tee_ident =
3648 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3649
3650 built_tees.insert(
3651 inner.0.as_ref() as *const RefCell<HydroNode>,
3652 vec![tee_ident.clone()],
3653 );
3654
3655 match builders_or_callback {
3656 BuildersOrCallback::Builders(graph_builders) => {
3657 if fold_hooked_idents.contains(&inner_ident.to_string()) {
3669 fold_hooked_idents.insert(tee_ident.to_string());
3670 }
3671 let builder = graph_builders.get_dfir_mut(&out_location);
3672 builder.add_dfir(
3673 parse_quote! {
3674 #tee_ident = #inner_ident -> tee();
3675 },
3676 None,
3677 Some(&stmt_id.to_string()),
3678 );
3679 }
3680 BuildersOrCallback::Callback(_, node_callback) => {
3681 node_callback(node, next_stmt_id);
3682 }
3683 }
3684
3685 tee_ident
3686 };
3687
3688 ident_stack.push(ret_ident);
3689 }
3690
3691 HydroNode::Reference { inner, kind, .. } => {
3692 let stmt_id = next_stmt_id.get_and_increment();
3695
3696 let ret_ident = if let Some(built_idents) =
3697 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3698 {
3699 built_idents[0].clone()
3700 } else {
3701 let inner_ident = ident_stack.pop().unwrap();
3702
3703 let ref_ident =
3704 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3705
3706 built_tees.insert(
3707 inner.0.as_ref() as *const RefCell<HydroNode>,
3708 vec![ref_ident.clone()],
3709 );
3710
3711 match builders_or_callback {
3712 BuildersOrCallback::Builders(graph_builders) => {
3713 let builder = graph_builders.get_dfir_mut(&out_location);
3714 let op_ident = syn::Ident::new(
3715 match kind {
3716 crate::handoff_ref::HandoffRefKind::Singleton => "singleton",
3717 crate::handoff_ref::HandoffRefKind::Optional => "optional",
3718 crate::handoff_ref::HandoffRefKind::Vec => "handoff",
3719 },
3720 Span::call_site(),
3721 );
3722 builder.add_dfir(
3723 parse_quote! {
3724 #ref_ident = #inner_ident -> #op_ident();
3725 },
3726 None,
3727 Some(&stmt_id.to_string()),
3728 );
3729 }
3730 BuildersOrCallback::Callback(_, node_callback) => {
3731 node_callback(node, next_stmt_id);
3732 }
3733 }
3734
3735 ref_ident
3736 };
3737
3738 ident_stack.push(ret_ident);
3739 }
3740
3741 HydroNode::Partition {
3742 inner, f, is_true, metadata,
3743 } => {
3744 let is_true = *is_true; let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3746 let stmt_id = next_stmt_id.get_and_increment();
3747
3748 let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3749 match builders_or_callback {
3750 BuildersOrCallback::Builders(_) => {}
3751 BuildersOrCallback::Callback(_, node_callback) => {
3752 node_callback(node, next_stmt_id);
3753 }
3754 }
3755
3756 let idx = if is_true { 0 } else { 1 };
3757 built_idents[idx].clone()
3758 } else {
3759 let inner_ident = ident_stack.pop().unwrap();
3762 let f_tokens = f.emit_tokens(&mut ident_stack);
3763
3764 let inner_ident = {
3765 let inner_borrow = inner.0.borrow();
3766 maybe_observe_for_mut(
3767 f, inner_ident,
3768 &inner_borrow.metadata().location_id,
3769 &inner_borrow.metadata().collection_kind,
3770 &metadata.op,
3771 builders_or_callback, next_stmt_id,
3772 )
3773 };
3774
3775 let partition_ident = syn::Ident::new(
3776 &format!("stream_{}_partition", stmt_id),
3777 Span::call_site(),
3778 );
3779 let true_ident = syn::Ident::new(
3780 &format!("stream_{}_true", stmt_id),
3781 Span::call_site(),
3782 );
3783 let false_ident = syn::Ident::new(
3784 &format!("stream_{}_false", stmt_id),
3785 Span::call_site(),
3786 );
3787
3788 built_tees.insert(
3789 ptr,
3790 vec![true_ident.clone(), false_ident.clone()],
3791 );
3792
3793 let stmt_id = next_stmt_id.get_and_increment();
3794 match builders_or_callback {
3795 BuildersOrCallback::Builders(graph_builders) => {
3796 let builder = graph_builders.get_dfir_mut(&out_location);
3797 builder.add_dfir(
3798 parse_quote! {
3799 #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f_tokens)(__item) { 0_usize } else { 1_usize });
3800 #true_ident = #partition_ident[0];
3801 #false_ident = #partition_ident[1];
3802 },
3803 None,
3804 Some(&stmt_id.to_string()),
3805 );
3806 }
3807 BuildersOrCallback::Callback(_, node_callback) => {
3808 node_callback(node, next_stmt_id);
3809 }
3810 }
3811
3812 if is_true { true_ident } else { false_ident }
3813 };
3814
3815 ident_stack.push(ret_ident);
3816 }
3817
3818 HydroNode::Chain { .. } => {
3819 let second_ident = ident_stack.pop().unwrap();
3821 let first_ident = ident_stack.pop().unwrap();
3822
3823 let stmt_id = next_stmt_id.get_and_increment();
3824 let chain_ident =
3825 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3826
3827 match builders_or_callback {
3828 BuildersOrCallback::Builders(graph_builders) => {
3829 let builder = graph_builders.get_dfir_mut(&out_location);
3830 builder.add_dfir(
3831 parse_quote! {
3832 #chain_ident = chain();
3833 #first_ident -> [0]#chain_ident;
3834 #second_ident -> [1]#chain_ident;
3835 },
3836 None,
3837 Some(&stmt_id.to_string()),
3838 );
3839 }
3840 BuildersOrCallback::Callback(_, node_callback) => {
3841 node_callback(node, next_stmt_id);
3842 }
3843 }
3844
3845 ident_stack.push(chain_ident);
3846 }
3847
3848 HydroNode::MergeOrdered { first, metadata, .. } => {
3849 let second_ident = ident_stack.pop().unwrap();
3850 let first_ident = ident_stack.pop().unwrap();
3851
3852 let stmt_id = next_stmt_id.get_and_increment();
3853 let merge_ident =
3854 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3855
3856 match builders_or_callback {
3857 BuildersOrCallback::Builders(graph_builders) => {
3858 graph_builders.merge_ordered(
3859 &first.metadata().location_id,
3860 first_ident,
3861 second_ident,
3862 &merge_ident,
3863 &first.metadata().collection_kind,
3864 &metadata.op,
3865 Some(&stmt_id.to_string()),
3866 );
3867 }
3868 BuildersOrCallback::Callback(_, node_callback) => {
3869 node_callback(node, next_stmt_id);
3870 }
3871 }
3872
3873 ident_stack.push(merge_ident);
3874 }
3875
3876 HydroNode::ChainFirst { .. } => {
3877 let second_ident = ident_stack.pop().unwrap();
3878 let first_ident = ident_stack.pop().unwrap();
3879
3880 let stmt_id = next_stmt_id.get_and_increment();
3881 let chain_ident =
3882 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3883
3884 match builders_or_callback {
3885 BuildersOrCallback::Builders(graph_builders) => {
3886 let builder = graph_builders.get_dfir_mut(&out_location);
3887 builder.add_dfir(
3888 parse_quote! {
3889 #chain_ident = chain_first_n(1);
3890 #first_ident -> [0]#chain_ident;
3891 #second_ident -> [1]#chain_ident;
3892 },
3893 None,
3894 Some(&stmt_id.to_string()),
3895 );
3896 }
3897 BuildersOrCallback::Callback(_, node_callback) => {
3898 node_callback(node, next_stmt_id);
3899 }
3900 }
3901
3902 ident_stack.push(chain_ident);
3903 }
3904
3905 HydroNode::CrossSingleton { right, .. } => {
3906 let right_ident = ident_stack.pop().unwrap();
3907 let left_ident = ident_stack.pop().unwrap();
3908
3909 let stmt_id = next_stmt_id.get_and_increment();
3910 let cross_ident =
3911 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3912
3913 match builders_or_callback {
3914 BuildersOrCallback::Builders(graph_builders) => {
3915 let builder = graph_builders.get_dfir_mut(&out_location);
3916
3917 if right.metadata().location_id.is_top_level()
3918 && right.metadata().collection_kind.is_bounded()
3919 {
3920 builder.add_dfir(
3921 parse_quote! {
3922 #cross_ident = cross_singleton::<'static>();
3923 #left_ident -> [input]#cross_ident;
3924 #right_ident -> [single]#cross_ident;
3925 },
3926 None,
3927 Some(&stmt_id.to_string()),
3928 );
3929 } else {
3930 builder.add_dfir(
3931 parse_quote! {
3932 #cross_ident = cross_singleton();
3933 #left_ident -> [input]#cross_ident;
3934 #right_ident -> [single]#cross_ident;
3935 },
3936 None,
3937 Some(&stmt_id.to_string()),
3938 );
3939 }
3940 }
3941 BuildersOrCallback::Callback(_, node_callback) => {
3942 node_callback(node, next_stmt_id);
3943 }
3944 }
3945
3946 ident_stack.push(cross_ident);
3947 }
3948
3949 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3950 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3951 parse_quote!(cross_join_multiset)
3952 } else {
3953 parse_quote!(join_multiset)
3954 };
3955
3956 let (HydroNode::CrossProduct { left, right, .. }
3957 | HydroNode::Join { left, right, .. }) = node
3958 else {
3959 unreachable!()
3960 };
3961
3962 let is_top_level = left.metadata().location_id.is_top_level()
3963 && right.metadata().location_id.is_top_level();
3964 let left_lifetime = if left.metadata().location_id.is_top_level() {
3965 quote!('static)
3966 } else {
3967 quote!('tick)
3968 };
3969
3970 let right_lifetime = if right.metadata().location_id.is_top_level() {
3971 quote!('static)
3972 } else {
3973 quote!('tick)
3974 };
3975
3976 let right_ident = ident_stack.pop().unwrap();
3977 let left_ident = ident_stack.pop().unwrap();
3978
3979 let stmt_id = next_stmt_id.get_and_increment();
3980 let stream_ident =
3981 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3982
3983 match builders_or_callback {
3984 BuildersOrCallback::Builders(graph_builders) => {
3985 let builder = graph_builders.get_dfir_mut(&out_location);
3986 builder.add_dfir(
3987 if is_top_level {
3988 parse_quote! {
3991 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3992 #left_ident -> [0]#stream_ident;
3993 #right_ident -> [1]#stream_ident;
3994 }
3995 } else {
3996 parse_quote! {
3997 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3998 #left_ident -> [0]#stream_ident;
3999 #right_ident -> [1]#stream_ident;
4000 }
4001 }
4002 ,
4003 None,
4004 Some(&stmt_id.to_string()),
4005 );
4006 }
4007 BuildersOrCallback::Callback(_, node_callback) => {
4008 node_callback(node, next_stmt_id);
4009 }
4010 }
4011
4012 ident_stack.push(stream_ident);
4013 }
4014
4015 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
4016 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
4017 parse_quote!(difference)
4018 } else {
4019 parse_quote!(anti_join)
4020 };
4021
4022 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
4023 node
4024 else {
4025 unreachable!()
4026 };
4027
4028 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
4029 quote!('static)
4030 } else {
4031 quote!('tick)
4032 };
4033
4034 let neg_ident = ident_stack.pop().unwrap();
4035 let pos_ident = ident_stack.pop().unwrap();
4036
4037 let stmt_id = next_stmt_id.get_and_increment();
4038 let stream_ident =
4039 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4040
4041 match builders_or_callback {
4042 BuildersOrCallback::Builders(graph_builders) => {
4043 let builder = graph_builders.get_dfir_mut(&out_location);
4044 builder.add_dfir(
4045 parse_quote! {
4046 #stream_ident = #operator::<'tick, #neg_lifetime>();
4047 #pos_ident -> [pos]#stream_ident;
4048 #neg_ident -> [neg]#stream_ident;
4049 },
4050 None,
4051 Some(&stmt_id.to_string()),
4052 );
4053 }
4054 BuildersOrCallback::Callback(_, node_callback) => {
4055 node_callback(node, next_stmt_id);
4056 }
4057 }
4058
4059 ident_stack.push(stream_ident);
4060 }
4061
4062 HydroNode::JoinHalf { .. } => {
4063 let HydroNode::JoinHalf { right, .. } = node else {
4064 unreachable!()
4065 };
4066
4067 assert!(
4068 right.metadata().collection_kind.is_bounded(),
4069 "JoinHalf requires the right (build) side to be Bounded, got {:?}",
4070 right.metadata().collection_kind
4071 );
4072
4073 let build_lifetime = if right.metadata().location_id.is_top_level() {
4074 quote!('static)
4075 } else {
4076 quote!('tick)
4077 };
4078
4079 let build_ident = ident_stack.pop().unwrap();
4080 let probe_ident = ident_stack.pop().unwrap();
4081
4082 let stmt_id = next_stmt_id.get_and_increment();
4083 let stream_ident =
4084 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4085
4086 match builders_or_callback {
4087 BuildersOrCallback::Builders(graph_builders) => {
4088 let builder = graph_builders.get_dfir_mut(&out_location);
4089 builder.add_dfir(
4090 parse_quote! {
4091 #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
4092 #probe_ident -> [probe]#stream_ident;
4093 #build_ident -> [build]#stream_ident;
4094 },
4095 None,
4096 Some(&stmt_id.to_string()),
4097 );
4098 }
4099 BuildersOrCallback::Callback(_, node_callback) => {
4100 node_callback(node, next_stmt_id);
4101 }
4102 }
4103
4104 ident_stack.push(stream_ident);
4105 }
4106
4107 HydroNode::ResolveFutures { .. } => {
4108 let input_ident = ident_stack.pop().unwrap();
4109
4110 let stmt_id = next_stmt_id.get_and_increment();
4111 let futures_ident =
4112 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4113
4114 match builders_or_callback {
4115 BuildersOrCallback::Builders(graph_builders) => {
4116 let builder = graph_builders.get_dfir_mut(&out_location);
4117 builder.add_dfir(
4118 parse_quote! {
4119 #futures_ident = #input_ident -> resolve_futures();
4120 },
4121 None,
4122 Some(&stmt_id.to_string()),
4123 );
4124 }
4125 BuildersOrCallback::Callback(_, node_callback) => {
4126 node_callback(node, next_stmt_id);
4127 }
4128 }
4129
4130 ident_stack.push(futures_ident);
4131 }
4132
4133 HydroNode::ResolveFuturesBlocking { .. } => {
4134 let input_ident = ident_stack.pop().unwrap();
4135
4136 let stmt_id = next_stmt_id.get_and_increment();
4137 let futures_ident =
4138 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4139
4140 match builders_or_callback {
4141 BuildersOrCallback::Builders(graph_builders) => {
4142 let builder = graph_builders.get_dfir_mut(&out_location);
4143 builder.add_dfir(
4144 parse_quote! {
4145 #futures_ident = #input_ident -> resolve_futures_blocking();
4146 },
4147 None,
4148 Some(&stmt_id.to_string()),
4149 );
4150 }
4151 BuildersOrCallback::Callback(_, node_callback) => {
4152 node_callback(node, next_stmt_id);
4153 }
4154 }
4155
4156 ident_stack.push(futures_ident);
4157 }
4158
4159 HydroNode::ResolveFuturesOrdered { .. } => {
4160 let input_ident = ident_stack.pop().unwrap();
4161
4162 let stmt_id = next_stmt_id.get_and_increment();
4163 let futures_ident =
4164 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4165
4166 match builders_or_callback {
4167 BuildersOrCallback::Builders(graph_builders) => {
4168 let builder = graph_builders.get_dfir_mut(&out_location);
4169 builder.add_dfir(
4170 parse_quote! {
4171 #futures_ident = #input_ident -> resolve_futures_ordered();
4172 },
4173 None,
4174 Some(&stmt_id.to_string()),
4175 );
4176 }
4177 BuildersOrCallback::Callback(_, node_callback) => {
4178 node_callback(node, next_stmt_id);
4179 }
4180 }
4181
4182 ident_stack.push(futures_ident);
4183 }
4184
4185 HydroNode::Map {
4186 f,
4187 input,
4188 metadata,
4189 } => {
4190 let input_ident = ident_stack.pop().unwrap();
4192 let f_tokens = f.emit_tokens(&mut ident_stack);
4193
4194 let input_ident = maybe_observe_for_mut(
4195 f,
4196 input_ident,
4197 &input.metadata().location_id,
4198 &input.metadata().collection_kind,
4199 &metadata.op,
4200 builders_or_callback,
4201 next_stmt_id,
4202 );
4203
4204 let stmt_id = next_stmt_id.get_and_increment();
4205 let map_ident =
4206 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4207
4208 match builders_or_callback {
4209 BuildersOrCallback::Builders(graph_builders) => {
4210 let builder = graph_builders.get_dfir_mut(&out_location);
4211 builder.add_dfir(
4212 parse_quote! {
4213 #map_ident = #input_ident -> map(#f_tokens);
4214 },
4215 None,
4216 Some(&stmt_id.to_string()),
4217 );
4218 }
4219 BuildersOrCallback::Callback(_, node_callback) => {
4220 node_callback(node, next_stmt_id);
4221 }
4222 }
4223
4224 ident_stack.push(map_ident);
4225 }
4226
4227 HydroNode::FlatMap { f, input, metadata } => {
4228 let input_ident = ident_stack.pop().unwrap();
4229 let f_tokens = f.emit_tokens(&mut ident_stack);
4230
4231 let input_ident = maybe_observe_for_mut(
4232 f, input_ident,
4233 &input.metadata().location_id,
4234 &input.metadata().collection_kind,
4235 &metadata.op,
4236 builders_or_callback, next_stmt_id,
4237 );
4238
4239 let stmt_id = next_stmt_id.get_and_increment();
4240 let flat_map_ident =
4241 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4242
4243 match builders_or_callback {
4244 BuildersOrCallback::Builders(graph_builders) => {
4245 let builder = graph_builders.get_dfir_mut(&out_location);
4246 builder.add_dfir(
4247 parse_quote! {
4248 #flat_map_ident = #input_ident -> flat_map(#f_tokens);
4249 },
4250 None,
4251 Some(&stmt_id.to_string()),
4252 );
4253 }
4254 BuildersOrCallback::Callback(_, node_callback) => {
4255 node_callback(node, next_stmt_id);
4256 }
4257 }
4258
4259 ident_stack.push(flat_map_ident);
4260 }
4261
4262 HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
4263 let input_ident = ident_stack.pop().unwrap();
4264 let f_tokens = f.emit_tokens(&mut ident_stack);
4265
4266 let input_ident = maybe_observe_for_mut(
4267 f, input_ident,
4268 &input.metadata().location_id,
4269 &input.metadata().collection_kind,
4270 &metadata.op,
4271 builders_or_callback, next_stmt_id,
4272 );
4273
4274 let stmt_id = next_stmt_id.get_and_increment();
4275 let flat_map_stream_blocking_ident =
4276 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4277
4278 match builders_or_callback {
4279 BuildersOrCallback::Builders(graph_builders) => {
4280 let builder = graph_builders.get_dfir_mut(&out_location);
4281 builder.add_dfir(
4282 parse_quote! {
4283 #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f_tokens);
4284 },
4285 None,
4286 Some(&stmt_id.to_string()),
4287 );
4288 }
4289 BuildersOrCallback::Callback(_, node_callback) => {
4290 node_callback(node, next_stmt_id);
4291 }
4292 }
4293
4294 ident_stack.push(flat_map_stream_blocking_ident);
4295 }
4296
4297 HydroNode::Filter { f, input, metadata } => {
4298 let input_ident = ident_stack.pop().unwrap();
4299 let f_tokens = f.emit_tokens(&mut ident_stack);
4300
4301 let input_ident = maybe_observe_for_mut(
4302 f, input_ident,
4303 &input.metadata().location_id,
4304 &input.metadata().collection_kind,
4305 &metadata.op,
4306 builders_or_callback, next_stmt_id,
4307 );
4308
4309 let stmt_id = next_stmt_id.get_and_increment();
4310 let filter_ident =
4311 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4312
4313 match builders_or_callback {
4314 BuildersOrCallback::Builders(graph_builders) => {
4315 let builder = graph_builders.get_dfir_mut(&out_location);
4316 builder.add_dfir(
4317 parse_quote! {
4318 #filter_ident = #input_ident -> filter(#f_tokens);
4319 },
4320 None,
4321 Some(&stmt_id.to_string()),
4322 );
4323 }
4324 BuildersOrCallback::Callback(_, node_callback) => {
4325 node_callback(node, next_stmt_id);
4326 }
4327 }
4328
4329 ident_stack.push(filter_ident);
4330 }
4331
4332 HydroNode::FilterMap { f, input, metadata } => {
4333 let input_ident = ident_stack.pop().unwrap();
4334 let f_tokens = f.emit_tokens(&mut ident_stack);
4335
4336 let input_ident = maybe_observe_for_mut(
4337 f, input_ident,
4338 &input.metadata().location_id,
4339 &input.metadata().collection_kind,
4340 &metadata.op,
4341 builders_or_callback, next_stmt_id,
4342 );
4343
4344 let stmt_id = next_stmt_id.get_and_increment();
4345 let filter_map_ident =
4346 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4347
4348 match builders_or_callback {
4349 BuildersOrCallback::Builders(graph_builders) => {
4350 let builder = graph_builders.get_dfir_mut(&out_location);
4351 builder.add_dfir(
4352 parse_quote! {
4353 #filter_map_ident = #input_ident -> filter_map(#f_tokens);
4354 },
4355 None,
4356 Some(&stmt_id.to_string()),
4357 );
4358 }
4359 BuildersOrCallback::Callback(_, node_callback) => {
4360 node_callback(node, next_stmt_id);
4361 }
4362 }
4363
4364 ident_stack.push(filter_map_ident);
4365 }
4366
4367 HydroNode::Sort { .. } => {
4368 let input_ident = ident_stack.pop().unwrap();
4369
4370 let stmt_id = next_stmt_id.get_and_increment();
4371 let sort_ident =
4372 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4373
4374 match builders_or_callback {
4375 BuildersOrCallback::Builders(graph_builders) => {
4376 let builder = graph_builders.get_dfir_mut(&out_location);
4377 builder.add_dfir(
4378 parse_quote! {
4379 #sort_ident = #input_ident -> sort();
4380 },
4381 None,
4382 Some(&stmt_id.to_string()),
4383 );
4384 }
4385 BuildersOrCallback::Callback(_, node_callback) => {
4386 node_callback(node, next_stmt_id);
4387 }
4388 }
4389
4390 ident_stack.push(sort_ident);
4391 }
4392
4393 HydroNode::DeferTick { .. } => {
4394 let input_ident = ident_stack.pop().unwrap();
4395
4396 let stmt_id = next_stmt_id.get_and_increment();
4397 let defer_tick_ident =
4398 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4399
4400 match builders_or_callback {
4401 BuildersOrCallback::Builders(graph_builders) => {
4402 let builder = graph_builders.get_dfir_mut(&out_location);
4403 builder.add_dfir(
4404 parse_quote! {
4405 #defer_tick_ident = #input_ident -> defer_tick_lazy();
4406 },
4407 None,
4408 Some(&stmt_id.to_string()),
4409 );
4410 }
4411 BuildersOrCallback::Callback(_, node_callback) => {
4412 node_callback(node, next_stmt_id);
4413 }
4414 }
4415
4416 ident_stack.push(defer_tick_ident);
4417 }
4418
4419 HydroNode::Enumerate { input, .. } => {
4420 let input_ident = ident_stack.pop().unwrap();
4421
4422 let stmt_id = next_stmt_id.get_and_increment();
4423 let enumerate_ident =
4424 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4425
4426 match builders_or_callback {
4427 BuildersOrCallback::Builders(graph_builders) => {
4428 let builder = graph_builders.get_dfir_mut(&out_location);
4429 let lifetime = if input.metadata().location_id.is_top_level() {
4430 quote!('static)
4431 } else {
4432 quote!('tick)
4433 };
4434 builder.add_dfir(
4435 parse_quote! {
4436 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
4437 },
4438 None,
4439 Some(&stmt_id.to_string()),
4440 );
4441 }
4442 BuildersOrCallback::Callback(_, node_callback) => {
4443 node_callback(node, next_stmt_id);
4444 }
4445 }
4446
4447 ident_stack.push(enumerate_ident);
4448 }
4449
4450 HydroNode::Inspect { f, input, metadata } => {
4451 let input_ident = ident_stack.pop().unwrap();
4452 let f_tokens = f.emit_tokens(&mut ident_stack);
4453
4454 let input_ident = maybe_observe_for_mut(
4455 f, input_ident,
4456 &input.metadata().location_id,
4457 &input.metadata().collection_kind,
4458 &metadata.op,
4459 builders_or_callback, next_stmt_id,
4460 );
4461
4462 let stmt_id = next_stmt_id.get_and_increment();
4463 let inspect_ident =
4464 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4465
4466 match builders_or_callback {
4467 BuildersOrCallback::Builders(graph_builders) => {
4468 let builder = graph_builders.get_dfir_mut(&out_location);
4469 builder.add_dfir(
4470 parse_quote! {
4471 #inspect_ident = #input_ident -> inspect(#f_tokens);
4472 },
4473 None,
4474 Some(&stmt_id.to_string()),
4475 );
4476 }
4477 BuildersOrCallback::Callback(_, node_callback) => {
4478 node_callback(node, next_stmt_id);
4479 }
4480 }
4481
4482 ident_stack.push(inspect_ident);
4483 }
4484
4485 HydroNode::Unique { input, .. } => {
4486 let input_ident = ident_stack.pop().unwrap();
4487
4488 let stmt_id = next_stmt_id.get_and_increment();
4489 let unique_ident =
4490 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4491
4492 match builders_or_callback {
4493 BuildersOrCallback::Builders(graph_builders) => {
4494 let builder = graph_builders.get_dfir_mut(&out_location);
4495 let lifetime = if input.metadata().location_id.is_top_level() {
4496 quote!('static)
4497 } else {
4498 quote!('tick)
4499 };
4500
4501 builder.add_dfir(
4502 parse_quote! {
4503 #unique_ident = #input_ident -> unique::<#lifetime>();
4504 },
4505 None,
4506 Some(&stmt_id.to_string()),
4507 );
4508 }
4509 BuildersOrCallback::Callback(_, node_callback) => {
4510 node_callback(node, next_stmt_id);
4511 }
4512 }
4513
4514 ident_stack.push(unique_ident);
4515 }
4516
4517 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
4518 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
4519 if input.metadata().location_id.is_top_level()
4520 && input.metadata().collection_kind.is_bounded()
4521 {
4522 parse_quote!(fold_no_replay)
4523 } else {
4524 parse_quote!(fold)
4525 }
4526 } else if matches!(node, HydroNode::Scan { .. }) {
4527 parse_quote!(scan)
4528 } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
4529 parse_quote!(scan_async_blocking)
4530 } else if let HydroNode::FoldKeyed { input, .. } = node {
4531 if input.metadata().location_id.is_top_level()
4532 && input.metadata().collection_kind.is_bounded()
4533 {
4534 todo!("Fold keyed on a top-level bounded collection is not yet supported")
4535 } else {
4536 parse_quote!(fold_keyed)
4537 }
4538 } else {
4539 unreachable!()
4540 };
4541
4542 let (HydroNode::Fold { input, .. }
4543 | HydroNode::FoldKeyed { input, .. }
4544 | HydroNode::Scan { input, .. }
4545 | HydroNode::ScanAsyncBlocking { input, .. }) = node
4546 else {
4547 unreachable!()
4548 };
4549
4550 let lifetime = if input.metadata().location_id.is_top_level() {
4551 quote!('static)
4552 } else {
4553 quote!('tick)
4554 };
4555
4556 let input_ident = ident_stack.pop().unwrap();
4557
4558 let (HydroNode::Fold { init, acc, .. }
4559 | HydroNode::FoldKeyed { init, acc, .. }
4560 | HydroNode::Scan { init, acc, .. }
4561 | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
4562 else {
4563 unreachable!()
4564 };
4565
4566 let acc_tokens = acc.emit_tokens(&mut ident_stack);
4567 let init_tokens = init.emit_tokens(&mut ident_stack);
4568
4569 let stmt_id = next_stmt_id.get_and_increment();
4570 let fold_ident =
4571 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4572
4573 match builders_or_callback {
4574 BuildersOrCallback::Builders(graph_builders) => {
4575 if matches!(node, HydroNode::Fold { .. })
4576 && node.metadata().location_id.is_top_level()
4577 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4578 && graph_builders.singleton_intermediates()
4579 && !node.metadata().collection_kind.is_bounded()
4580 {
4581 let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
4582 let hooked_input_ident = graph_builders.emit_fold_hook(
4583 &input.metadata().location_id,
4584 &input_ident,
4585 &input.metadata().collection_kind,
4586 &node.metadata().op,
4587 );
4588
4589 let (effective_input, wrapped_acc) = if let Some(ref hooked) = hooked_input_ident {
4590 let acc: syn::Expr = parse_quote!({
4591 let mut __inner = #acc_tokens;
4592 move |__state, __batch: Vec<_>| {
4593 if __batch.is_empty() {
4594 return None;
4595 }
4596 for __value in __batch {
4597 __inner(__state, __value);
4598 }
4599 Some(__state.clone())
4600 }
4601 });
4602 (hooked, acc)
4603 } else {
4604 let acc: syn::Expr = parse_quote!({
4605 let mut __inner = #acc_tokens;
4606 move |__state, __value| {
4607 __inner(__state, __value);
4608 Some(__state.clone())
4609 }
4610 });
4611 (&input_ident, acc)
4612 };
4613
4614 let builder = graph_builders.get_dfir_mut(&out_location);
4615 builder.add_dfir(
4616 parse_quote! {
4617 source_iter([(#init_tokens)()]) -> [0]#fold_ident;
4618 #effective_input -> scan::<#lifetime>(#init_tokens, #wrapped_acc) -> [1]#fold_ident;
4619 #fold_ident = chain();
4620 },
4621 None,
4622 Some(&stmt_id.to_string()),
4623 );
4624
4625 if hooked_input_ident.is_some() {
4626 fold_hooked_idents.insert(fold_ident.to_string());
4627 }
4628 } else if matches!(node, HydroNode::FoldKeyed { .. })
4629 && node.metadata().location_id.is_top_level()
4630 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4631 && graph_builders.singleton_intermediates()
4632 && !node.metadata().collection_kind.is_bounded()
4633 {
4634 let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
4635 let hooked_input_ident = graph_builders.emit_fold_hook(
4636 &input.metadata().location_id,
4637 &input_ident,
4638 &input.metadata().collection_kind,
4639 &node.metadata().op,
4640 );
4641 let builder = graph_builders.get_dfir_mut(&out_location);
4642
4643 let wrapped_acc: syn::Expr = parse_quote!({
4644 let mut __init = #init_tokens;
4645 let mut __inner = #acc_tokens;
4646 move |__state, __kv: (_, _)| {
4647 let __state = __state
4649 .entry(::std::clone::Clone::clone(&__kv.0))
4650 .or_insert_with(|| (__init)());
4651 __inner(__state, __kv.1);
4652 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
4653 }
4654 });
4655
4656 if let Some(hooked_input_ident) = hooked_input_ident {
4657 builder.add_dfir(
4658 parse_quote! {
4659 #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4660 },
4661 None,
4662 Some(&stmt_id.to_string()),
4663 );
4664
4665 fold_hooked_idents.insert(fold_ident.to_string());
4666 } else {
4667 builder.add_dfir(
4668 parse_quote! {
4669 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4670 },
4671 None,
4672 Some(&stmt_id.to_string()),
4673 );
4674 }
4675 } else if (matches!(node, HydroNode::Fold { .. })
4676 || matches!(node, HydroNode::FoldKeyed { .. }))
4677 && !node.metadata().location_id.is_top_level()
4678 && graph_builders.singleton_intermediates()
4679 {
4680 let input_ref = match &*node {
4681 HydroNode::Fold { input, .. } => input,
4682 HydroNode::FoldKeyed { input, .. } => input,
4683 _ => unreachable!(),
4684 };
4685 let hooked_input_ident = graph_builders.emit_fold_hook(
4686 &input_ref.metadata().location_id,
4687 &input_ident,
4688 &input_ref.metadata().collection_kind,
4689 &node.metadata().op,
4690 );
4691
4692 let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
4693 let builder = graph_builders.get_dfir_mut(&out_location);
4694 builder.add_dfir(
4695 parse_quote! {
4696 #fold_ident = #actual_input -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4697 },
4698 None,
4699 Some(&stmt_id.to_string()),
4700 );
4701 } else {
4702 let builder = graph_builders.get_dfir_mut(&out_location);
4703 builder.add_dfir(
4704 parse_quote! {
4705 #fold_ident = #input_ident -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4706 },
4707 None,
4708 Some(&stmt_id.to_string()),
4709 );
4710 }
4711 }
4712 BuildersOrCallback::Callback(_, node_callback) => {
4713 node_callback(node, next_stmt_id);
4714 }
4715 }
4716
4717 ident_stack.push(fold_ident);
4718 }
4719
4720 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
4721 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
4722 if input.metadata().location_id.is_top_level()
4723 && input.metadata().collection_kind.is_bounded()
4724 {
4725 parse_quote!(reduce_no_replay)
4726 } else {
4727 parse_quote!(reduce)
4728 }
4729 } else if let HydroNode::ReduceKeyed { input, .. } = node {
4730 if input.metadata().location_id.is_top_level()
4731 && input.metadata().collection_kind.is_bounded()
4732 {
4733 todo!(
4734 "Calling keyed reduce on a top-level bounded collection is not supported"
4735 )
4736 } else {
4737 parse_quote!(reduce_keyed)
4738 }
4739 } else {
4740 unreachable!()
4741 };
4742
4743 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4744 else {
4745 unreachable!()
4746 };
4747
4748 let lifetime = if input.metadata().location_id.is_top_level() {
4749 quote!('static)
4750 } else {
4751 quote!('tick)
4752 };
4753
4754 let input_ident = ident_stack.pop().unwrap();
4755
4756 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4757 else {
4758 unreachable!()
4759 };
4760
4761 let f_tokens = f.emit_tokens(&mut ident_stack);
4762
4763 let stmt_id = next_stmt_id.get_and_increment();
4764 let reduce_ident =
4765 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4766
4767 match builders_or_callback {
4768 BuildersOrCallback::Builders(graph_builders) => {
4769 if matches!(node, HydroNode::Reduce { .. })
4770 && node.metadata().location_id.is_top_level()
4771 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4772 && graph_builders.singleton_intermediates()
4773 && !node.metadata().collection_kind.is_bounded()
4774 {
4775 todo!(
4776 "Reduce with optional intermediates is not yet supported in simulator"
4777 );
4778 } else if matches!(node, HydroNode::ReduceKeyed { .. })
4779 && node.metadata().location_id.is_top_level()
4780 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4781 && graph_builders.singleton_intermediates()
4782 && !node.metadata().collection_kind.is_bounded()
4783 {
4784 todo!(
4785 "Reduce keyed with optional intermediates is not yet supported in simulator"
4786 );
4787 } else {
4788 let builder = graph_builders.get_dfir_mut(&out_location);
4789 builder.add_dfir(
4790 parse_quote! {
4791 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f_tokens);
4792 },
4793 None,
4794 Some(&stmt_id.to_string()),
4795 );
4796 }
4797 }
4798 BuildersOrCallback::Callback(_, node_callback) => {
4799 node_callback(node, next_stmt_id);
4800 }
4801 }
4802
4803 ident_stack.push(reduce_ident);
4804 }
4805
4806 HydroNode::ReduceKeyedWatermark {
4807 f,
4808 input,
4809 metadata,
4810 ..
4811 } => {
4812 let lifetime = if input.metadata().location_id.is_top_level() {
4813 quote!('static)
4814 } else {
4815 quote!('tick)
4816 };
4817
4818 let watermark_ident = ident_stack.pop().unwrap();
4820 let input_ident = ident_stack.pop().unwrap();
4821 let f_tokens = f.emit_tokens(&mut ident_stack);
4822
4823 let stmt_id = next_stmt_id.get_and_increment();
4824 let chain_ident = syn::Ident::new(
4825 &format!("reduce_keyed_watermark_chain_{}", stmt_id),
4826 Span::call_site(),
4827 );
4828
4829 let fold_ident =
4830 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4831
4832 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4833 && input.metadata().collection_kind.is_bounded()
4834 {
4835 parse_quote!(fold_no_replay)
4836 } else {
4837 parse_quote!(fold)
4838 };
4839
4840 match builders_or_callback {
4841 BuildersOrCallback::Builders(graph_builders) => {
4842 if metadata.location_id.is_top_level()
4843 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4844 && graph_builders.singleton_intermediates()
4845 && !metadata.collection_kind.is_bounded()
4846 {
4847 todo!(
4848 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4849 )
4850 } else {
4851 let builder = graph_builders.get_dfir_mut(&out_location);
4852 builder.add_dfir(
4853 parse_quote! {
4854 #chain_ident = chain();
4855 #input_ident
4856 -> map(|x| (Some(x), None))
4857 -> [0]#chain_ident;
4858 #watermark_ident
4859 -> map(|watermark| (None, Some(watermark)))
4860 -> [1]#chain_ident;
4861
4862 #fold_ident = #chain_ident
4863 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4864 let __reduce_keyed_fn = #f_tokens;
4865 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4866 if let Some((k, v)) = opt_payload {
4867 if let Some(curr_watermark) = *opt_curr_watermark {
4868 if k < curr_watermark {
4869 return;
4870 }
4871 }
4872 match map.entry(k) {
4873 ::std::collections::hash_map::Entry::Vacant(e) => {
4874 e.insert(v);
4875 }
4876 ::std::collections::hash_map::Entry::Occupied(mut e) => {
4877 __reduce_keyed_fn(e.get_mut(), v);
4878 }
4879 }
4880 } else {
4881 let watermark = opt_watermark.unwrap();
4882 if let Some(curr_watermark) = *opt_curr_watermark {
4883 if watermark <= curr_watermark {
4884 return;
4885 }
4886 }
4887 map.retain(|k, _| *k >= watermark);
4888 *opt_curr_watermark = Some(watermark);
4889 }
4890 }
4891 })
4892 -> flat_map(|(map, _curr_watermark)| map);
4893 },
4894 None,
4895 Some(&stmt_id.to_string()),
4896 );
4897 }
4898 }
4899 BuildersOrCallback::Callback(_, node_callback) => {
4900 node_callback(node, next_stmt_id);
4901 }
4902 }
4903
4904 ident_stack.push(fold_ident);
4905 }
4906
4907 HydroNode::Network {
4908 networking_info,
4909 serialize_fn: serialize_pipeline,
4910 instantiate_fn,
4911 deserialize_fn: deserialize_pipeline,
4912 input,
4913 ..
4914 } => {
4915 let input_ident = ident_stack.pop().unwrap();
4916
4917 let stmt_id = next_stmt_id.get_and_increment();
4918 let receiver_stream_ident =
4919 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4920
4921 match builders_or_callback {
4922 BuildersOrCallback::Builders(graph_builders) => {
4923 let (sink_expr, source_expr) = match instantiate_fn {
4924 DebugInstantiate::Building => (
4925 syn::parse_quote!(DUMMY_SINK),
4926 syn::parse_quote!(DUMMY_SOURCE),
4927 ),
4928
4929 DebugInstantiate::Finalized(finalized) => {
4930 (finalized.sink.clone(), finalized.source.clone())
4931 }
4932 };
4933
4934 graph_builders.create_network(
4935 &input.metadata().location_id,
4936 &out_location,
4937 input_ident,
4938 &receiver_stream_ident,
4939 serialize_pipeline.as_ref(),
4940 sink_expr,
4941 source_expr,
4942 deserialize_pipeline.as_ref(),
4943 stmt_id,
4944 networking_info,
4945 );
4946 }
4947 BuildersOrCallback::Callback(_, node_callback) => {
4948 node_callback(node, next_stmt_id);
4949 }
4950 }
4951
4952 ident_stack.push(receiver_stream_ident);
4953 }
4954
4955 HydroNode::ExternalInput {
4956 instantiate_fn,
4957 deserialize_fn: deserialize_pipeline,
4958 ..
4959 } => {
4960 let stmt_id = next_stmt_id.get_and_increment();
4961 let receiver_stream_ident =
4962 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4963
4964 match builders_or_callback {
4965 BuildersOrCallback::Builders(graph_builders) => {
4966 let (_, source_expr) = match instantiate_fn {
4967 DebugInstantiate::Building => (
4968 syn::parse_quote!(DUMMY_SINK),
4969 syn::parse_quote!(DUMMY_SOURCE),
4970 ),
4971
4972 DebugInstantiate::Finalized(finalized) => {
4973 (finalized.sink.clone(), finalized.source.clone())
4974 }
4975 };
4976
4977 graph_builders.create_external_source(
4978 &out_location,
4979 source_expr,
4980 &receiver_stream_ident,
4981 deserialize_pipeline.as_ref(),
4982 stmt_id,
4983 );
4984 }
4985 BuildersOrCallback::Callback(_, node_callback) => {
4986 node_callback(node, next_stmt_id);
4987 }
4988 }
4989
4990 ident_stack.push(receiver_stream_ident);
4991 }
4992
4993 HydroNode::Counter {
4994 tag,
4995 duration,
4996 prefix,
4997 ..
4998 } => {
4999 let input_ident = ident_stack.pop().unwrap();
5000
5001 let stmt_id = next_stmt_id.get_and_increment();
5002 let counter_ident =
5003 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
5004
5005 match builders_or_callback {
5006 BuildersOrCallback::Builders(graph_builders) => {
5007 let arg = format!("{}({})", prefix, tag);
5008 let builder = graph_builders.get_dfir_mut(&out_location);
5009 builder.add_dfir(
5010 parse_quote! {
5011 #counter_ident = #input_ident -> _counter(#arg, #duration);
5012 },
5013 None,
5014 Some(&stmt_id.to_string()),
5015 );
5016 }
5017 BuildersOrCallback::Callback(_, node_callback) => {
5018 node_callback(node, next_stmt_id);
5019 }
5020 }
5021
5022 ident_stack.push(counter_ident);
5023 }
5024 }
5025 },
5026 seen_tees,
5027 false,
5028 );
5029
5030 let ret = ident_stack
5031 .pop()
5032 .expect("ident_stack should have exactly one element after traversal");
5033 assert!(
5034 ident_stack.is_empty(),
5035 "ident_stack should be empty after popping the final ident, but has {} remaining element(s). \
5036 This indicates a bug in the code gen: some node pushed idents that were never consumed.",
5037 ident_stack.len()
5038 );
5039 ret
5040 }
5041
5042 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
5043 match self {
5044 HydroNode::Placeholder => {
5045 panic!()
5046 }
5047 HydroNode::Cast { .. }
5048 | HydroNode::ObserveNonDet { .. }
5049 | HydroNode::UnboundSingleton { .. }
5050 | HydroNode::AssertIsConsistent { .. } => {}
5051 HydroNode::Source { source, .. } => match source {
5052 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
5053 HydroSource::ExternalNetwork()
5054 | HydroSource::Spin()
5055 | HydroSource::ClusterMembers(_, _)
5056 | HydroSource::Embedded(_)
5057 | HydroSource::EmbeddedSingleton(_) => {} },
5059 HydroNode::SingletonSource { value, .. } => {
5060 transform(value);
5061 }
5062 HydroNode::CycleSource { .. }
5063 | HydroNode::Tee { .. }
5064 | HydroNode::Reference { .. }
5065 | HydroNode::YieldConcat { .. }
5066 | HydroNode::BeginAtomic { .. }
5067 | HydroNode::EndAtomic { .. }
5068 | HydroNode::Batch { .. }
5069 | HydroNode::Chain { .. }
5070 | HydroNode::MergeOrdered { .. }
5071 | HydroNode::ChainFirst { .. }
5072 | HydroNode::CrossProduct { .. }
5073 | HydroNode::CrossSingleton { .. }
5074 | HydroNode::ResolveFutures { .. }
5075 | HydroNode::ResolveFuturesBlocking { .. }
5076 | HydroNode::ResolveFuturesOrdered { .. }
5077 | HydroNode::Join { .. }
5078 | HydroNode::JoinHalf { .. }
5079 | HydroNode::Difference { .. }
5080 | HydroNode::AntiJoin { .. }
5081 | HydroNode::DeferTick { .. }
5082 | HydroNode::Enumerate { .. }
5083 | HydroNode::Unique { .. }
5084 | HydroNode::Sort { .. } => {}
5085 HydroNode::Map { f, .. }
5086 | HydroNode::FlatMap { f, .. }
5087 | HydroNode::FlatMapStreamBlocking { f, .. }
5088 | HydroNode::Filter { f, .. }
5089 | HydroNode::FilterMap { f, .. }
5090 | HydroNode::Inspect { f, .. }
5091 | HydroNode::Partition { f, .. }
5092 | HydroNode::Reduce { f, .. }
5093 | HydroNode::ReduceKeyed { f, .. }
5094 | HydroNode::ReduceKeyedWatermark { f, .. } => {
5095 transform(&mut f.expr);
5096 }
5097 HydroNode::Fold { init, acc, .. }
5098 | HydroNode::Scan { init, acc, .. }
5099 | HydroNode::ScanAsyncBlocking { init, acc, .. }
5100 | HydroNode::FoldKeyed { init, acc, .. } => {
5101 transform(&mut init.expr);
5102 transform(&mut acc.expr);
5103 }
5104 HydroNode::Network {
5105 serialize_fn,
5106 deserialize_fn,
5107 ..
5108 } => {
5109 if let Some(serialize_fn) = serialize_fn {
5110 transform(serialize_fn);
5111 }
5112 if let Some(deserialize_fn) = deserialize_fn {
5113 transform(deserialize_fn);
5114 }
5115 }
5116 HydroNode::ExternalInput { deserialize_fn, .. } => {
5117 if let Some(deserialize_fn) = deserialize_fn {
5118 transform(deserialize_fn);
5119 }
5120 }
5121 HydroNode::Counter { duration, .. } => {
5122 transform(duration);
5123 }
5124 }
5125 }
5126
5127 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
5128 &self.metadata().op
5129 }
5130
5131 pub fn metadata(&self) -> &HydroIrMetadata {
5132 match self {
5133 HydroNode::Placeholder => {
5134 panic!()
5135 }
5136 HydroNode::Cast { metadata, .. }
5137 | HydroNode::ObserveNonDet { metadata, .. }
5138 | HydroNode::AssertIsConsistent { metadata, .. }
5139 | HydroNode::UnboundSingleton { metadata, .. }
5140 | HydroNode::Source { metadata, .. }
5141 | HydroNode::SingletonSource { metadata, .. }
5142 | HydroNode::CycleSource { metadata, .. }
5143 | HydroNode::Tee { metadata, .. }
5144 | HydroNode::Reference { metadata, .. }
5145 | HydroNode::Partition { metadata, .. }
5146 | HydroNode::YieldConcat { metadata, .. }
5147 | HydroNode::BeginAtomic { metadata, .. }
5148 | HydroNode::EndAtomic { metadata, .. }
5149 | HydroNode::Batch { metadata, .. }
5150 | HydroNode::Chain { metadata, .. }
5151 | HydroNode::MergeOrdered { metadata, .. }
5152 | HydroNode::ChainFirst { metadata, .. }
5153 | HydroNode::CrossProduct { metadata, .. }
5154 | HydroNode::CrossSingleton { metadata, .. }
5155 | HydroNode::Join { metadata, .. }
5156 | HydroNode::JoinHalf { metadata, .. }
5157 | HydroNode::Difference { metadata, .. }
5158 | HydroNode::AntiJoin { metadata, .. }
5159 | HydroNode::ResolveFutures { metadata, .. }
5160 | HydroNode::ResolveFuturesBlocking { metadata, .. }
5161 | HydroNode::ResolveFuturesOrdered { metadata, .. }
5162 | HydroNode::Map { metadata, .. }
5163 | HydroNode::FlatMap { metadata, .. }
5164 | HydroNode::FlatMapStreamBlocking { metadata, .. }
5165 | HydroNode::Filter { metadata, .. }
5166 | HydroNode::FilterMap { metadata, .. }
5167 | HydroNode::DeferTick { metadata, .. }
5168 | HydroNode::Enumerate { metadata, .. }
5169 | HydroNode::Inspect { metadata, .. }
5170 | HydroNode::Unique { metadata, .. }
5171 | HydroNode::Sort { metadata, .. }
5172 | HydroNode::Scan { metadata, .. }
5173 | HydroNode::ScanAsyncBlocking { metadata, .. }
5174 | HydroNode::Fold { metadata, .. }
5175 | HydroNode::FoldKeyed { metadata, .. }
5176 | HydroNode::Reduce { metadata, .. }
5177 | HydroNode::ReduceKeyed { metadata, .. }
5178 | HydroNode::ReduceKeyedWatermark { metadata, .. }
5179 | HydroNode::ExternalInput { metadata, .. }
5180 | HydroNode::Network { metadata, .. }
5181 | HydroNode::Counter { metadata, .. } => metadata,
5182 }
5183 }
5184
5185 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
5186 &mut self.metadata_mut().op
5187 }
5188
5189 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
5190 match self {
5191 HydroNode::Placeholder => {
5192 panic!()
5193 }
5194 HydroNode::Cast { metadata, .. }
5195 | HydroNode::ObserveNonDet { metadata, .. }
5196 | HydroNode::AssertIsConsistent { metadata, .. }
5197 | HydroNode::UnboundSingleton { metadata, .. }
5198 | HydroNode::Source { metadata, .. }
5199 | HydroNode::SingletonSource { metadata, .. }
5200 | HydroNode::CycleSource { metadata, .. }
5201 | HydroNode::Tee { metadata, .. }
5202 | HydroNode::Reference { metadata, .. }
5203 | HydroNode::Partition { metadata, .. }
5204 | HydroNode::YieldConcat { metadata, .. }
5205 | HydroNode::BeginAtomic { metadata, .. }
5206 | HydroNode::EndAtomic { metadata, .. }
5207 | HydroNode::Batch { metadata, .. }
5208 | HydroNode::Chain { metadata, .. }
5209 | HydroNode::MergeOrdered { metadata, .. }
5210 | HydroNode::ChainFirst { metadata, .. }
5211 | HydroNode::CrossProduct { metadata, .. }
5212 | HydroNode::CrossSingleton { metadata, .. }
5213 | HydroNode::Join { metadata, .. }
5214 | HydroNode::JoinHalf { metadata, .. }
5215 | HydroNode::Difference { metadata, .. }
5216 | HydroNode::AntiJoin { metadata, .. }
5217 | HydroNode::ResolveFutures { metadata, .. }
5218 | HydroNode::ResolveFuturesBlocking { metadata, .. }
5219 | HydroNode::ResolveFuturesOrdered { metadata, .. }
5220 | HydroNode::Map { metadata, .. }
5221 | HydroNode::FlatMap { metadata, .. }
5222 | HydroNode::FlatMapStreamBlocking { metadata, .. }
5223 | HydroNode::Filter { metadata, .. }
5224 | HydroNode::FilterMap { metadata, .. }
5225 | HydroNode::DeferTick { metadata, .. }
5226 | HydroNode::Enumerate { metadata, .. }
5227 | HydroNode::Inspect { metadata, .. }
5228 | HydroNode::Unique { metadata, .. }
5229 | HydroNode::Sort { metadata, .. }
5230 | HydroNode::Scan { metadata, .. }
5231 | HydroNode::ScanAsyncBlocking { metadata, .. }
5232 | HydroNode::Fold { metadata, .. }
5233 | HydroNode::FoldKeyed { metadata, .. }
5234 | HydroNode::Reduce { metadata, .. }
5235 | HydroNode::ReduceKeyed { metadata, .. }
5236 | HydroNode::ReduceKeyedWatermark { metadata, .. }
5237 | HydroNode::ExternalInput { metadata, .. }
5238 | HydroNode::Network { metadata, .. }
5239 | HydroNode::Counter { metadata, .. } => metadata,
5240 }
5241 }
5242
5243 pub fn input(&self) -> Vec<&HydroNode> {
5244 match self {
5245 HydroNode::Placeholder => {
5246 panic!()
5247 }
5248 HydroNode::Source { .. }
5249 | HydroNode::SingletonSource { .. }
5250 | HydroNode::ExternalInput { .. }
5251 | HydroNode::CycleSource { .. }
5252 | HydroNode::Tee { .. }
5253 | HydroNode::Reference { .. }
5254 | HydroNode::Partition { .. } => {
5255 vec![]
5257 }
5258 HydroNode::Cast { inner, .. }
5259 | HydroNode::ObserveNonDet { inner, .. }
5260 | HydroNode::YieldConcat { inner, .. }
5261 | HydroNode::BeginAtomic { inner, .. }
5262 | HydroNode::EndAtomic { inner, .. }
5263 | HydroNode::Batch { inner, .. }
5264 | HydroNode::UnboundSingleton { inner, .. }
5265 | HydroNode::AssertIsConsistent { inner, .. } => {
5266 vec![inner]
5267 }
5268 HydroNode::Chain { first, second, .. } => {
5269 vec![first, second]
5270 }
5271 HydroNode::MergeOrdered { first, second, .. } => {
5272 vec![first, second]
5273 }
5274 HydroNode::ChainFirst { first, second, .. } => {
5275 vec![first, second]
5276 }
5277 HydroNode::CrossProduct { left, right, .. }
5278 | HydroNode::CrossSingleton { left, right, .. }
5279 | HydroNode::Join { left, right, .. }
5280 | HydroNode::JoinHalf { left, right, .. } => {
5281 vec![left, right]
5282 }
5283 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
5284 vec![pos, neg]
5285 }
5286 HydroNode::Map { input, .. }
5287 | HydroNode::FlatMap { input, .. }
5288 | HydroNode::FlatMapStreamBlocking { input, .. }
5289 | HydroNode::Filter { input, .. }
5290 | HydroNode::FilterMap { input, .. }
5291 | HydroNode::Sort { input, .. }
5292 | HydroNode::DeferTick { input, .. }
5293 | HydroNode::Enumerate { input, .. }
5294 | HydroNode::Inspect { input, .. }
5295 | HydroNode::Unique { input, .. }
5296 | HydroNode::Network { input, .. }
5297 | HydroNode::Counter { input, .. }
5298 | HydroNode::ResolveFutures { input, .. }
5299 | HydroNode::ResolveFuturesBlocking { input, .. }
5300 | HydroNode::ResolveFuturesOrdered { input, .. }
5301 | HydroNode::Fold { input, .. }
5302 | HydroNode::FoldKeyed { input, .. }
5303 | HydroNode::Reduce { input, .. }
5304 | HydroNode::ReduceKeyed { input, .. }
5305 | HydroNode::Scan { input, .. }
5306 | HydroNode::ScanAsyncBlocking { input, .. } => {
5307 vec![input]
5308 }
5309 HydroNode::ReduceKeyedWatermark {
5310 input, watermark, ..
5311 } => {
5312 vec![input, watermark]
5313 }
5314 }
5315 }
5316
5317 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
5318 self.input()
5319 .iter()
5320 .map(|input_node| input_node.metadata())
5321 .collect()
5322 }
5323
5324 pub fn is_shared_with_others(&self) -> bool {
5328 match self {
5329 HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
5330 Rc::strong_count(&inner.0) > 1
5331 }
5332 HydroNode::Reference { .. } => false,
5335 _ => false,
5336 }
5337 }
5338
5339 pub fn print_root(&self) -> String {
5340 match self {
5341 HydroNode::Placeholder => {
5342 panic!()
5343 }
5344 HydroNode::Cast { .. } => "Cast()".to_owned(),
5345 HydroNode::UnboundSingleton { .. } => "UnboundSingleton()".to_owned(),
5346 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
5347 HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
5348 HydroNode::Source { source, .. } => format!("Source({:?})", source),
5349 HydroNode::SingletonSource {
5350 value,
5351 first_tick_only,
5352 ..
5353 } => format!(
5354 "SingletonSource({:?}, first_tick_only={})",
5355 value, first_tick_only
5356 ),
5357 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
5358 HydroNode::Tee { inner, .. } => {
5359 format!("Tee({})", inner.0.borrow().print_root())
5360 }
5361 HydroNode::Reference { inner, kind, .. } => {
5362 format!("Reference({:?}, {})", kind, inner.0.borrow().print_root())
5363 }
5364 HydroNode::Partition { f, is_true, .. } => {
5365 format!("Partition({:?}, is_true={})", f, is_true)
5366 }
5367 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
5368 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
5369 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
5370 HydroNode::Batch { .. } => "Batch()".to_owned(),
5371 HydroNode::Chain { first, second, .. } => {
5372 format!("Chain({}, {})", first.print_root(), second.print_root())
5373 }
5374 HydroNode::MergeOrdered { first, second, .. } => {
5375 format!(
5376 "MergeOrdered({}, {})",
5377 first.print_root(),
5378 second.print_root()
5379 )
5380 }
5381 HydroNode::ChainFirst { first, second, .. } => {
5382 format!(
5383 "ChainFirst({}, {})",
5384 first.print_root(),
5385 second.print_root()
5386 )
5387 }
5388 HydroNode::CrossProduct { left, right, .. } => {
5389 format!(
5390 "CrossProduct({}, {})",
5391 left.print_root(),
5392 right.print_root()
5393 )
5394 }
5395 HydroNode::CrossSingleton { left, right, .. } => {
5396 format!(
5397 "CrossSingleton({}, {})",
5398 left.print_root(),
5399 right.print_root()
5400 )
5401 }
5402 HydroNode::Join { left, right, .. } => {
5403 format!("Join({}, {})", left.print_root(), right.print_root())
5404 }
5405 HydroNode::JoinHalf { left, right, .. } => {
5406 format!("JoinHalf({}, {})", left.print_root(), right.print_root())
5407 }
5408 HydroNode::Difference { pos, neg, .. } => {
5409 format!("Difference({}, {})", pos.print_root(), neg.print_root())
5410 }
5411 HydroNode::AntiJoin { pos, neg, .. } => {
5412 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
5413 }
5414 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
5415 HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
5416 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
5417 HydroNode::Map { f, .. } => format!("Map({:?})", f),
5418 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
5419 HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
5420 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
5421 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
5422 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
5423 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
5424 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
5425 HydroNode::Unique { .. } => "Unique()".to_owned(),
5426 HydroNode::Sort { .. } => "Sort()".to_owned(),
5427 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
5428 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
5429 HydroNode::ScanAsyncBlocking { init, acc, .. } => {
5430 format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
5431 }
5432 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
5433 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
5434 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
5435 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
5436 HydroNode::Network { .. } => "Network()".to_owned(),
5437 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
5438 HydroNode::Counter { tag, duration, .. } => {
5439 format!("Counter({:?}, {:?})", tag, duration)
5440 }
5441 }
5442 }
5443}
5444
/// Allocates a port on each endpoint of a network edge and asks the deploy
/// backend `D` for the matching sink/source expressions and connect action.
///
/// The backend entry points are chosen from the endpoint kinds:
///
/// | from      | to        | sink/source            | connect          |
/// |-----------|-----------|------------------------|------------------|
/// | `Process` | `Process` | `D::o2o_sink_source`   | `D::o2o_connect` |
/// | `Process` | `Cluster` | `D::o2m_sink_source`   | `D::o2m_connect` |
/// | `Cluster` | `Process` | `D::m2o_sink_source`   | `D::m2o_connect` |
/// | `Cluster` | `Cluster` | `D::m2m_sink_source`   | `D::m2m_connect` |
///
/// Returns `(sink, source, connect_fn)`.
///
/// # Panics
///
/// Panics if a referenced process or cluster is missing from `processes` /
/// `clusters`, or if either location is a `Tick` or `Atomic` location.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Fetches an owned handle to an instantiated process, or aborts.
    let process_node = |key: LocationKey| {
        processes
            .get(key)
            .unwrap_or_else(|| panic!("A process used in the graph was not instantiated: {}", key))
            .clone()
    };
    // Fetches an owned handle to an instantiated cluster, or aborts.
    let cluster_node = |key: LocationKey| {
        clusters
            .get(key)
            .unwrap_or_else(|| panic!("A cluster used in the graph was not instantiated: {}", key))
            .clone()
    };

    match (from_location, to_location) {
        (&LocationId::Process(src), &LocationId::Process(dst)) => {
            let sender = process_node(src);
            let receiver = process_node(dst);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let (sink, source) = D::o2o_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect_fn = D::o2o_connect(&sender, &sink_port, &receiver, &source_port);
            (sink, source, connect_fn)
        }
        (&LocationId::Process(src), &LocationId::Cluster(dst)) => {
            let sender = process_node(src);
            let receiver = cluster_node(dst);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let (sink, source) = D::o2m_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect_fn = D::o2m_connect(&sender, &sink_port, &receiver, &source_port);
            (sink, source, connect_fn)
        }
        (&LocationId::Cluster(src), &LocationId::Process(dst)) => {
            let sender = cluster_node(src);
            let receiver = process_node(dst);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let (sink, source) = D::m2o_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect_fn = D::m2o_connect(&sender, &sink_port, &receiver, &source_port);
            (sink, source, connect_fn)
        }
        (&LocationId::Cluster(src), &LocationId::Cluster(dst)) => {
            let sender = cluster_node(src);
            let receiver = cluster_node(dst);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let (sink, source) = D::m2m_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect_fn = D::m2m_connect(&sender, &sink_port, &receiver, &source_port);
            (sink, source, connect_fn)
        }
        // Network edges may only join processes and clusters.
        (LocationId::Tick(..) | LocationId::Atomic(..), _)
        | (_, LocationId::Tick(..) | LocationId::Atomic(..)) => panic!(),
    }
}
5586
5587#[cfg(test)]
5588mod serde_test;
5589
#[cfg(test)]
mod test {
    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins `size_of::<HydroNode>()` at 264 bytes so a change in the enum's
    /// footprint shows up as a test failure.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        assert_eq!(std::mem::size_of::<HydroNode>(), 264);
    }

    /// Pins `size_of::<HydroRoot>()` at 136 bytes so a change in the enum's
    /// footprint shows up as a test failure.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        assert_eq!(std::mem::size_of::<HydroRoot>(), 136);
    }

    /// An expression that is not a stageleft splice must come back unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let plain = syn::parse_str::<syn::Expr>("x + y").unwrap();
        assert_eq!(simplify_q_macro(plain.clone()), plain);
    }

    /// Snapshot of `simplify_q_macro` applied to a real `q!` closure splice.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let spliced = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Snapshot of `simplify_q_macro` applied to a `q!` body that is a block
    /// ending in a closure rather than a bare closure.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let spliced = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}