1use std::collections::{HashMap, HashSet};
2use std::error::Error;
3use std::fmt::{Display, Write};
4use std::num::ParseIntError;
5use std::sync::OnceLock;
6
7use auto_impl::auto_impl;
8use slotmap::{Key, SecondaryMap, SlotMap};
9
10pub use super::graphviz::{HydroDot, escape_dot};
11pub use super::json::HydroJson;
12pub use super::mermaid::{HydroMermaid, escape_mermaid};
14use crate::compile::ir::backtrace::Backtrace;
15use crate::compile::ir::{DebugExpr, HydroIrMetadata, HydroNode, HydroRoot, HydroSource};
16use crate::location::dynamic::LocationId;
17use crate::location::{LocationKey, LocationType};
18
19#[derive(Debug, Clone)]
21pub enum NodeLabel {
22 Static(String),
24 WithExprs {
26 op_name: String,
27 exprs: Vec<DebugExpr>,
28 },
29}
30
31impl NodeLabel {
32 pub fn static_label(s: String) -> Self {
34 Self::Static(s)
35 }
36
37 pub fn with_exprs(op_name: String, exprs: Vec<DebugExpr>) -> Self {
39 Self::WithExprs { op_name, exprs }
40 }
41}
42
43impl Display for NodeLabel {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 match self {
46 Self::Static(s) => write!(f, "{}", s),
47 Self::WithExprs { op_name, exprs } => {
48 if exprs.is_empty() {
49 write!(f, "{}()", op_name)
50 } else {
51 let expr_strs: Vec<_> = exprs.iter().map(|e| e.to_string()).collect();
52 write!(f, "{}({})", op_name, expr_strs.join(", "))
53 }
54 }
55 }
56 }
57}
58
59pub struct IndentedGraphWriter<'a, W> {
62 pub write: W,
63 pub indent: usize,
64 pub config: HydroWriteConfig<'a>,
65}
66
67impl<'a, W> IndentedGraphWriter<'a, W> {
68 pub fn new(write: W) -> Self {
70 Self {
71 write,
72 indent: 0,
73 config: HydroWriteConfig::default(),
74 }
75 }
76
77 pub fn new_with_config(write: W, config: HydroWriteConfig<'a>) -> Self {
79 Self {
80 write,
81 indent: 0,
82 config,
83 }
84 }
85}
86
87impl<W: Write> IndentedGraphWriter<'_, W> {
88 pub fn writeln_indented(&mut self, content: &str) -> Result<(), std::fmt::Error> {
90 writeln!(self.write, "{b:i$}{content}", b = "", i = self.indent)
91 }
92}
93
94pub type GraphWriteError = std::fmt::Error;
96
97#[auto_impl(&mut, Box)]
99pub trait HydroGraphWrite {
100 type Err: Error;
102
103 fn write_prologue(&mut self) -> Result<(), Self::Err>;
105
106 fn write_node_definition(
108 &mut self,
109 node_id: VizNodeKey,
110 node_label: &NodeLabel,
111 node_type: HydroNodeType,
112 location_key: Option<LocationKey>,
113 location_type: Option<LocationType>,
114 backtrace: Option<&Backtrace>,
115 ) -> Result<(), Self::Err>;
116
117 fn write_edge(
119 &mut self,
120 src_id: VizNodeKey,
121 dst_id: VizNodeKey,
122 edge_properties: &HashSet<HydroEdgeProp>,
123 label: Option<&str>,
124 ) -> Result<(), Self::Err>;
125
126 fn write_location_start(
128 &mut self,
129 location_key: LocationKey,
130 location_type: LocationType,
131 ) -> Result<(), Self::Err>;
132
133 fn write_node(&mut self, node_id: VizNodeKey) -> Result<(), Self::Err>;
135
136 fn write_location_end(&mut self) -> Result<(), Self::Err>;
138
139 fn write_epilogue(&mut self) -> Result<(), Self::Err>;
141}
142
143pub mod node_type_utils {
145 use super::HydroNodeType;
146
147 const NODE_TYPE_DATA: &[(HydroNodeType, &str)] = &[
149 (HydroNodeType::Source, "Source"),
150 (HydroNodeType::Transform, "Transform"),
151 (HydroNodeType::Join, "Join"),
152 (HydroNodeType::Aggregation, "Aggregation"),
153 (HydroNodeType::Network, "Network"),
154 (HydroNodeType::Sink, "Sink"),
155 (HydroNodeType::Tee, "Tee"),
156 (HydroNodeType::NonDeterministic, "NonDeterministic"),
157 ];
158
159 pub fn to_string(node_type: HydroNodeType) -> &'static str {
161 NODE_TYPE_DATA
162 .iter()
163 .find(|(nt, _)| *nt == node_type)
164 .map(|(_, name)| *name)
165 .unwrap_or("Unknown")
166 }
167
168 pub fn all_types_with_strings() -> Vec<(HydroNodeType, &'static str)> {
170 NODE_TYPE_DATA.to_vec()
171 }
172}
173
174#[derive(Debug, Clone, Copy, PartialEq, Eq)]
176pub enum HydroNodeType {
177 Source,
178 Transform,
179 Join,
180 Aggregation,
181 Network,
182 Sink,
183 Tee,
184 NonDeterministic,
185}
186
187#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
189pub enum HydroEdgeProp {
190 Bounded,
191 Unbounded,
192 TotalOrder,
193 NoOrder,
194 Keyed,
195 Stream,
197 KeyedSingleton,
198 KeyedStream,
199 Singleton,
200 Optional,
201 Network,
202 Cycle,
203}
204
205#[derive(Debug, Clone, PartialEq, Eq)]
208pub struct UnifiedEdgeStyle {
209 pub line_pattern: LinePattern,
211 pub line_width: u8,
213 pub arrowhead: ArrowheadStyle,
215 pub line_style: LineStyle,
217 pub halo: HaloStyle,
219 pub waviness: WavinessStyle,
221 pub animation: AnimationStyle,
223 pub color: &'static str,
225}
226
227#[derive(Debug, Clone, Copy, PartialEq, Eq)]
228pub enum LinePattern {
229 Solid,
230 Dotted,
231 Dashed,
232}
233
234#[derive(Debug, Clone, Copy, PartialEq, Eq)]
235pub enum ArrowheadStyle {
236 TriangleFilled,
237 CircleFilled,
238 DiamondOpen,
239 Default,
240}
241
242#[derive(Debug, Clone, Copy, PartialEq, Eq)]
243pub enum LineStyle {
244 Single,
246 HashMarks,
248}
249
250#[derive(Debug, Clone, Copy, PartialEq, Eq)]
251pub enum HaloStyle {
252 None,
253 LightBlue,
254}
255
256#[derive(Debug, Clone, Copy, PartialEq, Eq)]
257pub enum WavinessStyle {
258 None,
259 Wavy,
260}
261
262#[derive(Debug, Clone, Copy, PartialEq, Eq)]
263pub enum AnimationStyle {
264 Static,
265 Animated,
266}
267
268impl Default for UnifiedEdgeStyle {
269 fn default() -> Self {
270 Self {
271 line_pattern: LinePattern::Solid,
272 line_width: 1,
273 arrowhead: ArrowheadStyle::Default,
274 line_style: LineStyle::Single,
275 halo: HaloStyle::None,
276 waviness: WavinessStyle::None,
277 animation: AnimationStyle::Static,
278 color: "#666666",
279 }
280 }
281}
282
283pub fn get_unified_edge_style(
296 edge_properties: &HashSet<HydroEdgeProp>,
297 src_location: Option<usize>,
298 dst_location: Option<usize>,
299) -> UnifiedEdgeStyle {
300 let mut style = UnifiedEdgeStyle::default();
301
302 let is_network = edge_properties.contains(&HydroEdgeProp::Network)
304 || (src_location.is_some() && dst_location.is_some() && src_location != dst_location);
305
306 if is_network {
307 style.line_pattern = LinePattern::Dashed;
308 style.animation = AnimationStyle::Animated;
309 } else {
310 style.line_pattern = LinePattern::Solid;
311 style.animation = AnimationStyle::Static;
312 }
313
314 if edge_properties.contains(&HydroEdgeProp::Unbounded) {
316 style.halo = HaloStyle::LightBlue;
317 } else {
318 style.halo = HaloStyle::None;
319 }
320
321 if edge_properties.contains(&HydroEdgeProp::Stream) {
323 style.arrowhead = ArrowheadStyle::TriangleFilled;
324 style.color = "#2563eb"; } else if edge_properties.contains(&HydroEdgeProp::KeyedStream) {
326 style.arrowhead = ArrowheadStyle::TriangleFilled;
327 style.color = "#2563eb"; } else if edge_properties.contains(&HydroEdgeProp::KeyedSingleton) {
329 style.arrowhead = ArrowheadStyle::TriangleFilled;
330 style.color = "#000000"; } else if edge_properties.contains(&HydroEdgeProp::Singleton) {
332 style.arrowhead = ArrowheadStyle::CircleFilled;
333 style.color = "#000000"; } else if edge_properties.contains(&HydroEdgeProp::Optional) {
335 style.arrowhead = ArrowheadStyle::DiamondOpen;
336 style.color = "#6b7280"; }
338
339 if edge_properties.contains(&HydroEdgeProp::Keyed) {
341 style.line_style = LineStyle::HashMarks; } else {
343 style.line_style = LineStyle::Single;
344 }
345
346 if edge_properties.contains(&HydroEdgeProp::NoOrder) {
348 style.waviness = WavinessStyle::Wavy;
349 } else if edge_properties.contains(&HydroEdgeProp::TotalOrder) {
350 style.waviness = WavinessStyle::None;
351 }
352
353 style
354}
355
356pub fn extract_edge_properties_from_collection_kind(
360 collection_kind: &crate::compile::ir::CollectionKind,
361) -> HashSet<HydroEdgeProp> {
362 use crate::compile::ir::CollectionKind;
363
364 let mut properties = HashSet::new();
365
366 match collection_kind {
367 CollectionKind::Stream { bound, order, .. } => {
368 properties.insert(HydroEdgeProp::Stream);
369 add_bound_property(&mut properties, bound);
370 add_order_property(&mut properties, order);
371 }
372 CollectionKind::KeyedStream {
373 bound, value_order, ..
374 } => {
375 properties.insert(HydroEdgeProp::KeyedStream);
376 properties.insert(HydroEdgeProp::Keyed);
377 add_bound_property(&mut properties, bound);
378 add_order_property(&mut properties, value_order);
379 }
380 CollectionKind::Singleton { bound, .. } => {
381 properties.insert(HydroEdgeProp::Singleton);
382 add_singleton_bound_property(&mut properties, bound);
383 properties.insert(HydroEdgeProp::TotalOrder);
385 }
386 CollectionKind::Optional { bound, .. } => {
387 properties.insert(HydroEdgeProp::Optional);
388 add_bound_property(&mut properties, bound);
389 properties.insert(HydroEdgeProp::TotalOrder);
391 }
392 CollectionKind::KeyedSingleton { bound, .. } => {
393 properties.insert(HydroEdgeProp::Singleton);
394 properties.insert(HydroEdgeProp::Keyed);
395 add_keyed_singleton_bound_property(&mut properties, bound);
397 properties.insert(HydroEdgeProp::TotalOrder);
398 }
399 }
400
401 properties
402}
403
404fn add_bound_property(
406 properties: &mut HashSet<HydroEdgeProp>,
407 bound: &crate::compile::ir::BoundKind,
408) {
409 use crate::compile::ir::BoundKind;
410
411 match bound {
412 BoundKind::Bounded => {
413 properties.insert(HydroEdgeProp::Bounded);
414 }
415 BoundKind::Unbounded => {
416 properties.insert(HydroEdgeProp::Unbounded);
417 }
418 }
419}
420
421fn add_singleton_bound_property(
423 properties: &mut HashSet<HydroEdgeProp>,
424 bound: &crate::compile::ir::SingletonBoundKind,
425) {
426 use crate::compile::ir::SingletonBoundKind;
427
428 match bound {
429 SingletonBoundKind::Bounded => {
430 properties.insert(HydroEdgeProp::Bounded);
431 }
432 SingletonBoundKind::Monotonic | SingletonBoundKind::Unbounded => {
433 properties.insert(HydroEdgeProp::Unbounded);
434 }
435 }
436}
437
438fn add_keyed_singleton_bound_property(
440 properties: &mut HashSet<HydroEdgeProp>,
441 bound: &crate::compile::ir::KeyedSingletonBoundKind,
442) {
443 use crate::compile::ir::KeyedSingletonBoundKind;
444
445 match bound {
446 KeyedSingletonBoundKind::Bounded => {
447 properties.insert(HydroEdgeProp::Bounded);
448 }
449 KeyedSingletonBoundKind::BoundedValue
450 | KeyedSingletonBoundKind::MonotonicKeys
451 | KeyedSingletonBoundKind::MonotonicValue
452 | KeyedSingletonBoundKind::Unbounded => {
453 properties.insert(HydroEdgeProp::Unbounded);
454 }
455 }
456}
457
458fn add_order_property(
460 properties: &mut HashSet<HydroEdgeProp>,
461 order: &crate::compile::ir::StreamOrder,
462) {
463 use crate::compile::ir::StreamOrder;
464
465 match order {
466 StreamOrder::TotalOrder => {
467 properties.insert(HydroEdgeProp::TotalOrder);
468 }
469 StreamOrder::NoOrder => {
470 properties.insert(HydroEdgeProp::NoOrder);
471 }
472 }
473}
474
475pub fn is_network_edge(src_location: &LocationId, dst_location: &LocationId) -> bool {
478 src_location.root() != dst_location.root()
480}
481
482pub fn add_network_edge_tag(
484 properties: &mut HashSet<HydroEdgeProp>,
485 src_location: &LocationId,
486 dst_location: &LocationId,
487) {
488 if is_network_edge(src_location, dst_location) {
489 properties.insert(HydroEdgeProp::Network);
490 }
491}
492
493#[derive(Debug, Clone, Copy)]
495pub struct HydroWriteConfig<'a> {
496 pub show_metadata: bool,
497 pub show_location_groups: bool,
498 pub use_short_labels: bool,
499 pub location_names: &'a SecondaryMap<LocationKey, String>,
500}
501
502impl Default for HydroWriteConfig<'_> {
503 fn default() -> Self {
504 static EMPTY: OnceLock<SecondaryMap<LocationKey, String>> = OnceLock::new();
505 Self {
506 show_metadata: false,
507 show_location_groups: true,
508 use_short_labels: true, location_names: EMPTY.get_or_init(SecondaryMap::new),
510 }
511 }
512}
513
514#[derive(Clone)]
516pub struct HydroGraphNode {
517 pub label: NodeLabel,
518 pub node_type: HydroNodeType,
519 pub location_key: Option<LocationKey>,
520 pub backtrace: Option<Backtrace>,
521}
522
523slotmap::new_key_type! {
524 pub struct VizNodeKey;
528}
529
530impl Display for VizNodeKey {
531 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
532 write!(f, "viz{:?}", self.data()) }
534}
535
536impl std::str::FromStr for VizNodeKey {
539 type Err = Option<ParseIntError>;
540
541 fn from_str(s: &str) -> Result<Self, Self::Err> {
542 let nvn = s.strip_prefix("viz").ok_or(None)?;
543 let (idx, ver) = nvn.split_once("v").ok_or(None)?;
544 let idx: u64 = idx.parse()?;
545 let ver: u64 = ver.parse()?;
546 Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
547 }
548}
549
550impl VizNodeKey {
551 #[cfg(test)]
553 pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x0000008F00000001)); #[cfg(test)]
557 pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x0000008F00000002)); }
559
560#[derive(Debug, Clone)]
562pub struct HydroGraphEdge {
563 pub src: VizNodeKey,
564 pub dst: VizNodeKey,
565 pub edge_properties: HashSet<HydroEdgeProp>,
566 pub label: Option<String>,
567}
568
569#[derive(Default)]
571pub struct HydroGraphStructure {
572 pub nodes: SlotMap<VizNodeKey, HydroGraphNode>,
573 pub edges: Vec<HydroGraphEdge>,
574 pub locations: SecondaryMap<LocationKey, LocationType>,
575}
576
577impl HydroGraphStructure {
578 pub fn new() -> Self {
579 Self::default()
580 }
581
582 pub fn add_node(
583 &mut self,
584 label: NodeLabel,
585 node_type: HydroNodeType,
586 location_key: Option<LocationKey>,
587 ) -> VizNodeKey {
588 self.add_node_with_backtrace(label, node_type, location_key, None)
589 }
590
591 pub fn add_node_with_backtrace(
592 &mut self,
593 label: NodeLabel,
594 node_type: HydroNodeType,
595 location_key: Option<LocationKey>,
596 backtrace: Option<Backtrace>,
597 ) -> VizNodeKey {
598 self.nodes.insert(HydroGraphNode {
599 label,
600 node_type,
601 location_key,
602 backtrace,
603 })
604 }
605
606 pub fn add_node_with_metadata(
608 &mut self,
609 label: NodeLabel,
610 node_type: HydroNodeType,
611 metadata: &HydroIrMetadata,
612 ) -> VizNodeKey {
613 let location_key = Some(setup_location(self, metadata));
614 let backtrace = Some(metadata.op.backtrace.clone());
615 self.add_node_with_backtrace(label, node_type, location_key, backtrace)
616 }
617
618 pub fn add_edge(
619 &mut self,
620 src: VizNodeKey,
621 dst: VizNodeKey,
622 edge_properties: HashSet<HydroEdgeProp>,
623 label: Option<String>,
624 ) {
625 self.edges.push(HydroGraphEdge {
626 src,
627 dst,
628 edge_properties,
629 label,
630 });
631 }
632
633 pub fn add_edge_single(
635 &mut self,
636 src: VizNodeKey,
637 dst: VizNodeKey,
638 edge_type: HydroEdgeProp,
639 label: Option<String>,
640 ) {
641 let mut properties = HashSet::new();
642 properties.insert(edge_type);
643 self.edges.push(HydroGraphEdge {
644 src,
645 dst,
646 edge_properties: properties,
647 label,
648 });
649 }
650
651 pub fn add_location(&mut self, location_key: LocationKey, location_type: LocationType) {
652 self.locations.insert(location_key, location_type);
653 }
654}
655
656pub fn extract_op_name(full_label: String) -> String {
658 full_label
659 .split('(')
660 .next()
661 .unwrap_or("unknown")
662 .to_lowercase()
663}
664
665pub fn extract_short_label(full_label: &str) -> String {
667 if let Some(op_name) = full_label.split('(').next() {
669 let base_name = op_name.to_lowercase();
670 match base_name.as_str() {
671 "source" => {
673 if full_label.contains("Iter") {
674 "source_iter".to_owned()
675 } else if full_label.contains("Stream") {
676 "source_stream".to_owned()
677 } else if full_label.contains("ExternalNetwork") {
678 "external_network".to_owned()
679 } else if full_label.contains("Spin") {
680 "spin".to_owned()
681 } else {
682 "source".to_owned()
683 }
684 }
685 "network" => {
686 if full_label.contains("deser") {
687 "network(recv)".to_owned()
688 } else if full_label.contains("ser") {
689 "network(send)".to_owned()
690 } else {
691 "network".to_owned()
692 }
693 }
694 _ => base_name,
696 }
697 } else {
698 if full_label.len() > 20 {
700 format!("{}...", &full_label[..17])
701 } else {
702 full_label.to_owned()
703 }
704 }
705}
706
707fn setup_location(structure: &mut HydroGraphStructure, metadata: &HydroIrMetadata) -> LocationKey {
709 let root = metadata.location_id.root();
710 let location_key = root.key();
711 let location_type = root.location_type().unwrap();
712 structure.add_location(location_key, location_type);
713 location_key
714}
715
716fn add_edge_with_metadata(
719 structure: &mut HydroGraphStructure,
720 src_id: VizNodeKey,
721 dst_id: VizNodeKey,
722 src_metadata: Option<&HydroIrMetadata>,
723 dst_metadata: Option<&HydroIrMetadata>,
724 label: Option<String>,
725) {
726 let mut properties = HashSet::new();
727
728 if let Some(metadata) = src_metadata {
730 properties.extend(extract_edge_properties_from_collection_kind(
731 &metadata.collection_kind,
732 ));
733 }
734
735 if let (Some(src_meta), Some(dst_meta)) = (src_metadata, dst_metadata) {
737 add_network_edge_tag(
738 &mut properties,
739 &src_meta.location_id,
740 &dst_meta.location_id,
741 );
742 }
743
744 if properties.is_empty() {
746 properties.insert(HydroEdgeProp::Stream);
747 }
748
749 structure.add_edge(src_id, dst_id, properties, label);
750}
751
752fn write_graph_structure<W>(
754 structure: &HydroGraphStructure,
755 graph_write: W,
756 config: HydroWriteConfig<'_>,
757) -> Result<(), W::Err>
758where
759 W: HydroGraphWrite,
760{
761 let mut graph_write = graph_write;
762 graph_write.write_prologue()?;
764
765 for (node_id, node) in structure.nodes.iter() {
767 let location_type = node
768 .location_key
769 .and_then(|loc_key| structure.locations.get(loc_key))
770 .copied();
771
772 graph_write.write_node_definition(
773 node_id,
774 &node.label,
775 node.node_type,
776 node.location_key,
777 location_type,
778 node.backtrace.as_ref(),
779 )?;
780 }
781
782 if config.show_location_groups {
784 let mut nodes_by_location = SecondaryMap::<LocationKey, Vec<VizNodeKey>>::new();
785 for (node_id, node) in structure.nodes.iter() {
786 if let Some(location_key) = node.location_key {
787 nodes_by_location
788 .entry(location_key)
789 .expect("location was removed")
790 .or_default()
791 .push(node_id);
792 }
793 }
794
795 for (location_key, node_ids) in nodes_by_location.iter() {
796 if let Some(&location_type) = structure.locations.get(location_key) {
797 graph_write.write_location_start(location_key, location_type)?;
798 for &node_id in node_ids.iter() {
799 graph_write.write_node(node_id)?;
800 }
801 graph_write.write_location_end()?;
802 }
803 }
804 }
805
806 for edge in structure.edges.iter() {
808 graph_write.write_edge(
809 edge.src,
810 edge.dst,
811 &edge.edge_properties,
812 edge.label.as_deref(),
813 )?;
814 }
815
816 graph_write.write_epilogue()?;
817 Ok(())
818}
819
820impl HydroRoot {
821 pub fn build_graph_structure(
823 &self,
824 structure: &mut HydroGraphStructure,
825 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
826 config: HydroWriteConfig<'_>,
827 ) -> VizNodeKey {
828 fn build_sink_node(
830 structure: &mut HydroGraphStructure,
831 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
832 config: HydroWriteConfig<'_>,
833 input: &HydroNode,
834 sink_metadata: Option<&HydroIrMetadata>,
835 label: NodeLabel,
836 ) -> VizNodeKey {
837 let input_id = input.build_graph_structure(structure, seen_tees, config);
838
839 let effective_metadata = if let Some(meta) = sink_metadata {
841 Some(meta)
842 } else {
843 match input {
844 HydroNode::Placeholder => None,
845 _ => Some(input.metadata()),
847 }
848 };
849
850 let location_key = effective_metadata.map(|m| setup_location(structure, m));
851 let sink_id = structure.add_node_with_backtrace(
852 label,
853 HydroNodeType::Sink,
854 location_key,
855 effective_metadata.map(|m| m.op.backtrace.clone()),
856 );
857
858 let input_metadata = input.metadata();
860 add_edge_with_metadata(
861 structure,
862 input_id,
863 sink_id,
864 Some(input_metadata),
865 sink_metadata,
866 None,
867 );
868
869 sink_id
870 }
871
872 match self {
873 HydroRoot::ForEach { f, input, .. } => build_sink_node(
875 structure,
876 seen_tees,
877 config,
878 input,
879 None,
880 NodeLabel::with_exprs("for_each".to_owned(), vec![f.expr.clone()]),
881 ),
882
883 HydroRoot::SendExternal {
884 to_external_key,
885 to_port_id,
886 input,
887 ..
888 } => build_sink_node(
889 structure,
890 seen_tees,
891 config,
892 input,
893 None,
894 NodeLabel::with_exprs(
895 format!("send_external({}:{})", to_external_key, to_port_id),
896 vec![],
897 ),
898 ),
899
900 HydroRoot::DestSink { sink, input, .. } => build_sink_node(
901 structure,
902 seen_tees,
903 config,
904 input,
905 None,
906 NodeLabel::with_exprs("dest_sink".to_owned(), vec![sink.clone()]),
907 ),
908
909 HydroRoot::CycleSink {
910 cycle_id, input, ..
911 } => build_sink_node(
912 structure,
913 seen_tees,
914 config,
915 input,
916 None,
917 NodeLabel::static_label(format!("cycle_sink({})", cycle_id)),
918 ),
919
920 HydroRoot::EmbeddedOutput { ident, input, .. } => build_sink_node(
921 structure,
922 seen_tees,
923 config,
924 input,
925 None,
926 NodeLabel::static_label(format!("embedded_output({})", ident)),
927 ),
928
929 HydroRoot::Null { input, .. } => build_sink_node(
930 structure,
931 seen_tees,
932 config,
933 input,
934 None,
935 NodeLabel::static_label("null".to_owned()),
936 ),
937 }
938 }
939}
940
941impl HydroNode {
942 pub fn build_graph_structure(
944 &self,
945 structure: &mut HydroGraphStructure,
946 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
947 config: HydroWriteConfig<'_>,
948 ) -> VizNodeKey {
949 struct TransformParams<'a> {
953 structure: &'a mut HydroGraphStructure,
954 seen_tees: &'a mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
955 config: HydroWriteConfig<'a>,
956 input: &'a HydroNode,
957 metadata: &'a HydroIrMetadata,
958 op_name: String,
959 node_type: HydroNodeType,
960 }
961
962 fn build_simple_transform(params: TransformParams) -> VizNodeKey {
964 let input_id = params.input.build_graph_structure(
965 params.structure,
966 params.seen_tees,
967 params.config,
968 );
969 let node_id = params.structure.add_node_with_metadata(
970 NodeLabel::Static(params.op_name.to_string()),
971 params.node_type,
972 params.metadata,
973 );
974
975 let input_metadata = params.input.metadata();
977 add_edge_with_metadata(
978 params.structure,
979 input_id,
980 node_id,
981 Some(input_metadata),
982 Some(params.metadata),
983 None,
984 );
985
986 node_id
987 }
988
989 fn build_single_expr_transform(params: TransformParams, expr: &DebugExpr) -> VizNodeKey {
991 let input_id = params.input.build_graph_structure(
992 params.structure,
993 params.seen_tees,
994 params.config,
995 );
996 let node_id = params.structure.add_node_with_metadata(
997 NodeLabel::with_exprs(params.op_name.to_string(), vec![expr.clone()]),
998 params.node_type,
999 params.metadata,
1000 );
1001
1002 let input_metadata = params.input.metadata();
1004 add_edge_with_metadata(
1005 params.structure,
1006 input_id,
1007 node_id,
1008 Some(input_metadata),
1009 Some(params.metadata),
1010 None,
1011 );
1012
1013 node_id
1014 }
1015
1016 fn build_dual_expr_transform(
1018 params: TransformParams,
1019 expr1: &DebugExpr,
1020 expr2: &DebugExpr,
1021 ) -> VizNodeKey {
1022 let input_id = params.input.build_graph_structure(
1023 params.structure,
1024 params.seen_tees,
1025 params.config,
1026 );
1027 let node_id = params.structure.add_node_with_metadata(
1028 NodeLabel::with_exprs(
1029 params.op_name.to_string(),
1030 vec![expr1.clone(), expr2.clone()],
1031 ),
1032 params.node_type,
1033 params.metadata,
1034 );
1035
1036 let input_metadata = params.input.metadata();
1038 add_edge_with_metadata(
1039 params.structure,
1040 input_id,
1041 node_id,
1042 Some(input_metadata),
1043 Some(params.metadata),
1044 None,
1045 );
1046
1047 node_id
1048 }
1049
1050 fn build_source_node(
1052 structure: &mut HydroGraphStructure,
1053 metadata: &HydroIrMetadata,
1054 label: String,
1055 ) -> VizNodeKey {
1056 structure.add_node_with_metadata(
1057 NodeLabel::Static(label),
1058 HydroNodeType::Source,
1059 metadata,
1060 )
1061 }
1062
1063 match self {
1064 HydroNode::Placeholder => structure.add_node(
1065 NodeLabel::Static("PLACEHOLDER".to_owned()),
1066 HydroNodeType::Transform,
1067 None,
1068 ),
1069
1070 HydroNode::Source {
1071 source, metadata, ..
1072 } => {
1073 let label = match source {
1074 HydroSource::Stream(expr) => format!("source_stream({})", expr),
1075 HydroSource::ExternalNetwork() => "external_network()".to_owned(),
1076 HydroSource::Iter(expr) => format!("source_iter({})", expr),
1077 HydroSource::Spin() => "spin()".to_owned(),
1078 HydroSource::ClusterMembers(location_id, _) => {
1079 format!(
1080 "source_stream(cluster_membership_stream({:?}))",
1081 location_id
1082 )
1083 }
1084 HydroSource::Embedded(ident) => {
1085 format!("embedded_input({})", ident)
1086 }
1087 HydroSource::EmbeddedSingleton(ident) => {
1088 format!("embedded_singleton_input({})", ident)
1089 }
1090 };
1091 build_source_node(structure, metadata, label)
1092 }
1093
1094 HydroNode::SingletonSource {
1095 value,
1096 first_tick_only,
1097 metadata,
1098 } => {
1099 let label = if *first_tick_only {
1100 format!("singleton_first_tick({})", value)
1101 } else {
1102 format!("singleton({})", value)
1103 };
1104 build_source_node(structure, metadata, label)
1105 }
1106
1107 HydroNode::ExternalInput {
1108 from_external_key,
1109 from_port_id,
1110 metadata,
1111 ..
1112 } => build_source_node(
1113 structure,
1114 metadata,
1115 format!("external_input({}:{})", from_external_key, from_port_id),
1116 ),
1117
1118 HydroNode::CycleSource {
1119 cycle_id, metadata, ..
1120 } => build_source_node(structure, metadata, format!("cycle_source({})", cycle_id)),
1121
1122 HydroNode::Tee { inner, metadata }
1123 | HydroNode::Reference {
1124 inner, metadata, ..
1125 } => {
1126 let ptr = inner.as_ptr();
1127 if let Some(&existing_id) = seen_tees.get(&ptr) {
1128 return existing_id;
1129 }
1130
1131 let input_id = inner
1132 .0
1133 .borrow()
1134 .build_graph_structure(structure, seen_tees, config);
1135 let node_type = if matches!(self, HydroNode::Reference { .. }) {
1136 HydroNodeType::Aggregation
1137 } else {
1138 HydroNodeType::Tee
1139 };
1140 let tee_id = structure.add_node_with_metadata(
1141 NodeLabel::Static(extract_op_name(self.print_root())),
1142 node_type,
1143 metadata,
1144 );
1145
1146 seen_tees.insert(ptr, tee_id);
1147
1148 let inner_borrow = inner.0.borrow();
1150 let input_metadata = inner_borrow.metadata();
1151 add_edge_with_metadata(
1152 structure,
1153 input_id,
1154 tee_id,
1155 Some(input_metadata),
1156 Some(metadata),
1157 None,
1158 );
1159 drop(inner_borrow);
1160
1161 tee_id
1162 }
1163
1164 HydroNode::Partition {
1165 inner, metadata, ..
1166 } => {
1167 let ptr = inner.as_ptr();
1168 if let Some(&existing_id) = seen_tees.get(&ptr) {
1169 return existing_id;
1170 }
1171
1172 let input_id = inner
1173 .0
1174 .borrow()
1175 .build_graph_structure(structure, seen_tees, config);
1176 let partition_id = structure.add_node_with_metadata(
1177 NodeLabel::Static(extract_op_name(self.print_root())),
1178 HydroNodeType::Tee,
1179 metadata,
1180 );
1181
1182 seen_tees.insert(ptr, partition_id);
1183
1184 let inner_borrow = inner.0.borrow();
1186 let input_metadata = inner_borrow.metadata();
1187 add_edge_with_metadata(
1188 structure,
1189 input_id,
1190 partition_id,
1191 Some(input_metadata),
1192 Some(metadata),
1193 None,
1194 );
1195 drop(inner_borrow);
1196
1197 partition_id
1198 }
1199
1200 HydroNode::ObserveNonDet {
1202 inner, metadata, ..
1203 } => build_simple_transform(TransformParams {
1204 structure,
1205 seen_tees,
1206 config,
1207 input: inner,
1208 metadata,
1209 op_name: extract_op_name(self.print_root()),
1210 node_type: HydroNodeType::NonDeterministic,
1211 }),
1212
1213 HydroNode::Cast { inner, metadata }
1215 | HydroNode::AssertIsConsistent {
1216 inner, metadata, ..
1217 }
1218 | HydroNode::DeferTick {
1219 input: inner,
1220 metadata,
1221 }
1222 | HydroNode::Enumerate {
1223 input: inner,
1224 metadata,
1225 ..
1226 }
1227 | HydroNode::Unique {
1228 input: inner,
1229 metadata,
1230 }
1231 | HydroNode::ResolveFutures {
1232 input: inner,
1233 metadata,
1234 }
1235 | HydroNode::ResolveFuturesBlocking {
1236 input: inner,
1237 metadata,
1238 }
1239 | HydroNode::ResolveFuturesOrdered {
1240 input: inner,
1241 metadata,
1242 } => build_simple_transform(TransformParams {
1243 structure,
1244 seen_tees,
1245 config,
1246 input: inner,
1247 metadata,
1248 op_name: extract_op_name(self.print_root()),
1249 node_type: HydroNodeType::Transform,
1250 }),
1251
1252 HydroNode::Sort {
1254 input: inner,
1255 metadata,
1256 } => build_simple_transform(TransformParams {
1257 structure,
1258 seen_tees,
1259 config,
1260 input: inner,
1261 metadata,
1262 op_name: extract_op_name(self.print_root()),
1263 node_type: HydroNodeType::Aggregation,
1264 }),
1265
1266 HydroNode::Map {
1268 f, input, metadata, ..
1269 }
1270 | HydroNode::Filter { f, input, metadata }
1271 | HydroNode::FlatMap { f, input, metadata }
1272 | HydroNode::FlatMapStreamBlocking { f, input, metadata }
1273 | HydroNode::FilterMap { f, input, metadata }
1274 | HydroNode::Inspect { f, input, metadata } => build_single_expr_transform(
1275 TransformParams {
1276 structure,
1277 seen_tees,
1278 config,
1279 input,
1280 metadata,
1281 op_name: extract_op_name(self.print_root()),
1282 node_type: HydroNodeType::Transform,
1283 },
1284 &f.expr,
1285 ),
1286
1287 HydroNode::Reduce { f, input, metadata }
1289 | HydroNode::ReduceKeyed { f, input, metadata } => build_single_expr_transform(
1290 TransformParams {
1291 structure,
1292 seen_tees,
1293 config,
1294 input,
1295 metadata,
1296 op_name: extract_op_name(self.print_root()),
1297 node_type: HydroNodeType::Aggregation,
1298 },
1299 &f.expr,
1300 ),
1301
1302 HydroNode::Join {
1304 left,
1305 right,
1306 metadata,
1307 }
1308 | HydroNode::JoinHalf {
1309 left,
1310 right,
1311 metadata,
1312 }
1313 | HydroNode::CrossProduct {
1314 left,
1315 right,
1316 metadata,
1317 }
1318 | HydroNode::CrossSingleton {
1319 left,
1320 right,
1321 metadata,
1322 } => {
1323 let left_id = left.build_graph_structure(structure, seen_tees, config);
1324 let right_id = right.build_graph_structure(structure, seen_tees, config);
1325 let node_id = structure.add_node_with_metadata(
1326 NodeLabel::Static(extract_op_name(self.print_root())),
1327 HydroNodeType::Join,
1328 metadata,
1329 );
1330
1331 let left_metadata = left.metadata();
1333 add_edge_with_metadata(
1334 structure,
1335 left_id,
1336 node_id,
1337 Some(left_metadata),
1338 Some(metadata),
1339 Some("left".to_owned()),
1340 );
1341
1342 let right_metadata = right.metadata();
1344 add_edge_with_metadata(
1345 structure,
1346 right_id,
1347 node_id,
1348 Some(right_metadata),
1349 Some(metadata),
1350 Some("right".to_owned()),
1351 );
1352
1353 node_id
1354 }
1355
1356 HydroNode::Difference {
1358 pos: left,
1359 neg: right,
1360 metadata,
1361 }
1362 | HydroNode::AntiJoin {
1363 pos: left,
1364 neg: right,
1365 metadata,
1366 } => {
1367 let left_id = left.build_graph_structure(structure, seen_tees, config);
1368 let right_id = right.build_graph_structure(structure, seen_tees, config);
1369 let node_id = structure.add_node_with_metadata(
1370 NodeLabel::Static(extract_op_name(self.print_root())),
1371 HydroNodeType::Join,
1372 metadata,
1373 );
1374
1375 let left_metadata = left.metadata();
1377 add_edge_with_metadata(
1378 structure,
1379 left_id,
1380 node_id,
1381 Some(left_metadata),
1382 Some(metadata),
1383 Some("pos".to_owned()),
1384 );
1385
1386 let right_metadata = right.metadata();
1388 add_edge_with_metadata(
1389 structure,
1390 right_id,
1391 node_id,
1392 Some(right_metadata),
1393 Some(metadata),
1394 Some("neg".to_owned()),
1395 );
1396
1397 node_id
1398 }
1399
1400 HydroNode::Fold {
1402 init,
1403 acc,
1404 input,
1405 metadata,
1406 ..
1407 }
1408 | HydroNode::FoldKeyed {
1409 init,
1410 acc,
1411 input,
1412 metadata,
1413 ..
1414 }
1415 | HydroNode::Scan {
1416 init,
1417 acc,
1418 input,
1419 metadata,
1420 }
1421 | HydroNode::ScanAsyncBlocking {
1422 init,
1423 acc,
1424 input,
1425 metadata,
1426 } => {
1427 let node_type = HydroNodeType::Aggregation; build_dual_expr_transform(
1430 TransformParams {
1431 structure,
1432 seen_tees,
1433 config,
1434 input,
1435 metadata,
1436 op_name: extract_op_name(self.print_root()),
1437 node_type,
1438 },
1439 &init.expr,
1440 &acc.expr,
1441 )
1442 }
1443
1444 HydroNode::ReduceKeyedWatermark {
1446 f,
1447 input,
1448 watermark,
1449 metadata,
1450 } => {
1451 let input_id = input.build_graph_structure(structure, seen_tees, config);
1452 let watermark_id = watermark.build_graph_structure(structure, seen_tees, config);
1453 let location_key = Some(setup_location(structure, metadata));
1454 let join_node_id = structure.add_node_with_backtrace(
1455 NodeLabel::Static(extract_op_name(self.print_root())),
1456 HydroNodeType::Join,
1457 location_key,
1458 Some(metadata.op.backtrace.clone()),
1459 );
1460
1461 let input_metadata = input.metadata();
1463 add_edge_with_metadata(
1464 structure,
1465 input_id,
1466 join_node_id,
1467 Some(input_metadata),
1468 Some(metadata),
1469 Some("input".to_owned()),
1470 );
1471
1472 let watermark_metadata = watermark.metadata();
1474 add_edge_with_metadata(
1475 structure,
1476 watermark_id,
1477 join_node_id,
1478 Some(watermark_metadata),
1479 Some(metadata),
1480 Some("watermark".to_owned()),
1481 );
1482
1483 let node_id = structure.add_node_with_backtrace(
1484 NodeLabel::with_exprs(extract_op_name(self.print_root()), vec![f.expr.clone()]),
1485 HydroNodeType::Aggregation,
1486 location_key,
1487 Some(metadata.op.backtrace.clone()),
1488 );
1489
1490 let join_metadata = metadata; add_edge_with_metadata(
1493 structure,
1494 join_node_id,
1495 node_id,
1496 Some(join_metadata),
1497 Some(metadata),
1498 None,
1499 );
1500
1501 node_id
1502 }
1503
1504 HydroNode::Network {
1505 serialize_fn,
1506 deserialize_fn,
1507 input,
1508 metadata,
1509 ..
1510 } => {
1511 let input_id = input.build_graph_structure(structure, seen_tees, config);
1512 let _from_location_key = setup_location(structure, metadata);
1513
1514 let root = metadata.location_id.root();
1515 let to_location_key = root.key();
1516 let to_location_type = root.location_type().unwrap();
1517 structure.add_location(to_location_key, to_location_type);
1518
1519 let mut label = "network(".to_owned();
1520 if serialize_fn.is_some() {
1521 label.push_str("send");
1522 }
1523 if deserialize_fn.is_some() {
1524 if serialize_fn.is_some() {
1525 label.push_str(" + ");
1526 }
1527 label.push_str("recv");
1528 }
1529 label.push(')');
1530
1531 let network_id = structure.add_node_with_backtrace(
1532 NodeLabel::Static(label),
1533 HydroNodeType::Network,
1534 Some(to_location_key),
1535 Some(metadata.op.backtrace.clone()),
1536 );
1537
1538 let input_metadata = input.metadata();
1540 add_edge_with_metadata(
1541 structure,
1542 input_id,
1543 network_id,
1544 Some(input_metadata),
1545 Some(metadata),
1546 Some(format!("to {:?}({})", to_location_type, to_location_key)),
1547 );
1548
1549 network_id
1550 }
1551
1552 HydroNode::Batch { inner, metadata } => build_simple_transform(TransformParams {
1554 structure,
1555 seen_tees,
1556 config,
1557 input: inner,
1558 metadata,
1559 op_name: extract_op_name(self.print_root()),
1560 node_type: HydroNodeType::NonDeterministic,
1561 }),
1562
1563 HydroNode::YieldConcat { inner, .. } => {
1564 inner.build_graph_structure(structure, seen_tees, config)
1566 }
1567
1568 HydroNode::UnboundSingleton { inner, .. } => {
1569 inner.build_graph_structure(structure, seen_tees, config)
1570 }
1571
1572 HydroNode::BeginAtomic { inner, .. } => {
1573 inner.build_graph_structure(structure, seen_tees, config)
1574 }
1575
1576 HydroNode::EndAtomic { inner, .. } => {
1577 inner.build_graph_structure(structure, seen_tees, config)
1578 }
1579
1580 HydroNode::Chain {
1581 first,
1582 second,
1583 metadata,
1584 }
1585 | HydroNode::MergeOrdered {
1586 first,
1587 second,
1588 metadata,
1589 } => {
1590 let first_id = first.build_graph_structure(structure, seen_tees, config);
1591 let second_id = second.build_graph_structure(structure, seen_tees, config);
1592 let location_key = Some(setup_location(structure, metadata));
1593 let chain_id = structure.add_node_with_backtrace(
1594 NodeLabel::Static(extract_op_name(self.print_root())),
1595 HydroNodeType::Transform,
1596 location_key,
1597 Some(metadata.op.backtrace.clone()),
1598 );
1599
1600 let first_metadata = first.metadata();
1602 add_edge_with_metadata(
1603 structure,
1604 first_id,
1605 chain_id,
1606 Some(first_metadata),
1607 Some(metadata),
1608 Some("first".to_owned()),
1609 );
1610
1611 let second_metadata = second.metadata();
1613 add_edge_with_metadata(
1614 structure,
1615 second_id,
1616 chain_id,
1617 Some(second_metadata),
1618 Some(metadata),
1619 Some("second".to_owned()),
1620 );
1621
1622 chain_id
1623 }
1624
1625 HydroNode::ChainFirst {
1626 first,
1627 second,
1628 metadata,
1629 } => {
1630 let first_id = first.build_graph_structure(structure, seen_tees, config);
1631 let second_id = second.build_graph_structure(structure, seen_tees, config);
1632 let location_key = Some(setup_location(structure, metadata));
1633 let chain_id = structure.add_node_with_backtrace(
1634 NodeLabel::Static(extract_op_name(self.print_root())),
1635 HydroNodeType::Transform,
1636 location_key,
1637 Some(metadata.op.backtrace.clone()),
1638 );
1639
1640 let first_metadata = first.metadata();
1642 add_edge_with_metadata(
1643 structure,
1644 first_id,
1645 chain_id,
1646 Some(first_metadata),
1647 Some(metadata),
1648 Some("first".to_owned()),
1649 );
1650
1651 let second_metadata = second.metadata();
1653 add_edge_with_metadata(
1654 structure,
1655 second_id,
1656 chain_id,
1657 Some(second_metadata),
1658 Some(metadata),
1659 Some("second".to_owned()),
1660 );
1661
1662 chain_id
1663 }
1664
1665 HydroNode::Counter {
1666 tag: _,
1667 prefix: _,
1668 duration,
1669 input,
1670 metadata,
1671 } => build_single_expr_transform(
1672 TransformParams {
1673 structure,
1674 seen_tees,
1675 config,
1676 input,
1677 metadata,
1678 op_name: extract_op_name(self.print_root()),
1679 node_type: HydroNodeType::Transform,
1680 },
1681 duration,
1682 ),
1683 }
1684 }
1685}
1686
1687macro_rules! render_hydro_ir {
1690 ($name:ident, $write_fn:ident) => {
1691 pub fn $name(roots: &[HydroRoot], config: HydroWriteConfig<'_>) -> String {
1692 let mut output = String::new();
1693 $write_fn(&mut output, roots, config).unwrap();
1694 output
1695 }
1696 };
1697}
1698
1699macro_rules! write_hydro_ir {
1701 ($name:ident, $writer_type:ty, $constructor:expr) => {
1702 pub fn $name(
1703 output: impl std::fmt::Write,
1704 roots: &[HydroRoot],
1705 config: HydroWriteConfig<'_>,
1706 ) -> std::fmt::Result {
1707 let mut graph_write: $writer_type = $constructor(output, config);
1708 write_hydro_ir_graph(&mut graph_write, roots, config)
1709 }
1710 };
1711}
1712
1713render_hydro_ir!(render_hydro_ir_mermaid, write_hydro_ir_mermaid);
1714write_hydro_ir!(
1715 write_hydro_ir_mermaid,
1716 HydroMermaid<_>,
1717 HydroMermaid::new_with_config
1718);
1719
1720render_hydro_ir!(render_hydro_ir_dot, write_hydro_ir_dot);
1721write_hydro_ir!(write_hydro_ir_dot, HydroDot<_>, HydroDot::new_with_config);
1722
1723render_hydro_ir!(render_hydro_ir_hydroscope, write_hydro_ir_json);
1725
1726render_hydro_ir!(render_hydro_ir_json, write_hydro_ir_json);
1728write_hydro_ir!(write_hydro_ir_json, HydroJson<_>, HydroJson::new);
1729
1730fn write_hydro_ir_graph<W>(
1731 graph_write: W,
1732 roots: &[HydroRoot],
1733 config: HydroWriteConfig<'_>,
1734) -> Result<(), W::Err>
1735where
1736 W: HydroGraphWrite,
1737{
1738 let mut structure = HydroGraphStructure::new();
1739 let mut seen_tees = HashMap::new();
1740
1741 for leaf in roots {
1743 leaf.build_graph_structure(&mut structure, &mut seen_tees, config);
1744 }
1745
1746 write_graph_structure(&structure, graph_write, config)
1747}