Skip to main content

hydro_lang/live_collections/
keyed_singleton.rs

1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use sealed::sealed;
11use stageleft::{IntoQuotedMut, QuotedWithContext, q};
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_stream::KeyedStream;
15use super::optional::Optional;
16use super::singleton::Singleton;
17use super::sliced::sliced;
18use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
19use crate::compile::builder::{CycleId, FlowState};
20use crate::compile::ir::{
21    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, SharedNode,
22};
23#[cfg(stageleft_runtime)]
24use crate::forward_handle::{CycleCollection, ReceiverComplete};
25use crate::forward_handle::{ForwardRef, TickCycle};
26use crate::live_collections::stream::{Ordering, Retries};
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::DeferTick;
30use crate::location::{Atomic, Location, Tick, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::properties::manual_proof;
34
35/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
36///
37/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
38/// removed / changed), this also includes an additional variant [`BoundedValue`], which
39/// indicates that entries may be added over time, but once an entry is added it will never be
40/// removed and its value will never change.
///
/// Two further variants sit between those extremes: [`MonotonicValue`] (keys are never removed
/// and each value only increases monotonically) and [`MonotonicKeys`] (keys are never removed,
/// but values may change arbitrarily).
41pub trait KeyedSingletonBound {
42    /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
43    type UnderlyingBound: Boundedness;
44    /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
45    type ValueBound: Boundedness;
46
47    /// The type of the keyed singleton if the value for each key is immutable.
48    type WithBoundedValue: KeyedSingletonBound<
49            UnderlyingBound = Self::UnderlyingBound,
50            ValueBound = Bounded,
51            EraseMonotonic = Self::WithBoundedValue,
52        >;
53
54    /// The bound of the [`KeyedSingleton`] produced from a [`KeyedStream`] with [`Self`] boundedness when the result may be treated as monotone; see [`Self::KeyedStreamToNonMonotone`] for the case without a monotonicity proof.
55    type KeyedStreamToMonotone: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;
56
57    /// The [`Boundedness`] of the keyed singleton produced by folding a [`KeyedStream`] with
58    /// [`Self`] boundedness when the aggregation does *not* have a monotonicity proof.
59    ///
60    /// Without a monotonicity proof, the per-key values may change arbitrarily, so an unbounded
61    /// input collapses to [`MonotonicKeys`] (keys are still only added, never removed).
62    type KeyedStreamToNonMonotone: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;
63
64    /// The type of the keyed singleton if the value for each key is no longer monotonic.
65    type EraseMonotonic: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;
66
67    /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
68    fn bound_kind() -> KeyedSingletonBoundKind;
69}
70
/// Bound mapping for [`Unbounded`] keyed singletons, whose keys and values may both change.
71impl KeyedSingletonBound for Unbounded {
72    type UnderlyingBound = Unbounded;
73    type ValueBound = Unbounded;
    // Immutable values while keys may still arrive is what `BoundedValue` describes.
74    type WithBoundedValue = BoundedValue;
75    type KeyedStreamToMonotone = MonotonicValue;
    // Without a monotonicity proof values may change arbitrarily, but keys are only added.
76    type KeyedStreamToNonMonotone = MonotonicKeys;
    // There is no monotonicity guarantee to erase, so the bound is unchanged.
77    type EraseMonotonic = Unbounded;
78
79    fn bound_kind() -> KeyedSingletonBoundKind {
80        KeyedSingletonBoundKind::Unbounded
81    }
82}
83
/// Bound mapping for [`Bounded`] keyed singletons: every derived bound is again [`Bounded`].
84impl KeyedSingletonBound for Bounded {
85    type UnderlyingBound = Bounded;
86    type ValueBound = Bounded;
87    type WithBoundedValue = Bounded;
88    type KeyedStreamToMonotone = Bounded;
89    type KeyedStreamToNonMonotone = Bounded;
90    type EraseMonotonic = Bounded;
91
92    fn bound_kind() -> KeyedSingletonBoundKind {
93        KeyedSingletonBoundKind::Bounded
94    }
95}
96
97/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
98/// its value is bounded and will never change, but new entries may appear asynchronously.
99pub struct BoundedValue;
100
/// Bound mapping for [`BoundedValue`]: the underlying stream is unbounded while each value is
/// bounded, and every derived bound stays [`BoundedValue`].
101impl KeyedSingletonBound for BoundedValue {
102    type UnderlyingBound = Unbounded;
103    type ValueBound = Bounded;
104    type WithBoundedValue = BoundedValue;
105    type KeyedStreamToMonotone = BoundedValue;
106    type KeyedStreamToNonMonotone = BoundedValue;
107    type EraseMonotonic = BoundedValue;
108
109    fn bound_kind() -> KeyedSingletonBoundKind {
110        KeyedSingletonBoundKind::BoundedValue
111    }
112}
113
114/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
115/// it will never be removed, and the corresponding value will only increase monotonically.
116pub struct MonotonicValue;
117
/// Bound mapping for [`MonotonicValue`]: both the underlying stream and the values are unbounded.
118impl KeyedSingletonBound for MonotonicValue {
119    type UnderlyingBound = Unbounded;
120    type ValueBound = Unbounded;
121    type WithBoundedValue = BoundedValue;
122    type KeyedStreamToMonotone = MonotonicValue;
123    type KeyedStreamToNonMonotone = MonotonicKeys;
    // Dropping the value guarantee keeps only "keys are never removed".
124    type EraseMonotonic = MonotonicKeys;
125
126    fn bound_kind() -> KeyedSingletonBoundKind {
127        KeyedSingletonBoundKind::MonotonicValue
128    }
129}
130
131/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key
132/// appears, it will never be removed, but the corresponding value may change arbitrarily.
133pub struct MonotonicKeys;
134
/// Bound mapping for [`MonotonicKeys`]: both the underlying stream and the values are unbounded,
/// and the monotone / non-monotone / erased variants all stay [`MonotonicKeys`].
135impl KeyedSingletonBound for MonotonicKeys {
136    type UnderlyingBound = Unbounded;
137    type ValueBound = Unbounded;
138    type WithBoundedValue = BoundedValue;
139    type KeyedStreamToMonotone = MonotonicKeys;
140    type KeyedStreamToNonMonotone = MonotonicKeys;
141    type EraseMonotonic = MonotonicKeys;
142
143    fn bound_kind() -> KeyedSingletonBoundKind {
144        KeyedSingletonBoundKind::MonotonicKeys
145    }
146}
147
// NOTE(review): the `message` below names only `MonotonicValue` and `Bounded`, although
// `BoundedValue` also implements this trait further down — confirm whether the text should
// mention it.
148#[sealed]
149#[diagnostic::on_unimplemented(
150    message = "The keyed singleton must have monotonic values (`MonotonicValue`) or be bounded (`Bounded`), but has bound `{Self}`. Strengthen the monotonicity upstream or consider a different API.",
151    label = "required here",
152    note = "To intentionally process a non-deterministic snapshot or batch, you may want to use a `sliced!` region. This introduces non-determinism so avoid unless necessary."
153)]
154/// Marker trait that is implemented for [`KeyedSingletonBound`] types whose per-key values
155/// are monotonically non-decreasing (or bounded).
156pub trait IsKeyedMonotonic: KeyedSingletonBound {}
157
// Values only increase monotonically.
158#[sealed]
159#[diagnostic::do_not_recommend]
160impl IsKeyedMonotonic for MonotonicValue {}
161
// Values never change once a key has appeared.
162#[sealed]
163#[diagnostic::do_not_recommend]
164impl IsKeyedMonotonic for BoundedValue {}
165
// Blanket impl for every bound that is `IsBounded`.
166#[sealed]
167#[diagnostic::do_not_recommend]
168impl<B: IsBounded + KeyedSingletonBound> IsKeyedMonotonic for B {}
169
170/// Mapping from keys of type `K` to values of type `V`.
171///
172/// Keyed Singletons capture an asynchronously updated mapping from keys of type `K` to values of
173/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
174/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
175/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
176/// keys cannot be removed and the value for each key is immutable.
177///
178/// Type Parameters:
179/// - `K`: the type of the key for each entry
180/// - `V`: the type of the value for each entry
181/// - `Loc`: the [`Location`] where the keyed singleton is materialized
182/// - `Bound`: tracks whether the entries are:
183///     - [`Bounded`] (local and finite)
184///     - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
185///     - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
///     - [`MonotonicValue`] (asynchronous with no removals and monotonically increasing values)
///     - [`MonotonicKeys`] (asynchronous with no removals; values may change arbitrarily)
186pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
    // Where the collection is materialized; exposed through `location()`.
187    pub(crate) location: Loc,
    // IR node producing this collection. Operators take it out and leave a
    // `HydroNode::Placeholder` behind.
188    pub(crate) ir_node: RefCell<HydroNode>,
    // Flow state that `Drop` pushes a `HydroRoot::Null` root into for a node that was never
    // taken.
189    pub(crate) flow_state: FlowState,
190
    // Carries the type parameters that are not stored as data.
191    _phantom: PhantomData<(K, V, Loc, Bound)>,
192}
193
194impl<K, V, L, B: KeyedSingletonBound> Drop for KeyedSingleton<K, V, L, B> {
195    fn drop(&mut self) {
196        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
197        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
198            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
199                input: Box::new(ir_node),
200                op_metadata: HydroIrOpMetadata::new(),
201            });
202        }
203    }
204}
205
206impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
207    for KeyedSingleton<K, V, Loc, Bound>
208{
209    fn clone(&self) -> Self {
210        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
211            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
212            *self.ir_node.borrow_mut() = HydroNode::Tee {
213                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
214                metadata: self.location.new_node_metadata(Self::collection_kind()),
215            };
216        }
217
218        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
219            KeyedSingleton {
220                location: self.location.clone(),
221                flow_state: self.flow_state.clone(),
222                ir_node: HydroNode::Tee {
223                    inner: SharedNode(inner.0.clone()),
224                    metadata: metadata.clone(),
225                }
226                .into(),
227                _phantom: PhantomData,
228            }
229        } else {
230            unreachable!()
231        }
232    }
233}
234
235impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
236    for KeyedSingleton<K, V, L, B>
237where
238    L: Location<'a>,
239{
240    type Location = L;
241
242    fn create_source(cycle_id: CycleId, location: L) -> Self {
243        KeyedSingleton {
244            flow_state: location.flow_state().clone(),
245            location: location.clone(),
246            ir_node: RefCell::new(HydroNode::CycleSource {
247                cycle_id,
248                metadata: location.new_node_metadata(Self::collection_kind()),
249            }),
250            _phantom: PhantomData,
251        }
252    }
253}
254
255impl<'a, K, V, L> CycleCollection<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
256where
257    L: Location<'a>,
258{
259    type Location = Tick<L>;
260
261    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
262        KeyedSingleton::new(
263            location.clone(),
264            HydroNode::CycleSource {
265                cycle_id,
266                metadata: location.new_node_metadata(Self::collection_kind()),
267            },
268        )
269    }
270}
271
/// Implements [`DeferTick`] for bounded keyed singletons inside a tick by delegating to
/// `KeyedSingleton::defer_tick`.
272impl<'a, K, V, L> DeferTick for KeyedSingleton<K, V, Tick<L>, Bounded>
273where
274    L: Location<'a>,
275{
276    fn defer_tick(self) -> Self {
        // NOTE(review): presumably this path resolves to an inherent `defer_tick` defined
        // outside this excerpt; if no inherent method existed, the call would resolve back to
        // this trait method and recurse — confirm.
277        KeyedSingleton::defer_tick(self)
278    }
279}
280
281impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
282    for KeyedSingleton<K, V, L, B>
283where
284    L: Location<'a>,
285{
286    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
287        assert_eq!(
288            Location::id(&self.location),
289            expected_location,
290            "locations do not match"
291        );
292        self.location
293            .flow_state()
294            .borrow_mut()
295            .push_root(HydroRoot::CycleSink {
296                cycle_id,
297                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
298                op_metadata: HydroIrOpMetadata::new(),
299            });
300    }
301}
302
303impl<'a, K, V, L> ReceiverComplete<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
304where
305    L: Location<'a>,
306{
307    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
308        assert_eq!(
309            Location::id(&self.location),
310            expected_location,
311            "locations do not match"
312        );
313        self.location
314            .flow_state()
315            .borrow_mut()
316            .push_root(HydroRoot::CycleSink {
317                cycle_id,
318                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
319                op_metadata: HydroIrOpMetadata::new(),
320            });
321    }
322}
323
324impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
325    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
326        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
327        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
328
329        let flow_state = location.flow_state().clone();
330        KeyedSingleton {
331            location,
332            flow_state,
333            ir_node: RefCell::new(ir_node),
334            _phantom: PhantomData,
335        }
336    }
337
338    /// Returns the [`Location`] where this keyed singleton is being materialized.
339    pub fn location(&self) -> &L {
340        &self.location
341    }
342
343    /// Weakens the consistency of this live collection to not guarantee any consistency across
344    /// cluster members (if this collection is on a cluster).
345    pub fn weaken_consistency(self) -> KeyedSingleton<K, V, L::DropConsistency, B>
346    where
347        L: Location<'a>,
348    {
349        if L::consistency()
350            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
351        {
352            // already no consistency
353            KeyedSingleton::new(
354                self.location.drop_consistency(),
355                self.ir_node.replace(HydroNode::Placeholder),
356            )
357        } else {
358            KeyedSingleton::new(
359                self.location.drop_consistency(),
360                HydroNode::Cast {
361                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
362                    metadata: self
363                        .location
364                        .drop_consistency()
365                        .new_node_metadata(
366                            KeyedSingleton::<K, V, L::DropConsistency, B>::collection_kind(),
367                        ),
368                },
369            )
370        }
371    }
372
373    /// Casts this live collection to have the consistency guarantees specified in the given
374    /// location type parameter. The developer must ensure that the strengthened consistency
375    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
376    pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
377        self,
378        _proof: impl crate::properties::ConsistencyProof,
379    ) -> KeyedSingleton<K, V, L2, B>
380    where
381        L: Location<'a>,
382    {
383        if L::consistency() == L2::consistency() {
384            // already consistent
385            KeyedSingleton::new(
386                self.location.with_consistency_of(),
387                self.ir_node.replace(HydroNode::Placeholder),
388            )
389        } else {
390            KeyedSingleton::new(
391                self.location.with_consistency_of(),
392                HydroNode::AssertIsConsistent {
393                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
394                    trusted: false,
395                    metadata: self
396                        .location
397                        .clone()
398                        .with_consistency_of::<L2>()
399                        .new_node_metadata(KeyedSingleton::<K, V, L2, B>::collection_kind()),
400                },
401            )
402        }
403    }
404}
405
/// Counts the entries of a bounded keyed singleton, yielding a bounded [`Singleton`].
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded> {
    // There is one entry per key, so the number of entries is the number of keys.
    let entries = me.entries();
    entries.count()
}
412
/// Collects the entries of a bounded keyed singleton into a single `HashMap`, yielding a bounded
/// [`Singleton`].
///
/// The entries are folded into the map after their ordering is asserted via
/// `assume_ordering_trusted`; the justification is given in the `nondet!` block.
413#[cfg(stageleft_runtime)]
414fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
415    me: KeyedSingleton<K, V, L, Bounded>,
416) -> Singleton<HashMap<K, V>, L, Bounded>
417where
418    K: Eq + Hash,
419{
420    me.entries()
421        .assume_ordering_trusted(nondet!(
422            /// There is only one element associated with each key. The closure technically
423            /// isn't commutative in the case where both passed entries have the same key
424            /// but different values.
425            ///
426            /// In the future, we may want to have an `assume!(...)` statement in the UDF that
427            /// the key is never already present in the map.
428        ))
429        .fold(
430            q!(|| HashMap::new()),
431            q!(|map, (k, v)| {
432                map.insert(k, v);
433            }),
434        )
435}
436
437impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
438    pub(crate) fn collection_kind() -> CollectionKind {
439        CollectionKind::KeyedSingleton {
440            bound: B::bound_kind(),
441            key_type: stageleft::quote_type::<K>().into(),
442            value_type: stageleft::quote_type::<V>().into(),
443        }
444    }
445
446    /// Transforms each value by invoking `f` on each element, with keys staying the same
447    /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
448    ///
449    /// If you do not want to modify the stream and instead only want to view
450    /// each item use [`KeyedSingleton::inspect`] instead.
451    ///
452    /// # Example
453    /// ```rust
454    /// # #[cfg(feature = "deploy")] {
455    /// # use hydro_lang::prelude::*;
456    /// # use futures::StreamExt;
457    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
458    /// let keyed_singleton = // { 1: 2, 2: 4 }
459    /// # process
460    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
461    /// #     .into_keyed()
462    /// #     .first();
463    /// keyed_singleton.map(q!(|v| v + 1))
464    /// #   .entries()
465    /// # }, |mut stream| async move {
466    /// // { 1: 3, 2: 5 }
467    /// # let mut results = Vec::new();
468    /// # for _ in 0..2 {
469    /// #     results.push(stream.next().await.unwrap());
470    /// # }
471    /// # results.sort();
472    /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
473    /// # }));
474    /// # }
475    /// ```
    ///
    /// The output uses the `B::EraseMonotonic` bound, i.e. the values are no longer treated as
    /// monotonic after `f` has been applied.
476    pub fn map<U, F>(
477        self,
478        f: impl IntoQuotedMut<'a, F, L> + Copy,
479    ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
480    where
481        F: Fn(V) -> U + 'a,
482    {
        // Re-bind `f` as a `ManualExpr` that splices the user closure for a given location; the
        // `q!` block below captures it under the name `f`.
483        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // Lift the value-only closure to `(key, value)` pairs, passing the key through untouched.
484        let map_f = q!({
485            let orig = f;
486            move |(k, v)| (k, orig(v))
487        })
488        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
489        .into();
490
        // Take this collection's node (leaving a placeholder) and feed it into a `Map` node.
491        KeyedSingleton::new(
492            self.location.clone(),
493            HydroNode::Map {
494                f: map_f,
495                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
496                metadata: self.location.new_node_metadata(KeyedSingleton::<
497                    K,
498                    U,
499                    L,
500                    B::EraseMonotonic,
501                >::collection_kind()),
502            },
503        )
504    }
505
506    /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
507    /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
508    ///
509    /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
510    /// the new value `U`. The key remains unchanged in the output.
511    ///
512    /// # Example
513    /// ```rust
514    /// # #[cfg(feature = "deploy")] {
515    /// # use hydro_lang::prelude::*;
516    /// # use futures::StreamExt;
517    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
518    /// let keyed_singleton = // { 1: 2, 2: 4 }
519    /// # process
520    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
521    /// #     .into_keyed()
522    /// #     .first();
523    /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
524    /// #   .entries()
525    /// # }, |mut stream| async move {
526    /// // { 1: 3, 2: 6 }
527    /// # let mut results = Vec::new();
528    /// # for _ in 0..2 {
529    /// #     results.push(stream.next().await.unwrap());
530    /// # }
531    /// # results.sort();
532    /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
533    /// # }));
534    /// # }
535    /// ```
    ///
    /// The key is cloned for every entry (hence `K: Clone`), and the output uses the
    /// `B::EraseMonotonic` bound.
536    pub fn map_with_key<U, F>(
537        self,
538        f: impl IntoQuotedMut<'a, F, L> + Copy,
539    ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
540    where
541        F: Fn((K, V)) -> U + 'a,
542        K: Clone,
543    {
        // Re-bind `f` as a `ManualExpr` that splices the user closure for a given location; the
        // `q!` block below captures it under the name `f`.
544        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
545        let map_f = q!({
546            let orig = f;
547            move |(k, v)| {
                // `orig` takes the key by value, but the key must also be returned alongside
                // the new value, so hand `orig` a clone.
548                let out = orig((Clone::clone(&k), v));
549                (k, out)
550            }
551        })
552        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
553        .into();
554
        // Take this collection's node (leaving a placeholder) and feed it into a `Map` node.
555        KeyedSingleton::new(
556            self.location.clone(),
557            HydroNode::Map {
558                f: map_f,
559                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
560                metadata: self.location.new_node_metadata(KeyedSingleton::<
561                    K,
562                    U,
563                    L,
564                    B::EraseMonotonic,
565                >::collection_kind()),
566            },
567        )
568    }
569
570    /// Gets the number of keys in the keyed singleton.
571    ///
572    /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
573    /// since keys may be added / removed over time. When the set of keys changes, the count will
574    /// be asynchronously updated.
575    ///
576    /// # Example
577    /// ```rust
578    /// # #[cfg(feature = "deploy")] {
579    /// # use hydro_lang::prelude::*;
580    /// # use futures::StreamExt;
581    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
582    /// # let tick = process.tick();
583    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
584    /// # process
585    /// #     .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
586    /// #     .into_keyed()
587    /// #     .batch(&tick, nondet!(/** test */))
588    /// #     .first();
589    /// keyed_singleton.key_count()
590    /// # .all_ticks()
591    /// # }, |mut stream| async move {
592    /// // 3
593    /// # assert_eq!(stream.next().await.unwrap(), 3);
594    /// # }));
595    /// # }
596    /// ```
    ///
    /// # Panics
    /// Panics if the per-key values are not bounded and the collection is not at a top-level
    /// location for which `try_tick` yields a tick.
597    pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
598        if B::ValueBound::BOUNDED {
            // Values never change once present: re-wrap the node under `B::WithBoundedValue`
            // and count its entries directly.
599            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
600                location: self.location.clone(),
601                flow_state: self.flow_state.clone(),
602                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
603                _phantom: PhantomData,
604            };
605
606            me.entries().count().ignore_monotonic()
607        } else if L::is_top_level()
608            && let Some(tick) = self.location.try_tick()
609            && (B::bound_kind() == KeyedSingletonBoundKind::Unbounded
610                || B::bound_kind() == KeyedSingletonBoundKind::MonotonicKeys
611                || B::bound_kind() == KeyedSingletonBoundKind::MonotonicValue)
612        {
            // Values may change: snapshot the collection inside a tick, count there, and expose
            // the latest count at the original location.
613            let location = self.location.clone();
614            let ir_node = self.ir_node.replace(HydroNode::Placeholder);
            // NOTE(review): `KeyedSingleton::new` debug-asserts that the node's recorded
            // collection kind equals the `MonotonicKeys` kind; confirm this holds when `B` is
            // `Unbounded` or `MonotonicValue`.
615            let me: KeyedSingleton<K, V, L, MonotonicKeys> =
616                KeyedSingleton::new(location.clone(), ir_node);
617
618            let out =
619                key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
620                    .latest();
            // Re-wrap the resulting node so the singleton carries `B::UnderlyingBound`.
621            Singleton::new(location, out.ir_node.replace(HydroNode::Placeholder))
622        } else {
623            panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
624        }
625    }
626
627    /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
628    ///
629    /// As the values for each key are updated asynchronously, the `HashMap` will be updated
630    /// asynchronously as well.
631    ///
632    /// # Example
633    /// ```rust
634    /// # #[cfg(feature = "deploy")] {
635    /// # use hydro_lang::prelude::*;
636    /// # use futures::StreamExt;
637    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
638    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
639    /// # process
640    /// #     .source_iter(q!(vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())]))
641    /// #     .into_keyed()
642    /// #     .batch(&process.tick(), nondet!(/** test */))
643    /// #     .first();
644    /// keyed_singleton.into_singleton()
645    /// # .all_ticks()
646    /// # }, |mut stream| async move {
647    /// // { 1: "a", 2: "b", 3: "c" }
648    /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())].into_iter().collect());
649    /// # }));
650    /// # }
651    /// ```
    ///
    /// # Panics
    /// Panics if the per-key values are not bounded and the collection is not at a top-level
    /// location for which `try_tick` yields a tick.
652    pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
653    where
654        K: Eq + Hash,
655    {
656        if B::ValueBound::BOUNDED {
            // Values never change once present: re-wrap the node under `B::WithBoundedValue`
            // and fold its entries into a map directly.
657            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
658                location: self.location.clone(),
659                flow_state: self.flow_state.clone(),
660                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
661                _phantom: PhantomData,
662            };
663
664            me.entries()
665                .assume_ordering_trusted(nondet!(
666                    /// There is only one element associated with each key. The closure technically
667                    /// isn't commutative in the case where both passed entries have the same key
668                    /// but different values.
669                    ///
670                    /// In the future, we may want to have an `assume!(...)` statement in the UDF that
671                    /// the key is never already present in the map.
672                ))
673                .fold(
674                    q!(|| HashMap::new()),
675                    q!(|map, (k, v)| {
676                        // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
677                        map.insert(k, v);
678                    }),
679                )
680        } else if L::is_top_level()
681            && let Some(tick) = self.location.try_tick()
682            && (B::bound_kind() == KeyedSingletonBoundKind::Unbounded
683                || B::bound_kind() == KeyedSingletonBoundKind::MonotonicKeys
684                || B::bound_kind() == KeyedSingletonBoundKind::MonotonicValue)
685        {
            // Values may change: snapshot the collection inside a tick, build the map there, and
            // expose the latest map at the original location.
686            let location = self.location.clone();
687            let ir_node = self.ir_node.replace(HydroNode::Placeholder);
            // NOTE(review): `KeyedSingleton::new` debug-asserts that the node's recorded
            // collection kind equals the `MonotonicKeys` kind; confirm this holds when `B` is
            // `Unbounded` or `MonotonicValue`.
688            let me: KeyedSingleton<K, V, L, MonotonicKeys> =
689                KeyedSingleton::new(location.clone(), ir_node);
690
691            let out = into_singleton_inside_tick(
692                me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
693            )
694            .latest();
            // Re-wrap the resulting node so the singleton carries `B::UnderlyingBound`.
695            Singleton::new(location, out.ir_node.replace(HydroNode::Placeholder))
696        } else {
697            panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
698        }
699    }
700
701    /// An operator which allows you to "name" a `HydroNode`.
702    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
703    pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
704        {
705            let mut node = self.ir_node.borrow_mut();
706            let metadata = node.metadata_mut();
707            metadata.tag = Some(name.to_owned());
708        }
709        self
710    }
711
712    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
713    /// implies that `B == Bounded`.
714    pub fn make_bounded(self) -> KeyedSingleton<K, V, L, Bounded>
715    where
716        B: IsBounded,
717    {
718        KeyedSingleton::new(
719            self.location.clone(),
720            self.ir_node.replace(HydroNode::Placeholder),
721        )
722    }
723
724    /// Gets the value associated with a specific key from the keyed singleton.
725    /// Returns `None` if the key is `None` or there is no associated value.
726    ///
727    /// # Example
728    /// ```rust
729    /// # #[cfg(feature = "deploy")] {
730    /// # use hydro_lang::prelude::*;
731    /// # use futures::StreamExt;
732    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
733    /// let tick = process.tick();
734    /// let keyed_data = process
735    ///     .source_iter(q!(vec![(1, 2), (2, 3)]))
736    ///     .into_keyed()
737    ///     .batch(&tick, nondet!(/** test */))
738    ///     .first();
739    /// let key = tick.singleton(q!(1));
740    /// keyed_data.get(key).all_ticks()
741    /// # }, |mut stream| async move {
742    /// // 2
743    /// # assert_eq!(stream.next().await.unwrap(), 2);
744    /// # }));
745    /// # }
746    /// ```
747    pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Optional<V, L, Bounded>
748    where
749        B: IsBounded,
750        K: Hash + Eq + Clone,
751        V: Clone,
752    {
753        self.make_bounded()
754            .into_keyed_stream()
755            .get(key)
756            .cast_at_most_one_element()
757    }
758
759    /// Emit a keyed stream containing keys shared between the keyed singleton and the
760    /// keyed stream, where each value in the output keyed stream is a tuple of
761    /// (the keyed singleton's value, the keyed stream's value).
762    ///
763    /// # Example
764    /// ```rust
765    /// # #[cfg(feature = "deploy")] {
766    /// # use hydro_lang::prelude::*;
767    /// # use futures::StreamExt;
768    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
769    /// let tick = process.tick();
770    /// let keyed_data = process
771    ///     .source_iter(q!(vec![(1, 10), (2, 20)]))
772    ///     .into_keyed()
773    ///     .batch(&tick, nondet!(/** test */))
774    ///     .first();
775    /// let other_data = process
776    ///     .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
777    ///     .into_keyed()
778    ///     .batch(&tick, nondet!(/** test */));
779    /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
780    /// # }, |mut stream| async move {
781    /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
782    /// # let mut results = vec![];
783    /// # for _ in 0..3 {
784    /// #     results.push(stream.next().await.unwrap());
785    /// # }
786    /// # results.sort();
787    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
788    /// # }));
789    /// # }
790    /// ```
791    pub fn join_keyed_stream<O2: Ordering, R2: Retries, V2, B2: Boundedness>(
792        self,
793        other: KeyedStream<K, V2, L, B2, O2, R2>,
794    ) -> KeyedStream<K, (V, V2), L, B2, O2, R2>
795    where
796        B: IsBounded,
797        K: Eq + Hash + Clone,
798        V: Clone,
799        V2: Clone,
800    {
801        // TODO(shadaj): if DFIR guarantees that joining unbounded keyed stream x bounded keyed stream
802        // always produces deterministic order per key (nested loop join), this could just use
803        // `join_keyed_stream` without constructing IRs manually
804        KeyedStream::new(
805            self.location.clone(),
806            HydroNode::Join {
807                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
808                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
809                metadata: self
810                    .location
811                    .new_node_metadata(KeyedStream::<K, (V, V2), L, B2, O2, R2>::collection_kind()),
812            },
813        )
814    }
815
816    /// Emit a keyed singleton containing all keys shared between two keyed singletons,
817    /// where each value in the output keyed singleton is a tuple of
818    /// (self.value, other.value).
819    ///
820    /// # Example
821    /// ```rust
822    /// # #[cfg(feature = "deploy")] {
823    /// # use hydro_lang::prelude::*;
824    /// # use futures::StreamExt;
825    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
826    /// # let tick = process.tick();
827    /// let requests = // { 1: 10, 2: 20, 3: 30 }
828    /// # process
829    /// #     .source_iter(q!(vec![(1, 10), (2, 20), (3, 30)]))
830    /// #     .into_keyed()
831    /// #     .batch(&tick, nondet!(/** test */))
832    /// #     .first();
833    /// let other = // { 1: 100, 2: 200, 4: 400 }
834    /// # process
835    /// #     .source_iter(q!(vec![(1, 100), (2, 200), (4, 400)]))
836    /// #     .into_keyed()
837    /// #     .batch(&tick, nondet!(/** test */))
838    /// #     .first();
839    /// requests.join_keyed_singleton(other)
840    /// # .entries().all_ticks()
841    /// # }, |mut stream| async move {
842    /// // { 1: (10, 100), 2: (20, 200) }
843    /// # let mut results = vec![];
844    /// # for _ in 0..2 {
845    /// #     results.push(stream.next().await.unwrap());
846    /// # }
847    /// # results.sort();
848    /// # assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
849    /// # }));
850    /// # }
851    /// ```
852    pub fn join_keyed_singleton<V2: Clone>(
853        self,
854        other: KeyedSingleton<K, V2, L, Bounded>,
855    ) -> KeyedSingleton<K, (V, V2), L, Bounded>
856    where
857        B: IsBounded,
858        K: Eq + Hash + Clone,
859        V: Clone,
860    {
861        let result_stream = self
862            .make_bounded()
863            .entries()
864            .join(other.entries())
865            .into_keyed();
866
867        // The cast is guaranteed to succeed, since each key (in both `self` and `other`) has at most one value.
868        result_stream.cast_at_most_one_entry_per_key()
869    }
870
871    /// For each value in `self`, find the matching key in `lookup`.
872    /// The output is a keyed singleton with the key from `self`, and a value
873    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
874    /// If the key is not present in `lookup`, the option will be [`None`].
875    ///
876    /// # Example
877    /// ```rust
878    /// # #[cfg(feature = "deploy")] {
879    /// # use hydro_lang::prelude::*;
880    /// # use futures::StreamExt;
881    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
882    /// # let tick = process.tick();
883    /// let requests = // { 1: 10, 2: 20 }
884    /// # process
885    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
886    /// #     .into_keyed()
887    /// #     .batch(&tick, nondet!(/** test */))
888    /// #     .first();
889    /// let other_data = // { 10: 100, 11: 110 }
890    /// # process
891    /// #     .source_iter(q!(vec![(10, 100), (11, 110)]))
892    /// #     .into_keyed()
893    /// #     .batch(&tick, nondet!(/** test */))
894    /// #     .first();
895    /// requests.lookup_keyed_singleton(other_data)
896    /// # .entries().all_ticks()
897    /// # }, |mut stream| async move {
898    /// // { 1: (10, Some(100)), 2: (20, None) }
899    /// # let mut results = vec![];
900    /// # for _ in 0..2 {
901    /// #     results.push(stream.next().await.unwrap());
902    /// # }
903    /// # results.sort();
904    /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
905    /// # }));
906    /// # }
907    /// ```
908    pub fn lookup_keyed_singleton<V2>(
909        self,
910        lookup: KeyedSingleton<V, V2, L, Bounded>,
911    ) -> KeyedSingleton<K, (V, Option<V2>), L, Bounded>
912    where
913        B: IsBounded,
914        K: Eq + Hash + Clone,
915        V: Eq + Hash + Clone,
916        V2: Clone,
917    {
918        let result_stream = self
919            .make_bounded()
920            .into_keyed_stream()
921            .lookup_keyed_stream(lookup.into_keyed_stream());
922
923        // The cast is guaranteed to succeed since both lookup and self contain at most 1 value per key
924        result_stream.cast_at_most_one_entry_per_key()
925    }
926
927    /// For each value in `self`, find the matching key in `lookup`.
928    /// The output is a keyed stream with the key from `self`, and a value
929    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
930    /// If the key is not present in `lookup`, the option will be [`None`].
931    ///
932    /// # Example
933    /// ```rust
934    /// # #[cfg(feature = "deploy")] {
935    /// # use hydro_lang::prelude::*;
936    /// # use futures::StreamExt;
937    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
938    /// # let tick = process.tick();
939    /// let requests = // { 1: 10, 2: 20 }
940    /// # process
941    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
942    /// #     .into_keyed()
943    /// #     .batch(&tick, nondet!(/** test */))
944    /// #     .first();
945    /// let other_data = // { 10: 100, 10: 110 }
946    /// # process
947    /// #     .source_iter(q!(vec![(10, 100), (10, 110)]))
948    /// #     .into_keyed()
949    /// #     .batch(&tick, nondet!(/** test */));
950    /// requests.lookup_keyed_stream(other_data)
951    /// # .entries().all_ticks()
952    /// # }, |mut stream| async move {
953    /// // { 1: [(10, Some(100)), (10, Some(110))], 2: (20, None) }
954    /// # let mut results = vec![];
955    /// # for _ in 0..3 {
956    /// #     results.push(stream.next().await.unwrap());
957    /// # }
958    /// # results.sort();
959    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(110))), (2, (20, None))]);
960    /// # }));
961    /// # }
962    /// ```
963    pub fn lookup_keyed_stream<V2, O: Ordering, R: Retries>(
964        self,
965        lookup: KeyedStream<V, V2, L, Bounded, O, R>,
966    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
967    where
968        B: IsBounded,
969        K: Eq + Hash + Clone,
970        V: Eq + Hash + Clone,
971        V2: Clone,
972    {
973        self.make_bounded()
974            .entries()
975            .weaken_retries::<R>() // TODO: Once weaken_retries() is implemented for KeyedSingleton, remove entries() and into_keyed()
976            .into_keyed()
977            .lookup_keyed_stream(lookup)
978    }
979
    /// For each key present in both `self` and `thresholds`, emits a [`KeyedStream`] event the first
    /// time that key's value becomes greater than or equal to the corresponding threshold value.
    /// The emitted value for each key is the threshold value itself.
    ///
    /// This requires the keyed singleton to have monotonic values ([`MonotonicValue`] or [`Bounded`]),
    /// because otherwise the threshold detection would be non-deterministic.
    ///
    /// The `thresholds` parameter is a [`BoundedValue`] keyed singleton mapping each key to its
    /// threshold. Thresholds may arrive asynchronously (new keys appear over time), but once set
    /// for a key, the threshold value is fixed. Late-arriving thresholds are checked against the
    /// current snapshot value immediately.
    ///
    /// The implementation dispatches on `B::bound_kind()`: the [`Bounded`] case is a plain
    /// join-and-filter, while the [`MonotonicValue`] and [`BoundedValue`] cases compare
    /// snapshots inside a `sliced!` block and track which keys have already been emitted.
    ///
    /// # Example
    /// ```rust,ignore
    /// use hydro_lang::prelude::*;
    ///
    /// // Given a monotonically increasing keyed singleton (e.g. from fold with monotone proof)
    /// let counts: KeyedSingleton<u32, usize, _, MonotonicValue> = events.into_keyed()
    ///     .fold(q!(|| 0), q!(|acc, _| *acc += 1, monotone = manual_proof!(/** +1 is monotone */)));
    ///
    /// // BoundedValue keyed singleton of thresholds (from .first())
    /// let thresholds = threshold_source.into_keyed().first();
    ///
    /// // Emits (key, threshold_value) the first time each key's value >= threshold
    /// let crossed = counts.threshold_greater_or_equal(thresholds);
    /// ```
    pub fn threshold_greater_or_equal(
        self,
        thresholds: KeyedSingleton<K, V, L, BoundedValue>,
    ) -> KeyedStream<K, V, L, B::UnderlyingBound, NoOrder, ExactlyOnce>
    where
        K: Clone + Eq + Hash,
        V: Clone + PartialOrd,
        B: IsKeyedMonotonic,
    {
        // Captured up front; the sliced branches use it to build the returned stream.
        let self_location = self.location.clone();
        match B::bound_kind() {
            KeyedSingletonBoundKind::Bounded => {
                // Bounded case: self is already fixed, just join and filter
                // Re-wrap `self` with the concrete `Bounded` marker for this branch.
                let me: KeyedSingleton<K, V, L, Bounded> = KeyedSingleton::new(
                    self.location.clone(),
                    self.ir_node.replace(HydroNode::Placeholder),
                );
                let result = me
                    .entries()
                    .join(thresholds.entries())
                    .filter_map(q!(|(k, (val, thresh))| {
                        if val >= thresh {
                            Some((k, thresh))
                        } else {
                            None
                        }
                    }))
                    .into_keyed();
                // Re-wrap the IR node so the output carries this method's declared return type.
                KeyedStream::new(
                    result.location.clone(),
                    result.ir_node.replace(HydroNode::Placeholder),
                )
            }
            KeyedSingletonBoundKind::MonotonicValue => {
                // Re-wrap `self` with the concrete `MonotonicValue` marker for this branch.
                let me: KeyedSingleton<K, V, L, MonotonicValue> = KeyedSingleton::new(
                    self.location.clone(),
                    self.ir_node.replace(HydroNode::Placeholder),
                );

                let result = sliced! {
                    let snapshot = use(me, nondet!(/** thresholds are deterministic */));
                    let thresh_snapshot =
                        use(thresholds, nondet!(/** thresholds are deterministic */));
                    // Keys that were already emitted; presumably this state persists across
                    // slices (see `state_null`) — TODO confirm.
                    let mut already_crossed =
                        use::state_null::<Stream<K, Tick<_>, Bounded, NoOrder>>();

                    // Pair every known threshold with the current value for the same key and
                    // keep the pairs whose value has reached the threshold.
                    let joined = thresh_snapshot.entries().join(snapshot.entries());
                    let passed = joined
                        .filter(q!(|(_, (thresh, val))| *val >= *thresh))
                        .map(q!(|(k, (thresh, _))| (k, thresh)));

                    // Drop keys that were emitted before, then record the new ones.
                    let newly_crossed = passed.anti_join(already_crossed.clone());
                    already_crossed =
                        already_crossed.chain(newly_crossed.clone().map(q!(|(k, _)| k)));

                    newly_crossed.into_keyed()
                };

                // Re-wrap the IR node so the output carries this method's declared return type.
                KeyedStream::new(
                    self_location,
                    result.ir_node.replace(HydroNode::Placeholder),
                )
            }
            KeyedSingletonBoundKind::BoundedValue => {
                // Re-wrap `self` with the concrete `BoundedValue` marker for this branch.
                let me: KeyedSingleton<K, V, L, BoundedValue> = KeyedSingleton::new(
                    self.location.clone(),
                    self.ir_node.replace(HydroNode::Placeholder),
                );

                // Same pipeline as the `MonotonicValue` branch, applied to a `BoundedValue` input.
                let result = sliced! {
                    let snapshot = use(me, nondet!(/** thresholds are deterministic */));
                    let thresh_snapshot =
                        use(thresholds, nondet!(/** thresholds are deterministic */));
                    let mut already_crossed =
                        use::state_null::<Stream<K, Tick<_>, Bounded, NoOrder>>();

                    let joined = thresh_snapshot.entries().join(snapshot.entries());
                    let passed = joined
                        .filter(q!(|(_, (thresh, val))| *val >= *thresh))
                        .map(q!(|(k, (thresh, _))| (k, thresh)));

                    let newly_crossed = passed.anti_join(already_crossed.clone());
                    already_crossed =
                        already_crossed.chain(newly_crossed.clone().map(q!(|(k, _)| k)));

                    newly_crossed.into_keyed()
                };

                KeyedStream::new(
                    self_location,
                    result.ir_node.replace(HydroNode::Placeholder),
                )
            }
            _ => {
                unreachable!(
                    "IsKeyedMonotonic is only implemented for Bounded, BoundedValue, and MonotonicValue"
                )
            }
        }
    }
1106
    /// Like [`Self::threshold_greater_or_equal`], but uses a single [`Singleton`] threshold
    /// shared across all keys. Emits a `(K, V)` event for each key the first time that key's
    /// value becomes >= the threshold. The emitted value is the threshold itself.
    ///
    /// Because the threshold is a [`Bounded`] singleton, it is a compile-time constant and
    /// does not carry ongoing memory cost.
    ///
    /// The implementation dispatches on `B::bound_kind()`: the [`Bounded`] case crosses the
    /// entries with the threshold and filters, while the [`MonotonicValue`] and
    /// [`BoundedValue`] cases do the same per snapshot inside a `sliced!` block and track
    /// which keys have already been emitted.
    ///
    /// # Example
    /// ```rust,ignore
    /// use hydro_lang::prelude::*;
    ///
    /// let counts: KeyedSingleton<u32, usize, _, MonotonicValue> = events.into_keyed()
    ///     .fold(q!(|| 0), q!(|acc, _| *acc += 1, monotone = manual_proof!(/** +1 */)));
    ///
    /// let threshold = process.singleton(q!(5usize));
    /// let crossed = counts.threshold_greater_or_equal_uniform(threshold);
    /// ```
    pub fn threshold_greater_or_equal_uniform(
        self,
        threshold: Singleton<V, L, Bounded>,
    ) -> KeyedStream<K, V, L, B::UnderlyingBound, NoOrder, ExactlyOnce>
    where
        K: Clone + Eq + Hash,
        V: Clone + PartialOrd,
        B: IsKeyedMonotonic,
    {
        // Captured up front; the sliced branches use it to build the returned stream.
        let self_location = self.location.clone();
        match B::bound_kind() {
            KeyedSingletonBoundKind::Bounded => {
                // Re-wrap `self` with the concrete `Bounded` marker for this branch.
                let me: KeyedSingleton<K, V, L, Bounded> = KeyedSingleton::new(
                    self.location.clone(),
                    self.ir_node.replace(HydroNode::Placeholder),
                );
                // Attach the shared threshold to every entry and keep those that reach it.
                let result = me
                    .entries()
                    .cross_singleton(threshold)
                    .filter_map(q!(|((k, val), thresh)| {
                        if val >= thresh {
                            Some((k, thresh))
                        } else {
                            None
                        }
                    }))
                    .into_keyed();
                // Re-wrap the IR node so the output carries this method's declared return type.
                KeyedStream::new(
                    result.location.clone(),
                    result.ir_node.replace(HydroNode::Placeholder),
                )
            }
            KeyedSingletonBoundKind::MonotonicValue => {
                // Re-wrap `self` with the concrete `MonotonicValue` marker for this branch.
                let me: KeyedSingleton<K, V, L, MonotonicValue> = KeyedSingleton::new(
                    self.location.clone(),
                    self.ir_node.replace(HydroNode::Placeholder),
                );

                let result = sliced! {
                    let snapshot = use(me, nondet!(/** thresholds are deterministic */));
                    // Keys that were already emitted; presumably this state persists across
                    // slices (see `state_null`) — TODO confirm.
                    let mut already_crossed =
                        use::state_null::<Stream<K, Tick<_>, Bounded, NoOrder>>();

                    // Bring the threshold into the same tick as the snapshot so the two
                    // can be combined.
                    let tick = snapshot.location().clone();
                    let thresh_in_tick = threshold.clone_into_tick(&tick);

                    // Attach the threshold to every entry and keep those that reach it.
                    let crossing = snapshot
                        .entries()
                        .cross_singleton(thresh_in_tick)
                        .filter_map(q!(|((k, val), thresh)| {
                            if val >= thresh {
                                Some((k, thresh))
                            } else {
                                None
                            }
                        }));

                    // Drop keys that were emitted before, then record the new ones.
                    let newly_crossed = crossing.anti_join(already_crossed.clone());
                    already_crossed =
                        already_crossed.chain(newly_crossed.clone().map(q!(|(k, _)| k)));

                    newly_crossed.into_keyed()
                };

                // Re-wrap the IR node so the output carries this method's declared return type.
                KeyedStream::new(
                    self_location,
                    result.ir_node.replace(HydroNode::Placeholder),
                )
            }
            KeyedSingletonBoundKind::BoundedValue => {
                // Re-wrap `self` with the concrete `BoundedValue` marker for this branch.
                let me: KeyedSingleton<K, V, L, BoundedValue> = KeyedSingleton::new(
                    self.location.clone(),
                    self.ir_node.replace(HydroNode::Placeholder),
                );

                // Same pipeline as the `MonotonicValue` branch, applied to a `BoundedValue` input.
                let result = sliced! {
                    let snapshot = use(me, nondet!(/** thresholds are deterministic */));
                    let mut already_crossed =
                        use::state_null::<Stream<K, Tick<_>, Bounded, NoOrder>>();

                    let tick = snapshot.location().clone();
                    let thresh_in_tick = threshold.clone_into_tick(&tick);

                    let crossing = snapshot
                        .entries()
                        .cross_singleton(thresh_in_tick)
                        .filter_map(q!(|((k, val), thresh)| {
                            if val >= thresh {
                                Some((k, thresh))
                            } else {
                                None
                            }
                        }));

                    let newly_crossed = crossing.anti_join(already_crossed.clone());
                    already_crossed =
                        already_crossed.chain(newly_crossed.clone().map(q!(|(k, _)| k)));

                    newly_crossed.into_keyed()
                };

                KeyedStream::new(
                    self_location,
                    result.ir_node.replace(HydroNode::Placeholder),
                )
            }
            _ => {
                unreachable!(
                    "IsKeyedMonotonic is only implemented for Bounded, BoundedValue, and MonotonicValue"
                )
            }
        }
    }
1237}
1238
1239impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
1240    KeyedSingleton<K, V, L, B>
1241{
1242    /// Flattens the keyed singleton into an unordered stream of key-value pairs.
1243    ///
1244    /// The value for each key must be bounded, otherwise the resulting stream elements would be
1245    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
1246    /// into the output.
1247    ///
1248    /// # Example
1249    /// ```rust
1250    /// # #[cfg(feature = "deploy")] {
1251    /// # use hydro_lang::prelude::*;
1252    /// # use futures::StreamExt;
1253    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1254    /// let keyed_singleton = // { 1: 2, 2: 4 }
1255    /// # process
1256    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1257    /// #     .into_keyed()
1258    /// #     .first();
1259    /// keyed_singleton.entries()
1260    /// # }, |mut stream| async move {
1261    /// // (1, 2), (2, 4) in any order
1262    /// # let mut results = Vec::new();
1263    /// # for _ in 0..2 {
1264    /// #     results.push(stream.next().await.unwrap());
1265    /// # }
1266    /// # results.sort();
1267    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1268    /// # }));
1269    /// # }
1270    /// ```
1271    pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
1272        self.into_keyed_stream().entries()
1273    }
1274
1275    /// Flattens the keyed singleton into an unordered stream of just the values.
1276    ///
1277    /// The value for each key must be bounded, otherwise the resulting stream elements would be
1278    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
1279    /// into the output.
1280    ///
1281    /// # Example
1282    /// ```rust
1283    /// # #[cfg(feature = "deploy")] {
1284    /// # use hydro_lang::prelude::*;
1285    /// # use futures::StreamExt;
1286    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1287    /// let keyed_singleton = // { 1: 2, 2: 4 }
1288    /// # process
1289    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1290    /// #     .into_keyed()
1291    /// #     .first();
1292    /// keyed_singleton.values()
1293    /// # }, |mut stream| async move {
1294    /// // 2, 4 in any order
1295    /// # let mut results = Vec::new();
1296    /// # for _ in 0..2 {
1297    /// #     results.push(stream.next().await.unwrap());
1298    /// # }
1299    /// # results.sort();
1300    /// # assert_eq!(results, vec![2, 4]);
1301    /// # }));
1302    /// # }
1303    /// ```
1304    pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
1305        let map_f = q!(|(_, v)| v)
1306            .splice_fn1_ctx::<(K, V), V>(&self.location)
1307            .into();
1308
1309        Stream::new(
1310            self.location.clone(),
1311            HydroNode::Map {
1312                f: map_f,
1313                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1314                metadata: self.location.new_node_metadata(Stream::<
1315                    V,
1316                    L,
1317                    B::UnderlyingBound,
1318                    NoOrder,
1319                    ExactlyOnce,
1320                >::collection_kind()),
1321            },
1322        )
1323    }
1324
1325    /// Flattens the keyed singleton into an unordered stream of just the keys.
1326    ///
1327    /// The value for each key must be bounded, otherwise the removal of keys would result in
1328    /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
1329    /// into the output.
1330    ///
1331    /// # Example
1332    /// ```rust
1333    /// # #[cfg(feature = "deploy")] {
1334    /// # use hydro_lang::prelude::*;
1335    /// # use futures::StreamExt;
1336    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1337    /// let keyed_singleton = // { 1: 2, 2: 4 }
1338    /// # process
1339    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1340    /// #     .into_keyed()
1341    /// #     .first();
1342    /// keyed_singleton.keys()
1343    /// # }, |mut stream| async move {
1344    /// // 1, 2 in any order
1345    /// # let mut results = Vec::new();
1346    /// # for _ in 0..2 {
1347    /// #     results.push(stream.next().await.unwrap());
1348    /// # }
1349    /// # results.sort();
1350    /// # assert_eq!(results, vec![1, 2]);
1351    /// # }));
1352    /// # }
1353    /// ```
1354    pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
1355        self.entries().map(q!(|(k, _)| k))
1356    }
1357
1358    /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
1359    /// entries whose keys are not in the provided stream.
1360    ///
1361    /// # Example
1362    /// ```rust
1363    /// # #[cfg(feature = "deploy")] {
1364    /// # use hydro_lang::prelude::*;
1365    /// # use futures::StreamExt;
1366    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1367    /// let tick = process.tick();
1368    /// let keyed_singleton = // { 1: 2, 2: 4 }
1369    /// # process
1370    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1371    /// #     .into_keyed()
1372    /// #     .first()
1373    /// #     .batch(&tick, nondet!(/** test */));
1374    /// let keys_to_remove = process
1375    ///     .source_iter(q!(vec![1]))
1376    ///     .batch(&tick, nondet!(/** test */));
1377    /// keyed_singleton.filter_key_not_in(keys_to_remove)
1378    /// #   .entries().all_ticks()
1379    /// # }, |mut stream| async move {
1380    /// // { 2: 4 }
1381    /// # for w in vec![(2, 4)] {
1382    /// #     assert_eq!(stream.next().await.unwrap(), w);
1383    /// # }
1384    /// # }));
1385    /// # }
1386    /// ```
1387    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1388        self,
1389        other: Stream<K, L, Bounded, O2, R2>,
1390    ) -> Self
1391    where
1392        K: Hash + Eq,
1393    {
1394        check_matching_location(&self.location, &other.location);
1395
1396        KeyedSingleton::new(
1397            self.location.clone(),
1398            HydroNode::AntiJoin {
1399                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1400                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1401                metadata: self.location.new_node_metadata(Self::collection_kind()),
1402            },
1403        )
1404    }
1405
1406    /// An operator which allows you to "inspect" each value of a keyed singleton without
1407    /// modifying it. The closure `f` is called on a reference to each value. This is
1408    /// mainly useful for debugging, and should not be used to generate side-effects.
1409    ///
1410    /// # Example
1411    /// ```rust
1412    /// # #[cfg(feature = "deploy")] {
1413    /// # use hydro_lang::prelude::*;
1414    /// # use futures::StreamExt;
1415    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1416    /// let keyed_singleton = // { 1: 2, 2: 4 }
1417    /// # process
1418    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1419    /// #     .into_keyed()
1420    /// #     .first();
1421    /// keyed_singleton
1422    ///     .inspect(q!(|v| println!("{}", v)))
1423    /// #   .entries()
1424    /// # }, |mut stream| async move {
1425    /// // { 1: 2, 2: 4 }
1426    /// # for w in vec![(1, 2), (2, 4)] {
1427    /// #     assert_eq!(stream.next().await.unwrap(), w);
1428    /// # }
1429    /// # }));
1430    /// # }
1431    /// ```
1432    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1433    where
1434        F: Fn(&V) + 'a,
1435    {
1436        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1437        let inspect_f = q!({
1438            let orig = f;
1439            move |t: &(_, _)| orig(&t.1)
1440        })
1441        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1442        .into();
1443
1444        KeyedSingleton::new(
1445            self.location.clone(),
1446            HydroNode::Inspect {
1447                f: inspect_f,
1448                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1449                metadata: self.location.new_node_metadata(Self::collection_kind()),
1450            },
1451        )
1452    }
1453
1454    /// An operator which allows you to "inspect" each entry of a keyed singleton without
1455    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1456    /// mainly useful for debugging, and should not be used to generate side-effects.
1457    ///
1458    /// # Example
1459    /// ```rust
1460    /// # #[cfg(feature = "deploy")] {
1461    /// # use hydro_lang::prelude::*;
1462    /// # use futures::StreamExt;
1463    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1464    /// let keyed_singleton = // { 1: 2, 2: 4 }
1465    /// # process
1466    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1467    /// #     .into_keyed()
1468    /// #     .first();
1469    /// keyed_singleton
1470    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1471    /// #   .entries()
1472    /// # }, |mut stream| async move {
1473    /// // { 1: 2, 2: 4 }
1474    /// # for w in vec![(1, 2), (2, 4)] {
1475    /// #     assert_eq!(stream.next().await.unwrap(), w);
1476    /// # }
1477    /// # }));
1478    /// # }
1479    /// ```
1480    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1481    where
1482        F: Fn(&(K, V)) + 'a,
1483    {
1484        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1485
1486        KeyedSingleton::new(
1487            self.location.clone(),
1488            HydroNode::Inspect {
1489                f: inspect_f,
1490                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1491                metadata: self.location.new_node_metadata(Self::collection_kind()),
1492            },
1493        )
1494    }
1495
1496    /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
1497    ///
1498    /// Because this method requires values to be bounded, the output [`Optional`] will only be
1499    /// asynchronously updated if a new key is added that is higher than the previous max key.
1500    ///
1501    /// # Example
1502    /// ```rust
1503    /// # #[cfg(feature = "deploy")] {
1504    /// # use hydro_lang::prelude::*;
1505    /// # use futures::StreamExt;
1506    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1507    /// let tick = process.tick();
1508    /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
1509    /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 123), (2, 456), (0, 789)])))
1510    /// #     .into_keyed()
1511    /// #     .first();
1512    /// keyed_singleton.get_max_key()
1513    /// # .sample_eager(nondet!(/** test */))
1514    /// # }, |mut stream| async move {
1515    /// // (2, 456)
1516    /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
1517    /// # }));
1518    /// # }
1519    /// ```
1520    pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
1521    where
1522        K: Ord,
1523    {
1524        self.entries()
1525            .assume_ordering_trusted(nondet!(
1526                /// There is only one element associated with each key, and the keys are totallly
1527                /// ordered so we will produce a deterministic value. The closure technically
1528                /// isn't commutative in the case where both passed entries have the same key
1529                /// but different values.
1530                ///
1531                /// In the future, we may want to have an `assume!(...)` statement in the UDF that
1532                /// the two inputs do not have the same key.
1533            ))
1534            .reduce(q!(
1535                move |curr, new| {
1536                    if new.0 > curr.0 {
1537                        *curr = new;
1538                    }
1539                },
1540                idempotent = manual_proof!(/** repeated elements are ignored */)
1541            ))
1542    }
1543
1544    /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
1545    /// element, the value.
1546    ///
1547    /// This is the equivalent of [`Singleton::into_stream`] but keyed.
1548    ///
1549    /// # Example
1550    /// ```rust
1551    /// # #[cfg(feature = "deploy")] {
1552    /// # use hydro_lang::prelude::*;
1553    /// # use futures::StreamExt;
1554    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1555    /// let keyed_singleton = // { 1: 2, 2: 4 }
1556    /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 2), (2, 4)])))
1557    /// #     .into_keyed()
1558    /// #     .first();
1559    /// keyed_singleton
1560    ///     .clone()
1561    ///     .into_keyed_stream()
1562    ///     .merge_unordered(
1563    ///         keyed_singleton.into_keyed_stream()
1564    ///     )
1565    /// #   .entries()
1566    /// # }, |mut stream| async move {
1567    /// /// // { 1: [2, 2], 2: [4, 4] }
1568    /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
1569    /// #     assert_eq!(stream.next().await.unwrap(), w);
1570    /// # }
1571    /// # }));
1572    /// # }
1573    /// ```
1574    pub fn into_keyed_stream(
1575        self,
1576    ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
1577        KeyedStream::new(
1578            self.location.clone(),
1579            HydroNode::Cast {
1580                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1581                metadata: self.location.new_node_metadata(KeyedStream::<
1582                    K,
1583                    V,
1584                    L,
1585                    B::UnderlyingBound,
1586                    TotalOrder,
1587                    ExactlyOnce,
1588                >::collection_kind()),
1589            },
1590        )
1591    }
1592}
1593
1594impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1595where
1596    L: Location<'a>,
1597{
1598    /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1599    /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1600    ///
1601    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1602    /// processed before an acknowledgement is emitted.
1603    pub fn atomic(self) -> KeyedSingleton<K, V, Atomic<L>, B> {
1604        let id = self.location.flow_state().borrow_mut().next_clock_id();
1605        let out_location = Atomic {
1606            tick: Tick {
1607                id,
1608                l: self.location.clone(),
1609            },
1610        };
1611        KeyedSingleton::new(
1612            out_location.clone(),
1613            HydroNode::BeginAtomic {
1614                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1615                metadata: out_location
1616                    .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1617            },
1618        )
1619    }
1620}
1621
1622impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1623where
1624    L: Location<'a>,
1625{
1626    /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1627    /// See [`KeyedSingleton::atomic`] for more details.
1628    pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1629        KeyedSingleton::new(
1630            self.location.tick.l.clone(),
1631            HydroNode::EndAtomic {
1632                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1633                metadata: self
1634                    .location
1635                    .tick
1636                    .l
1637                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1638            },
1639        )
1640    }
1641}
1642
1643impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1644    /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1645    /// tick `T` always has the entries of `self` at tick `T - 1`.
1646    ///
1647    /// At tick `0`, the output has no entries, since there is no previous tick.
1648    ///
1649    /// This operator enables stateful iterative processing with ticks, by sending data from one
1650    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1651    ///
1652    /// # Example
1653    /// ```rust
1654    /// # #[cfg(feature = "deploy")] {
1655    /// # use hydro_lang::prelude::*;
1656    /// # use futures::StreamExt;
1657    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1658    /// let tick = process.tick();
1659    /// # // ticks are lazy by default, forces the second tick to run
1660    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1661    /// # let batch_first_tick = process
1662    /// #   .source_iter(q!(vec![(1, 2), (2, 3)]))
1663    /// #   .batch(&tick, nondet!(/** test */))
1664    /// #   .into_keyed();
1665    /// # let batch_second_tick = process
1666    /// #   .source_iter(q!(vec![(2, 4), (3, 5)]))
1667    /// #   .batch(&tick, nondet!(/** test */))
1668    /// #   .into_keyed()
1669    /// #   .defer_tick(); // appears on the second tick
1670    /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1671    /// # batch_first_tick.chain(batch_second_tick).first();
1672    /// input_batch.clone().filter_key_not_in(
1673    ///     input_batch.defer_tick().keys() // keys present in the previous tick
1674    /// )
1675    /// # .entries().all_ticks()
1676    /// # }, |mut stream| async move {
1677    /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1678    /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1679    /// #     assert_eq!(stream.next().await.unwrap(), w);
1680    /// # }
1681    /// # }));
1682    /// # }
1683    /// ```
1684    pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1685        KeyedSingleton::new(
1686            self.location.clone(),
1687            HydroNode::DeferTick {
1688                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1689                metadata: self
1690                    .location
1691                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1692            },
1693        )
1694    }
1695}
1696
1697impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1698where
1699    L: Location<'a>,
1700{
1701    /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1702    /// point in time.
1703    ///
1704    /// # Non-Determinism
1705    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1706    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1707    pub fn snapshot<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1708        self,
1709        tick: &Tick<L2>,
1710        _nondet: NonDet,
1711    ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1712        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1713        KeyedSingleton::new(
1714            tick.drop_consistency(),
1715            HydroNode::Batch {
1716                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1717                metadata: tick
1718                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1719            },
1720        )
1721    }
1722}
1723
1724impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1725where
1726    L: Location<'a>,
1727{
1728    /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1729    /// state of the keyed singleton being atomically processed.
1730    ///
1731    /// # Non-Determinism
1732    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1733    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1734    pub fn snapshot_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1735        self,
1736        tick: &Tick<L2>,
1737        _nondet: NonDet,
1738    ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1739        KeyedSingleton::new(
1740            tick.drop_consistency(),
1741            HydroNode::Batch {
1742                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1743                metadata: tick
1744                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1745            },
1746        )
1747    }
1748}
1749
1750impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1751where
1752    L: Location<'a>,
1753{
1754    /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1755    ///
1756    /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1757    /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1758    /// is filtered out.
1759    ///
1760    /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
1761    /// not modify or take ownership of the values. If you need to modify the values while filtering
1762    /// use [`KeyedSingleton::filter_map`] instead.
1763    ///
1764    /// # Example
1765    /// ```rust
1766    /// # #[cfg(feature = "deploy")] {
1767    /// # use hydro_lang::prelude::*;
1768    /// # use futures::StreamExt;
1769    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1770    /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1771    /// # process
1772    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1773    /// #     .into_keyed()
1774    /// #     .first();
1775    /// keyed_singleton.filter(q!(|&v| v > 1))
1776    /// #   .entries()
1777    /// # }, |mut stream| async move {
1778    /// // { 1: 2, 2: 4 }
1779    /// # let mut results = Vec::new();
1780    /// # for _ in 0..2 {
1781    /// #     results.push(stream.next().await.unwrap());
1782    /// # }
1783    /// # results.sort();
1784    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1785    /// # }));
1786    /// # }
1787    /// ```
1788    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1789    where
1790        F: Fn(&V) -> bool + 'a,
1791    {
1792        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1793        let filter_f = q!({
1794            let orig = f;
1795            move |t: &(_, _)| orig(&t.1)
1796        })
1797        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1798        .into();
1799
1800        KeyedSingleton::new(
1801            self.location.clone(),
1802            HydroNode::Filter {
1803                f: filter_f,
1804                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1805                metadata: self
1806                    .location
1807                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1808            },
1809        )
1810    }
1811
1812    /// An operator that both filters and maps values. It yields only the key-value pairs where
1813    /// the supplied closure `f` returns `Some(value)`.
1814    ///
1815    /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1816    /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1817    /// If it returns `None`, the key-value pair is filtered out.
1818    ///
1819    /// # Example
1820    /// ```rust
1821    /// # #[cfg(feature = "deploy")] {
1822    /// # use hydro_lang::prelude::*;
1823    /// # use futures::StreamExt;
1824    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1825    /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1826    /// # process
1827    /// #     .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1828    /// #     .into_keyed()
1829    /// #     .first();
1830    /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1831    /// #   .entries()
1832    /// # }, |mut stream| async move {
1833    /// // { 1: 42, 3: 100 }
1834    /// # let mut results = Vec::new();
1835    /// # for _ in 0..2 {
1836    /// #     results.push(stream.next().await.unwrap());
1837    /// # }
1838    /// # results.sort();
1839    /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1840    /// # }));
1841    /// # }
1842    /// ```
1843    pub fn filter_map<F, U>(
1844        self,
1845        f: impl IntoQuotedMut<'a, F, L> + Copy,
1846    ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
1847    where
1848        F: Fn(V) -> Option<U> + 'a,
1849    {
1850        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1851        let filter_map_f = q!({
1852            let orig = f;
1853            move |(k, v)| orig(v).map(|o| (k, o))
1854        })
1855        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1856        .into();
1857
1858        KeyedSingleton::new(
1859            self.location.clone(),
1860            HydroNode::FilterMap {
1861                f: filter_map_f,
1862                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1863                metadata: self.location.new_node_metadata(KeyedSingleton::<
1864                    K,
1865                    U,
1866                    L,
1867                    B::EraseMonotonic,
1868                >::collection_kind()),
1869            },
1870        )
1871    }
1872
1873    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1874    /// arrived since the previous batch was released.
1875    ///
1876    /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1877    /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1878    ///
1879    /// # Non-Determinism
1880    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1881    /// has a non-deterministic set of key-value pairs.
1882    pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1883        self,
1884        tick: &Tick<L2>,
1885        _nondet: NonDet,
1886    ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1887        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1888        KeyedSingleton::new(
1889            tick.drop_consistency(),
1890            HydroNode::Batch {
1891                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1892                metadata: tick
1893                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1894            },
1895        )
1896    }
1897}
1898
1899impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1900where
1901    L: Location<'a>,
1902{
1903    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1904    /// atomically processed.
1905    ///
1906    /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1907    /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1908    ///
1909    /// # Non-Determinism
1910    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1911    /// has a non-deterministic set of key-value pairs.
1912    pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1913        self,
1914        tick: &Tick<L2>,
1915        nondet: NonDet,
1916    ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1917        let _ = nondet;
1918        KeyedSingleton::new(
1919            tick.drop_consistency(),
1920            HydroNode::Batch {
1921                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1922                metadata: tick
1923                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1924            },
1925        )
1926    }
1927}
1928
1929#[cfg(test)]
1930mod tests {
1931    #[cfg(feature = "deploy")]
1932    use futures::{SinkExt, StreamExt};
1933    #[cfg(feature = "deploy")]
1934    use hydro_deploy::Deployment;
1935    #[cfg(any(feature = "deploy", feature = "sim"))]
1936    use stageleft::q;
1937
1938    #[cfg(any(feature = "deploy", feature = "sim"))]
1939    use crate::compile::builder::FlowBuilder;
1940    #[cfg(any(feature = "deploy", feature = "sim"))]
1941    use crate::location::Location;
1942    #[cfg(any(feature = "deploy", feature = "sim"))]
1943    use crate::nondet::nondet;
1944
1945    #[cfg(feature = "deploy")]
1946    #[tokio::test]
1947    async fn key_count_bounded_value() {
1948        let mut deployment = Deployment::new();
1949
1950        let mut flow = FlowBuilder::new();
1951        let node = flow.process::<()>();
1952        let external = flow.external::<()>();
1953
1954        let (input_port, input) = node.source_external_bincode(&external);
1955        let out = input
1956            .into_keyed()
1957            .first()
1958            .key_count()
1959            .sample_eager(nondet!(/** test */))
1960            .send_bincode_external(&external);
1961
1962        let nodes = flow
1963            .with_process(&node, deployment.Localhost())
1964            .with_external(&external, deployment.Localhost())
1965            .deploy(&mut deployment);
1966
1967        deployment.deploy().await.unwrap();
1968
1969        let mut external_in = nodes.connect(input_port).await;
1970        let mut external_out = nodes.connect(out).await;
1971
1972        deployment.start().await.unwrap();
1973
1974        assert_eq!(external_out.next().await.unwrap(), 0);
1975
1976        external_in.send((1, 1)).await.unwrap();
1977        assert_eq!(external_out.next().await.unwrap(), 1);
1978
1979        external_in.send((2, 2)).await.unwrap();
1980        assert_eq!(external_out.next().await.unwrap(), 2);
1981    }
1982
1983    #[cfg(feature = "deploy")]
1984    #[tokio::test]
1985    async fn key_count_unbounded_value() {
1986        let mut deployment = Deployment::new();
1987
1988        let mut flow = FlowBuilder::new();
1989        let node = flow.process::<()>();
1990        let external = flow.external::<()>();
1991
1992        let (input_port, input) = node.source_external_bincode(&external);
1993        let out = input
1994            .into_keyed()
1995            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1996            .key_count()
1997            .sample_eager(nondet!(/** test */))
1998            .send_bincode_external(&external);
1999
2000        let nodes = flow
2001            .with_process(&node, deployment.Localhost())
2002            .with_external(&external, deployment.Localhost())
2003            .deploy(&mut deployment);
2004
2005        deployment.deploy().await.unwrap();
2006
2007        let mut external_in = nodes.connect(input_port).await;
2008        let mut external_out = nodes.connect(out).await;
2009
2010        deployment.start().await.unwrap();
2011
2012        assert_eq!(external_out.next().await.unwrap(), 0);
2013
2014        external_in.send((1, 1)).await.unwrap();
2015        assert_eq!(external_out.next().await.unwrap(), 1);
2016
2017        external_in.send((1, 2)).await.unwrap();
2018        assert_eq!(external_out.next().await.unwrap(), 1);
2019
2020        external_in.send((2, 2)).await.unwrap();
2021        assert_eq!(external_out.next().await.unwrap(), 2);
2022
2023        external_in.send((1, 1)).await.unwrap();
2024        assert_eq!(external_out.next().await.unwrap(), 2);
2025
2026        external_in.send((3, 1)).await.unwrap();
2027        assert_eq!(external_out.next().await.unwrap(), 3);
2028    }
2029
2030    #[cfg(feature = "deploy")]
2031    #[tokio::test]
2032    async fn into_singleton_bounded_value() {
2033        let mut deployment = Deployment::new();
2034
2035        let mut flow = FlowBuilder::new();
2036        let node = flow.process::<()>();
2037        let external = flow.external::<()>();
2038
2039        let (input_port, input) = node.source_external_bincode(&external);
2040        let out = input
2041            .into_keyed()
2042            .first()
2043            .into_singleton()
2044            .sample_eager(nondet!(/** test */))
2045            .send_bincode_external(&external);
2046
2047        let nodes = flow
2048            .with_process(&node, deployment.Localhost())
2049            .with_external(&external, deployment.Localhost())
2050            .deploy(&mut deployment);
2051
2052        deployment.deploy().await.unwrap();
2053
2054        let mut external_in = nodes.connect(input_port).await;
2055        let mut external_out = nodes.connect(out).await;
2056
2057        deployment.start().await.unwrap();
2058
2059        assert_eq!(
2060            external_out.next().await.unwrap(),
2061            std::collections::HashMap::new()
2062        );
2063
2064        external_in.send((1, 1)).await.unwrap();
2065        assert_eq!(
2066            external_out.next().await.unwrap(),
2067            vec![(1, 1)].into_iter().collect()
2068        );
2069
2070        external_in.send((2, 2)).await.unwrap();
2071        assert_eq!(
2072            external_out.next().await.unwrap(),
2073            vec![(1, 1), (2, 2)].into_iter().collect()
2074        );
2075    }
2076
2077    #[cfg(feature = "deploy")]
2078    #[tokio::test]
2079    async fn into_singleton_unbounded_value() {
2080        let mut deployment = Deployment::new();
2081
2082        let mut flow = FlowBuilder::new();
2083        let node = flow.process::<()>();
2084        let external = flow.external::<()>();
2085
2086        let (input_port, input) = node.source_external_bincode(&external);
2087        let out = input
2088            .into_keyed()
2089            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
2090            .into_singleton()
2091            .sample_eager(nondet!(/** test */))
2092            .send_bincode_external(&external);
2093
2094        let nodes = flow
2095            .with_process(&node, deployment.Localhost())
2096            .with_external(&external, deployment.Localhost())
2097            .deploy(&mut deployment);
2098
2099        deployment.deploy().await.unwrap();
2100
2101        let mut external_in = nodes.connect(input_port).await;
2102        let mut external_out = nodes.connect(out).await;
2103
2104        deployment.start().await.unwrap();
2105
2106        assert_eq!(
2107            external_out.next().await.unwrap(),
2108            std::collections::HashMap::new()
2109        );
2110
2111        external_in.send((1, 1)).await.unwrap();
2112        assert_eq!(
2113            external_out.next().await.unwrap(),
2114            vec![(1, 1)].into_iter().collect()
2115        );
2116
2117        external_in.send((1, 2)).await.unwrap();
2118        assert_eq!(
2119            external_out.next().await.unwrap(),
2120            vec![(1, 2)].into_iter().collect()
2121        );
2122
2123        external_in.send((2, 2)).await.unwrap();
2124        assert_eq!(
2125            external_out.next().await.unwrap(),
2126            vec![(1, 2), (2, 1)].into_iter().collect()
2127        );
2128
2129        external_in.send((1, 1)).await.unwrap();
2130        assert_eq!(
2131            external_out.next().await.unwrap(),
2132            vec![(1, 3), (2, 1)].into_iter().collect()
2133        );
2134
2135        external_in.send((3, 1)).await.unwrap();
2136        assert_eq!(
2137            external_out.next().await.unwrap(),
2138            vec![(1, 3), (2, 1), (3, 1)].into_iter().collect()
2139        );
2140    }
2141
2142    #[cfg(feature = "sim")]
2143    #[test]
2144    fn sim_unbounded_singleton_snapshot() {
2145        let mut flow = FlowBuilder::new();
2146        let node = flow.process::<()>();
2147
2148        let (input_port, input) = node.sim_input();
2149        let output = input
2150            .into_keyed()
2151            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
2152            .snapshot(&node.tick(), nondet!(/** test */))
2153            .entries()
2154            .all_ticks()
2155            .sim_output();
2156
2157        let count = flow.sim().exhaustive(async || {
2158            input_port.send((1, 123));
2159            input_port.send((1, 456));
2160            input_port.send((2, 123));
2161
2162            let all = output.collect_sorted::<Vec<_>>().await;
2163            assert_eq!(all.last().unwrap(), &(2, 1));
2164        });
2165
2166        assert_eq!(count, 8);
2167    }
2168
2169    #[cfg(feature = "deploy")]
2170    #[tokio::test]
2171    async fn join_keyed_stream() {
2172        let mut deployment = Deployment::new();
2173
2174        let mut flow = FlowBuilder::new();
2175        let node = flow.process::<()>();
2176        let external = flow.external::<()>();
2177
2178        let tick = node.tick();
2179        let keyed_data = node
2180            .source_iter(q!(vec![(1, 10), (2, 20)]))
2181            .into_keyed()
2182            .batch(&tick, nondet!(/** test */))
2183            .first();
2184        let requests = node
2185            .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
2186            .into_keyed()
2187            .batch(&tick, nondet!(/** test */));
2188
2189        let out = keyed_data
2190            .join_keyed_stream(requests)
2191            .entries()
2192            .all_ticks()
2193            .send_bincode_external(&external);
2194
2195        let nodes = flow
2196            .with_process(&node, deployment.Localhost())
2197            .with_external(&external, deployment.Localhost())
2198            .deploy(&mut deployment);
2199
2200        deployment.deploy().await.unwrap();
2201
2202        let mut external_out = nodes.connect(out).await;
2203
2204        deployment.start().await.unwrap();
2205
2206        let mut results = vec![];
2207        for _ in 0..2 {
2208            results.push(external_out.next().await.unwrap());
2209        }
2210        results.sort();
2211
2212        assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
2213    }
2214
2215    #[cfg(feature = "sim")]
2216    #[test]
2217    fn threshold_greater_or_equal_monotonic() {
2218        let mut flow = FlowBuilder::new();
2219        let node = flow.process::<()>();
2220
2221        let (input_port, input) = node.sim_input::<(u32, usize), _, _>();
2222        let (thresh_port, thresh_input) = node.sim_input::<(u32, usize), _, _>();
2223
2224        // Create a monotonically increasing keyed singleton via fold with monotone proof
2225        let counts: super::KeyedSingleton<u32, usize, _, super::MonotonicValue> =
2226            input.into_keyed().fold(
2227                q!(|| 0usize),
2228                q!(
2229                    |acc, v| *acc += v,
2230                    monotone = crate::properties::manual_proof!(/** += is monotonic */)
2231                ),
2232            );
2233
2234        // BoundedValue keyed singleton of thresholds (from .first() on unbounded stream)
2235        let thresholds = thresh_input.into_keyed().first();
2236
2237        let output = counts
2238            .threshold_greater_or_equal(thresholds)
2239            .entries()
2240            .sim_output();
2241
2242        let count = flow.sim().exhaustive(async || {
2243            // Set thresholds: key 1 needs value >= 5, key 2 needs value >= 10
2244            thresh_port.send((1, 5));
2245            thresh_port.send((2, 10));
2246
2247            // key 1 gets increments: 3 + 3 = 6, which is >= 5 ✓
2248            input_port.send((1, 3));
2249            input_port.send((1, 3));
2250            // key 2 gets increments: 3 + 3 = 6, which is < 10 ✗
2251            input_port.send((2, 3));
2252            input_port.send((2, 3));
2253
2254            let results = output.collect_sorted::<Vec<_>>().await;
2255            assert_eq!(results, vec![(1, 5)]);
2256        });
2257
2258        assert!(count > 0);
2259    }
2260
2261    #[cfg(feature = "sim")]
2262    #[test]
2263    fn threshold_greater_or_equal_uniform() {
2264        let mut flow = FlowBuilder::new();
2265        let node = flow.process::<()>();
2266
2267        let (input_port, input) = node.sim_input::<(u32, usize), _, _>();
2268
2269        let counts: super::KeyedSingleton<u32, usize, _, super::MonotonicValue> =
2270            input.into_keyed().fold(
2271                q!(|| 0usize),
2272                q!(
2273                    |acc, v| *acc += v,
2274                    monotone = crate::properties::manual_proof!(/** += is monotonic */)
2275                ),
2276            );
2277
2278        // Uniform threshold: all keys need value >= 5
2279        let threshold = node.singleton(q!(5usize));
2280
2281        let output = counts
2282            .threshold_greater_or_equal_uniform(threshold)
2283            .entries()
2284            .sim_output();
2285
2286        let count = flow.sim().exhaustive(async || {
2287            // key 1: 3 + 3 = 6 >= 5 ✓
2288            input_port.send((1, 3));
2289            input_port.send((1, 3));
2290            // key 2: 2 + 2 = 4 < 5 ✗
2291            input_port.send((2, 2));
2292            input_port.send((2, 2));
2293
2294            let results = output.collect_sorted::<Vec<_>>().await;
2295            assert_eq!(results, vec![(1, 5)]);
2296        });
2297
2298        assert!(count > 0);
2299    }
2300
2301    #[cfg(feature = "sim")]
2302    #[test]
2303    fn threshold_greater_or_equal_bounded_value() {
2304        let mut flow = FlowBuilder::new();
2305        let node = flow.process::<()>();
2306
2307        let (input_port, input) = node.sim_input::<(u32, usize), _, _>();
2308        let (thresh_port, thresh_input) = node.sim_input::<(u32, usize), _, _>();
2309
2310        // BoundedValue keyed singleton (values fixed once per key via .first())
2311        let values = input.into_keyed().first();
2312
2313        // BoundedValue keyed singleton of thresholds
2314        let thresholds = thresh_input.into_keyed().first();
2315
2316        let output = values
2317            .threshold_greater_or_equal(thresholds)
2318            .entries()
2319            .sim_output();
2320
2321        let count = flow.sim().exhaustive(async || {
2322            // Set thresholds: key 1 needs >= 3, key 2 needs >= 10
2323            thresh_port.send((1, 3));
2324            thresh_port.send((2, 10));
2325
2326            // key 1 gets value 5 >= 3 ✓, key 2 gets value 4 < 10 ✗
2327            input_port.send((1, 5));
2328            input_port.send((2, 4));
2329
2330            let results = output.collect_sorted::<Vec<_>>().await;
2331            assert_eq!(results, vec![(1, 3)]);
2332        });
2333
2334        assert!(count > 0);
2335    }
2336
2337    #[cfg(feature = "sim")]
2338    #[test]
2339    fn threshold_greater_or_equal_uniform_bounded_value() {
2340        let mut flow = FlowBuilder::new();
2341        let node = flow.process::<()>();
2342
2343        let (input_port, input) = node.sim_input::<(u32, usize), _, _>();
2344
2345        // BoundedValue keyed singleton (values fixed once per key via .first())
2346        let values = input.into_keyed().first();
2347
2348        // Uniform threshold: all keys need value >= 5
2349        let threshold = node.singleton(q!(5usize));
2350
2351        let output = values
2352            .threshold_greater_or_equal_uniform(threshold)
2353            .entries()
2354            .sim_output();
2355
2356        let count = flow.sim().exhaustive(async || {
2357            // key 1 gets value 7 >= 5 ✓, key 2 gets value 3 < 5 ✗
2358            input_port.send((1, 7));
2359            input_port.send((2, 3));
2360
2361            let results = output.collect_sorted::<Vec<_>>().await;
2362            assert_eq!(results, vec![(1, 5)]);
2363        });
2364
2365        assert!(count > 0);
2366    }
2367
2368    #[cfg(feature = "sim")]
2369    #[test]
2370    fn threshold_greater_or_equal_bounded() {
2371        let mut flow = FlowBuilder::new();
2372        let node = flow.process::<()>();
2373
2374        // Bounded keyed singleton (fully known upfront)
2375        let values = node
2376            .source_iter(q!(vec![(1, 6usize), (2, 4usize)]))
2377            .into_keyed()
2378            .first();
2379
2380        // BoundedValue thresholds (from async source)
2381        let (thresh_port, thresh_input) = node.sim_input::<(u32, usize), _, _>();
2382        let thresholds = thresh_input.into_keyed().first();
2383
2384        let output = values
2385            .threshold_greater_or_equal(thresholds)
2386            .entries()
2387            .sim_output();
2388
2389        let count = flow.sim().exhaustive(async || {
2390            thresh_port.send((1, 5));
2391            thresh_port.send((2, 10));
2392
2393            // key 1: 6 >= 5 ✓, key 2: 4 < 10 ✗
2394            let results = output.collect_sorted::<Vec<_>>().await;
2395            assert_eq!(results, vec![(1, 5)]);
2396        });
2397
2398        assert!(count > 0);
2399    }
2400
2401    #[cfg(feature = "sim")]
2402    #[test]
2403    fn threshold_greater_or_equal_uniform_bounded() {
2404        let mut flow = FlowBuilder::new();
2405        let node = flow.process::<()>();
2406
2407        let values = node
2408            .source_iter(q!(vec![(1, 6usize), (2, 4usize)]))
2409            .into_keyed()
2410            .first();
2411        let threshold = node.singleton(q!(5usize));
2412
2413        let output = values
2414            .threshold_greater_or_equal_uniform(threshold)
2415            .entries()
2416            .sim_output();
2417
2418        let count = flow.sim().exhaustive(async || {
2419            // key 1: 6 >= 5 ✓, key 2: 4 < 5 ✗
2420            let results = output.collect_sorted::<Vec<_>>().await;
2421            assert_eq!(results, vec![(1, 5)]);
2422        });
2423
2424        assert!(count > 0);
2425    }
2426}