Skip to main content

hydro_lang/live_collections/stream/
mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::singleton::SingletonBound;
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::{Atomic, DeferTick};
30use crate::location::{Location, Tick, TopLevel, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::prelude::manual_proof;
34use crate::properties::{
35    AggFuncAlgebra, ApplyMonotoneStream, StreamMapFuncAlgebra, ValidCommutativityFor,
36    ValidIdempotenceFor, ValidMutBorrowCommutativityFor, ValidMutBorrowIdempotenceFor,
37    ValidMutCommutativityFor, ValidMutIdempotenceFor,
38};
39
40pub mod networking;
41
42/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
43#[sealed::sealed]
44pub trait Ordering:
45    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
46{
47    /// The [`StreamOrder`] corresponding to this type.
48    const ORDERING_KIND: StreamOrder;
49}
50
51/// Marks the stream as being totally ordered, which means that there are
52/// no sources of non-determinism (other than intentional ones) that will
53/// affect the order of elements.
54pub enum TotalOrder {}
55
56#[sealed::sealed]
57impl Ordering for TotalOrder {
58    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
59}
60
61/// Marks the stream as having no order, which means that the order of
62/// elements may be affected by non-determinism.
63///
64/// This restricts certain operators, such as `fold` and `reduce`, to only
65/// be used with commutative aggregation functions.
66pub enum NoOrder {}
67
68#[sealed::sealed]
69impl Ordering for NoOrder {
70    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
71}
72
73/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
74/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
75/// have `Self` guarantees instead.
76#[sealed::sealed]
77pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
78#[sealed::sealed]
79impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
80
81/// Helper trait for determining the weakest of two orderings.
82#[sealed::sealed]
83pub trait MinOrder<Other: ?Sized> {
84    /// The weaker of the two orderings.
85    type Min: Ordering;
86}
87
88#[sealed::sealed]
89impl<O: Ordering> MinOrder<O> for TotalOrder {
90    type Min = O;
91}
92
93#[sealed::sealed]
94impl<O: Ordering> MinOrder<O> for NoOrder {
95    type Min = NoOrder;
96}
97
98/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
99#[sealed::sealed]
100pub trait Retries:
101    MinRetries<Self, Min = Self>
102    + MinRetries<ExactlyOnce, Min = Self>
103    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
104{
105    /// The [`StreamRetry`] corresponding to this type.
106    const RETRIES_KIND: StreamRetry;
107}
108
109/// Marks the stream as having deterministic message cardinality, with no
110/// possibility of duplicates.
111pub enum ExactlyOnce {}
112
113#[sealed::sealed]
114impl Retries for ExactlyOnce {
115    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
116}
117
118/// Marks the stream as having non-deterministic message cardinality, which
119/// means that duplicates may occur, but messages will not be dropped.
120pub enum AtLeastOnce {}
121
122#[sealed::sealed]
123impl Retries for AtLeastOnce {
124    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
125}
126
127/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
128/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
129/// have `Self` guarantees instead.
130#[sealed::sealed]
131pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
132#[sealed::sealed]
133impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
134
135/// Helper trait for determining the weakest of two retry guarantees.
136#[sealed::sealed]
137pub trait MinRetries<Other: ?Sized> {
138    /// The weaker of the two retry guarantees.
139    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
140}
141
142#[sealed::sealed]
143impl<R: Retries> MinRetries<R> for ExactlyOnce {
144    type Min = R;
145}
146
147#[sealed::sealed]
148impl<R: Retries> MinRetries<R> for AtLeastOnce {
149    type Min = AtLeastOnce;
150}
151
152#[sealed::sealed]
153#[diagnostic::on_unimplemented(
154    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
155    label = "required here",
156    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
157)]
158/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
159pub trait IsOrdered: Ordering {}
160
161#[sealed::sealed]
162#[diagnostic::do_not_recommend]
163impl IsOrdered for TotalOrder {}
164
165#[sealed::sealed]
166#[diagnostic::on_unimplemented(
167    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
168    label = "required here",
169    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
170)]
171/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
172pub trait IsExactlyOnce: Retries {}
173
174#[sealed::sealed]
175#[diagnostic::do_not_recommend]
176impl IsExactlyOnce for ExactlyOnce {}
177
178/// Streaming sequence of elements with type `Type`.
179///
180/// This live collection represents a growing sequence of elements, with new elements being
181/// asynchronously appended to the end of the sequence. This can be used to model the arrival
182/// of network input, such as API requests, or streaming ingestion.
183///
184/// By default, all streams have deterministic ordering and each element is materialized exactly
185/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
186/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
187/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
188///
189/// Type Parameters:
190/// - `Type`: the type of elements in the stream
191/// - `Loc`: the location where the stream is being materialized
192/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
193/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
194///   (default is [`TotalOrder`])
195/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
196///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
197pub struct Stream<
198    Type,
199    Loc,
200    Bound: Boundedness = Unbounded,
201    Order: Ordering = TotalOrder,
202    Retry: Retries = ExactlyOnce,
203> {
204    pub(crate) location: Loc,
205    pub(crate) ir_node: RefCell<HydroNode>,
206    pub(crate) flow_state: FlowState,
207
208    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
209}
210
211impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
212    fn drop(&mut self) {
213        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
214        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
215            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
216                input: Box::new(ir_node),
217                op_metadata: HydroIrOpMetadata::new(),
218            });
219        }
220    }
221}
222
223impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
224    for Stream<T, L, Unbounded, O, R>
225where
226    L: Location<'a>,
227{
228    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
229        let new_meta = stream
230            .location
231            .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
232
233        Stream {
234            location: stream.location.clone(),
235            flow_state: stream.flow_state.clone(),
236            ir_node: RefCell::new(HydroNode::Cast {
237                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
238                metadata: new_meta,
239            }),
240            _phantom: PhantomData,
241        }
242    }
243}
244
245impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
246    for Stream<T, L, B, NoOrder, R>
247where
248    L: Location<'a>,
249{
250    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
251        stream.weaken_ordering()
252    }
253}
254
255impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
256    for Stream<T, L, B, O, AtLeastOnce>
257where
258    L: Location<'a>,
259{
260    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
261        stream.weaken_retries()
262    }
263}
264
265impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
266where
267    L: Location<'a>,
268{
269    fn defer_tick(self) -> Self {
270        Stream::defer_tick(self)
271    }
272}
273
274impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
275    for Stream<T, Tick<L>, Bounded, O, R>
276where
277    L: Location<'a>,
278{
279    type Location = Tick<L>;
280
281    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
282        Stream::new(
283            location.clone(),
284            HydroNode::CycleSource {
285                cycle_id,
286                metadata: location.new_node_metadata(Self::collection_kind()),
287            },
288        )
289    }
290}
291
292impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
293    for Stream<T, Tick<L>, Bounded, O, R>
294where
295    L: Location<'a>,
296{
297    type Location = Tick<L>;
298
299    fn location(&self) -> &Self::Location {
300        self.location()
301    }
302
303    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
304        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
305            location.clone(),
306            HydroNode::DeferTick {
307                input: Box::new(HydroNode::CycleSource {
308                    cycle_id,
309                    metadata: location.new_node_metadata(Self::collection_kind()),
310                }),
311                metadata: location.new_node_metadata(Self::collection_kind()),
312            },
313        );
314
315        from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
316    }
317}
318
319impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
320    for Stream<T, Tick<L>, Bounded, O, R>
321where
322    L: Location<'a>,
323{
324    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
325        assert_eq!(
326            Location::id(&self.location),
327            expected_location,
328            "locations do not match"
329        );
330        self.location
331            .flow_state()
332            .borrow_mut()
333            .push_root(HydroRoot::CycleSink {
334                cycle_id,
335                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
336                op_metadata: HydroIrOpMetadata::new(),
337            });
338    }
339}
340
341impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
342    for Stream<T, L, B, O, R>
343where
344    L: Location<'a>,
345{
346    type Location = L;
347
348    fn create_source(cycle_id: CycleId, location: L) -> Self {
349        Stream::new(
350            location.clone(),
351            HydroNode::CycleSource {
352                cycle_id,
353                metadata: location.new_node_metadata(Self::collection_kind()),
354            },
355        )
356    }
357}
358
359impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
360    for Stream<T, L, B, O, R>
361where
362    L: Location<'a>,
363{
364    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
365        assert_eq!(
366            Location::id(&self.location),
367            expected_location,
368            "locations do not match"
369        );
370        self.location
371            .flow_state()
372            .borrow_mut()
373            .push_root(HydroRoot::CycleSink {
374                cycle_id,
375                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
376                op_metadata: HydroIrOpMetadata::new(),
377            });
378    }
379}
380
381impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
382where
383    T: Clone,
384    L: Location<'a>,
385{
386    fn clone(&self) -> Self {
387        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
388            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
389            *self.ir_node.borrow_mut() = HydroNode::Tee {
390                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
391                metadata: self.location.new_node_metadata(Self::collection_kind()),
392            };
393        }
394
395        let HydroNode::Tee { inner, metadata } = &*self.ir_node.borrow() else {
396            unreachable!()
397        };
398        Stream {
399            location: self.location.clone(),
400            flow_state: self.flow_state.clone(),
401            ir_node: HydroNode::Tee {
402                inner: SharedNode(inner.0.clone()),
403                metadata: metadata.clone(),
404            }
405            .into(),
406            _phantom: PhantomData,
407        }
408    }
409}
410
411impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
412where
413    L: Location<'a>,
414{
415    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
416        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
417        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
418
419        let flow_state = location.flow_state().clone();
420        Stream {
421            location,
422            flow_state,
423            ir_node: RefCell::new(ir_node),
424            _phantom: PhantomData,
425        }
426    }
427
428    /// Returns the [`Location`] where this stream is being materialized.
429    pub fn location(&self) -> &L {
430        &self.location
431    }
432
433    /// Creates a shared reference handle to this stream's handoff buffer that can be captured
434    /// inside `q!()` closures. The handle resolves to `&Vec<T>` at runtime.
435    ///
436    /// The stream must be bounded, otherwise reading it would be non-deterministic.
437    pub fn by_ref(&self) -> crate::handoff_ref::StreamRef<'a, '_, T, L>
438    where
439        B: IsBounded,
440    {
441        crate::handoff_ref::StreamRef::new(&self.ir_node)
442    }
443
444    /// Returns a mutable reference handle to this stream's handoff buffer that can be captured
445    /// inside `q!()` closures. The handle resolves to `&mut Vec<T>` at runtime.
446    pub fn by_mut(&self) -> crate::handoff_ref::StreamMut<'a, '_, T, L>
447    where
448        B: IsBounded,
449    {
450        crate::handoff_ref::StreamMut::new(&self.ir_node)
451    }
452
453    /// Weakens the consistency of this live collection to not guarantee any consistency across
454    /// cluster members (if this collection is on a cluster).
455    pub fn weaken_consistency(self) -> Stream<T, L::DropConsistency, B, O, R>
456    where
457        L: Location<'a>,
458    {
459        if L::consistency()
460            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
461        {
462            // already no consistency
463            Stream::new(
464                self.location.drop_consistency(),
465                self.ir_node.replace(HydroNode::Placeholder),
466            )
467        } else {
468            Stream::new(
469                self.location.drop_consistency(),
470                HydroNode::Cast {
471                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
472                    metadata: self.location.drop_consistency().new_node_metadata(Stream::<
473                        T,
474                        L::DropConsistency,
475                        B,
476                        O,
477                        R,
478                    >::collection_kind(
479                    )),
480                },
481            )
482        }
483    }
484
485    /// Casts this live collection to have the consistency guarantees specified in the given
486    /// location type parameter. The developer must ensure that the strengthened consistency
487    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
488    pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
489        self,
490        _proof: impl crate::properties::ConsistencyProof,
491    ) -> Stream<T, L2, B, O, R>
492    where
493        L: Location<'a>,
494    {
495        if L::consistency() == L2::consistency() {
496            Stream::new(
497                self.location.with_consistency_of(),
498                self.ir_node.replace(HydroNode::Placeholder),
499            )
500        } else {
501            Stream::new(
502                self.location.with_consistency_of(),
503                HydroNode::AssertIsConsistent {
504                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
505                    trusted: false,
506                    metadata: self
507                        .location
508                        .clone()
509                        .with_consistency_of::<L2>()
510                        .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
511                },
512            )
513        }
514    }
515
516    pub(crate) fn assert_has_consistency_of_trusted<
517        L2: Location<'a, DropConsistency = L::DropConsistency>,
518    >(
519        self,
520        _proof: impl crate::properties::ConsistencyProof,
521    ) -> Stream<T, L2, B, O, R>
522    where
523        L: Location<'a>,
524    {
525        if L::consistency() == L2::consistency() {
526            Stream::new(
527                self.location.with_consistency_of(),
528                self.ir_node.replace(HydroNode::Placeholder),
529            )
530        } else {
531            Stream::new(
532                self.location.with_consistency_of(),
533                HydroNode::AssertIsConsistent {
534                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
535                    trusted: true,
536                    metadata: self
537                        .location
538                        .clone()
539                        .with_consistency_of::<L2>()
540                        .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
541                },
542            )
543        }
544    }
545
546    pub(crate) fn collection_kind() -> CollectionKind {
547        CollectionKind::Stream {
548            bound: B::BOUND_KIND,
549            order: O::ORDERING_KIND,
550            retry: R::RETRIES_KIND,
551            element_type: quote_type::<T>().into(),
552        }
553    }
554
555    /// Produces a stream based on invoking `f` on each element.
556    /// If you do not want to modify the stream and instead only want to view
557    /// each item use [`Stream::inspect`] instead.
558    ///
559    /// # Example
560    /// ```rust
561    /// # #[cfg(feature = "deploy")] {
562    /// # use hydro_lang::prelude::*;
563    /// # use futures::StreamExt;
564    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
565    /// let words = process.source_iter(q!(vec!["hello", "world"]));
566    /// words.map(q!(|x| x.to_uppercase()))
567    /// # }, |mut stream| async move {
568    /// # for w in vec!["HELLO", "WORLD"] {
569    /// #     assert_eq!(stream.next().await.unwrap(), w);
570    /// # }
571    /// # }));
572    /// # }
573    /// ```
574    pub fn map<U, F, C, I, const WAS_MUT: bool>(
575        self,
576        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, I>>,
577    ) -> Stream<U, L, B, O, R>
578    where
579        F: FnMut(T) -> U + 'a,
580        C: ValidMutCommutativityFor<F, T, U, O, WAS_MUT>,
581        I: ValidMutIdempotenceFor<F, T, U, R, WAS_MUT>,
582    {
583        let f = crate::handoff_ref::with_ref_capture(|| {
584            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
585            proof.register_proof(&expr);
586            expr.into()
587        });
588        Stream::new(
589            self.location.clone(),
590            HydroNode::Map {
591                f,
592                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
593                metadata: self
594                    .location
595                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
596            },
597        )
598    }
599
600    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
601    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
602    /// for the output type `U` must produce items in a **deterministic** order.
603    ///
604    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
605    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
606    ///
607    /// # Example
608    /// ```rust
609    /// # #[cfg(feature = "deploy")] {
610    /// # use hydro_lang::prelude::*;
611    /// # use futures::StreamExt;
612    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
613    /// process
614    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
615    ///     .flat_map_ordered(q!(|x| x))
616    /// # }, |mut stream| async move {
617    /// // 1, 2, 3, 4
618    /// # for w in (1..5) {
619    /// #     assert_eq!(stream.next().await.unwrap(), w);
620    /// # }
621    /// # }));
622    /// # }
623    /// ```
624    pub fn flat_map_ordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
625        self,
626        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
627    ) -> Stream<U, L, B, O, R>
628    where
629        I: IntoIterator<Item = U>,
630        F: FnMut(T) -> I + 'a,
631        C: ValidMutCommutativityFor<F, T, I, O, WAS_MUT>,
632        Idemp: ValidMutIdempotenceFor<F, T, I, R, WAS_MUT>,
633    {
634        let f = crate::handoff_ref::with_ref_capture(|| {
635            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
636            proof.register_proof(&expr);
637            expr.into()
638        });
639        Stream::new(
640            self.location.clone(),
641            HydroNode::FlatMap {
642                f,
643                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
644                metadata: self
645                    .location
646                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
647            },
648        )
649    }
650
651    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
652    /// for the output type `U` to produce items in any order.
653    ///
654    /// # Example
655    /// ```rust
656    /// # #[cfg(feature = "deploy")] {
657    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
658    /// # use futures::StreamExt;
659    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
660    /// process
661    ///     .source_iter(q!(vec![
662    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
663    ///         std::collections::HashSet::from_iter(vec![3, 4]),
664    ///     ]))
665    ///     .flat_map_unordered(q!(|x| x))
666    /// # }, |mut stream| async move {
667    /// // 1, 2, 3, 4, but in no particular order
668    /// # let mut results = Vec::new();
669    /// # for w in (1..5) {
670    /// #     results.push(stream.next().await.unwrap());
671    /// # }
672    /// # results.sort();
673    /// # assert_eq!(results, vec![1, 2, 3, 4]);
674    /// # }));
675    /// # }
676    /// ```
677    pub fn flat_map_unordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
678        self,
679        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
680    ) -> Stream<U, L, B, NoOrder, R>
681    where
682        I: IntoIterator<Item = U>,
683        F: FnMut(T) -> I + 'a,
684        C: ValidMutCommutativityFor<F, T, I, O, WAS_MUT>,
685        Idemp: ValidMutIdempotenceFor<F, T, I, R, WAS_MUT>,
686    {
687        let f = crate::handoff_ref::with_ref_capture(|| {
688            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
689            proof.register_proof(&expr);
690            expr.into()
691        });
692        Stream::new(
693            self.location.clone(),
694            HydroNode::FlatMap {
695                f,
696                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
697                metadata: self
698                    .location
699                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
700            },
701        )
702    }
703
704    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
705    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
706    ///
707    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
708    /// not deterministic, use [`Stream::flatten_unordered`] instead.
709    ///
710    /// ```rust
711    /// # #[cfg(feature = "deploy")] {
712    /// # use hydro_lang::prelude::*;
713    /// # use futures::StreamExt;
714    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
715    /// process
716    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
717    ///     .flatten_ordered()
718    /// # }, |mut stream| async move {
719    /// // 1, 2, 3, 4
720    /// # for w in (1..5) {
721    /// #     assert_eq!(stream.next().await.unwrap(), w);
722    /// # }
723    /// # }));
724    /// # }
725    /// ```
726    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
727    where
728        T: IntoIterator<Item = U>,
729    {
730        self.flat_map_ordered(q!(|d| d))
731    }
732
733    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
734    /// for the element type `T` to produce items in any order.
735    ///
736    /// # Example
737    /// ```rust
738    /// # #[cfg(feature = "deploy")] {
739    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
740    /// # use futures::StreamExt;
741    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
742    /// process
743    ///     .source_iter(q!(vec![
744    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
745    ///         std::collections::HashSet::from_iter(vec![3, 4]),
746    ///     ]))
747    ///     .flatten_unordered()
748    /// # }, |mut stream| async move {
749    /// // 1, 2, 3, 4, but in no particular order
750    /// # let mut results = Vec::new();
751    /// # for w in (1..5) {
752    /// #     results.push(stream.next().await.unwrap());
753    /// # }
754    /// # results.sort();
755    /// # assert_eq!(results, vec![1, 2, 3, 4]);
756    /// # }));
757    /// # }
758    /// ```
759    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
760    where
761        T: IntoIterator<Item = U>,
762    {
763        self.flat_map_unordered(q!(|d| d))
764    }
765
766    /// For each item in the input stream, apply `f` to produce a [`futures::stream::Stream`],
767    /// then emit the elements of that stream one by one. When the inner stream yields
768    /// `Pending`, this operator yields as well.
769    pub fn flat_map_stream_blocking<U, S, F, C, Idemp, const WAS_MUT: bool>(
770        self,
771        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
772    ) -> Stream<U, L, B, O, R>
773    where
774        S: futures::Stream<Item = U>,
775        F: FnMut(T) -> S + 'a,
776        C: ValidMutCommutativityFor<F, T, S, O, WAS_MUT>,
777        Idemp: ValidMutIdempotenceFor<F, T, S, R, WAS_MUT>,
778    {
779        let f = crate::handoff_ref::with_ref_capture(|| {
780            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
781            proof.register_proof(&expr);
782            expr.into()
783        });
784        Stream::new(
785            self.location.clone(),
786            HydroNode::FlatMapStreamBlocking {
787                f,
788                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
789                metadata: self
790                    .location
791                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
792            },
793        )
794    }
795
796    /// For each item in the input stream, treat it as a [`futures::stream::Stream`] and
797    /// emit its elements one by one. When the inner stream yields `Pending`, this operator
798    /// yields as well.
799    pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
800    where
801        T: futures::Stream<Item = U>,
802    {
803        self.flat_map_stream_blocking(q!(|d| d))
804    }
805
806    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
807    /// `f`, preserving the order of the elements.
808    ///
809    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
810    /// not modify or take ownership of the values. If you need to modify the values while filtering
811    /// use [`Stream::filter_map`] instead.
812    ///
813    /// # Example
814    /// ```rust
815    /// # #[cfg(feature = "deploy")] {
816    /// # use hydro_lang::prelude::*;
817    /// # use futures::StreamExt;
818    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
819    /// process
820    ///     .source_iter(q!(vec![1, 2, 3, 4]))
821    ///     .filter(q!(|&x| x > 2))
822    /// # }, |mut stream| async move {
823    /// // 3, 4
824    /// # for w in (3..5) {
825    /// #     assert_eq!(stream.next().await.unwrap(), w);
826    /// # }
827    /// # }));
828    /// # }
829    /// ```
830    pub fn filter<F, C, Idemp, const WAS_MUT: bool>(
831        self,
832        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
833    ) -> Self
834    where
835        F: FnMut(&T) -> bool + 'a,
836        C: ValidMutBorrowCommutativityFor<F, T, bool, O, WAS_MUT>,
837        Idemp: ValidMutBorrowIdempotenceFor<F, T, bool, R, WAS_MUT>,
838    {
839        let f = crate::handoff_ref::with_ref_capture(|| {
840            let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location);
841            proof.register_proof(&expr);
842            expr.into()
843        });
844        Stream::new(
845            self.location.clone(),
846            HydroNode::Filter {
847                f,
848                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
849                metadata: self.location.new_node_metadata(Self::collection_kind()),
850            },
851        )
852    }
853
854    /// Splits the stream into two streams based on a predicate, without cloning elements.
855    ///
856    /// Elements for which `f` returns `true` are sent to the first output stream,
857    /// and elements for which `f` returns `false` are sent to the second output stream.
858    ///
859    /// Unlike using `filter` twice, this only evaluates the predicate once per element
860    /// and does not require `T: Clone`.
861    ///
862    /// The closure `f` receives a reference `&T` rather than an owned value `T` because
863    /// the predicate is only used for routing; the element itself is moved to the
864    /// appropriate output stream.
865    ///
866    /// # Example
867    /// ```rust
868    /// # #[cfg(feature = "deploy")] {
869    /// # use hydro_lang::prelude::*;
870    /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
871    /// # use futures::StreamExt;
872    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
873    /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
874    /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
875    /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
876    /// evens.map(q!(|x| (x, true)))
877    ///     .merge_unordered(odds.map(q!(|x| (x, false))))
878    /// # }, |mut stream| async move {
879    /// # let mut results = Vec::new();
880    /// # for _ in 0..6 {
881    /// #     results.push(stream.next().await.unwrap());
882    /// # }
883    /// # results.sort();
884    /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
885    /// # }));
886    /// # }
887    /// ```
888    pub fn partition<F, C, Idemp, const WAS_MUT: bool>(
889        self,
890        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
891    ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
892    where
893        F: FnMut(&T) -> bool + 'a,
894        C: ValidMutBorrowCommutativityFor<F, T, bool, O, WAS_MUT>,
895        Idemp: ValidMutBorrowIdempotenceFor<F, T, bool, R, WAS_MUT>,
896    {
897        let f = crate::handoff_ref::with_ref_capture(|| {
898            let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location);
899            proof.register_proof(&expr);
900            expr.into()
901        });
902        let shared = SharedNode(Rc::new(RefCell::new(
903            self.ir_node.replace(HydroNode::Placeholder),
904        )));
905
906        let true_stream = Stream::new(
907            self.location.clone(),
908            HydroNode::Partition {
909                inner: SharedNode(shared.0.clone()),
910                f: f.clone(),
911                is_true: true,
912                metadata: self.location.new_node_metadata(Self::collection_kind()),
913            },
914        );
915
916        let false_stream = Stream::new(
917            self.location.clone(),
918            HydroNode::Partition {
919                inner: SharedNode(shared.0),
920                f,
921                is_true: false,
922                metadata: self.location.new_node_metadata(Self::collection_kind()),
923            },
924        );
925
926        (true_stream, false_stream)
927    }
928
929    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
930    ///
931    /// # Example
932    /// ```rust
933    /// # #[cfg(feature = "deploy")] {
934    /// # use hydro_lang::prelude::*;
935    /// # use futures::StreamExt;
936    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
937    /// process
938    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
939    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
940    /// # }, |mut stream| async move {
941    /// // 1, 2
942    /// # for w in (1..3) {
943    /// #     assert_eq!(stream.next().await.unwrap(), w);
944    /// # }
945    /// # }));
946    /// # }
947    /// ```
948    pub fn filter_map<U, F, C, Idemp, const WAS_MUT: bool>(
949        self,
950        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
951    ) -> Stream<U, L, B, O, R>
952    where
953        F: FnMut(T) -> Option<U> + 'a,
954        C: ValidMutCommutativityFor<F, T, Option<U>, O, WAS_MUT>,
955        Idemp: ValidMutIdempotenceFor<F, T, Option<U>, R, WAS_MUT>,
956    {
957        let f = crate::handoff_ref::with_ref_capture(|| {
958            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
959            proof.register_proof(&expr);
960            expr.into()
961        });
962        Stream::new(
963            self.location.clone(),
964            HydroNode::FilterMap {
965                f,
966                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
967                metadata: self
968                    .location
969                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
970            },
971        )
972    }
973
974    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
975    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
976    /// If `other` is an empty [`Optional`], no values will be produced.
977    ///
978    /// # Example
979    /// ```rust
980    /// # #[cfg(feature = "deploy")] {
981    /// # use hydro_lang::prelude::*;
982    /// # use futures::StreamExt;
983    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
984    /// let tick = process.tick();
985    /// let batch = process
986    ///   .source_iter(q!(vec![1, 2, 3, 4]))
987    ///   .batch(&tick, nondet!(/** test */));
988    /// let count = batch.clone().count(); // `count()` returns a singleton
989    /// batch.cross_singleton(count).all_ticks()
990    /// # }, |mut stream| async move {
991    /// // (1, 4), (2, 4), (3, 4), (4, 4)
992    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
993    /// #     assert_eq!(stream.next().await.unwrap(), w);
994    /// # }
995    /// # }));
996    /// # }
997    /// ```
998    pub fn cross_singleton<O2>(
999        self,
1000        other: impl Into<Optional<O2, L, Bounded>>,
1001    ) -> Stream<(T, O2), L, B, O, R>
1002    where
1003        O2: Clone,
1004    {
1005        let other: Optional<O2, L, Bounded> = other.into();
1006        check_matching_location(&self.location, &other.location);
1007
1008        Stream::new(
1009            self.location.clone(),
1010            HydroNode::CrossSingleton {
1011                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1012                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1013                metadata: self
1014                    .location
1015                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
1016            },
1017        )
1018    }
1019
1020    /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
1021    ///
1022    /// # Example
1023    /// ```rust
1024    /// # #[cfg(feature = "deploy")] {
1025    /// # use hydro_lang::prelude::*;
1026    /// # use futures::StreamExt;
1027    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1028    /// let tick = process.tick();
1029    /// // ticks are lazy by default, forces the second tick to run
1030    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1031    ///
1032    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
1033    /// let batch_first_tick = process
1034    ///   .source_iter(q!(vec![1, 2, 3, 4]))
1035    ///   .batch(&tick, nondet!(/** test */));
1036    /// let batch_second_tick = process
1037    ///   .source_iter(q!(vec![5, 6, 7, 8]))
1038    ///   .batch(&tick, nondet!(/** test */))
1039    ///   .defer_tick();
1040    /// batch_first_tick.chain(batch_second_tick)
1041    ///   .filter_if(signal)
1042    ///   .all_ticks()
1043    /// # }, |mut stream| async move {
1044    /// // [1, 2, 3, 4]
1045    /// # for w in vec![1, 2, 3, 4] {
1046    /// #     assert_eq!(stream.next().await.unwrap(), w);
1047    /// # }
1048    /// # }));
1049    /// # }
1050    /// ```
1051    pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
1052        self.cross_singleton(signal.filter(q!(|b| *b)))
1053            .map(q!(|(d, _)| d))
1054    }
1055
1056    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
1057    ///
1058    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
1059    /// leader of a cluster.
1060    ///
1061    /// # Example
1062    /// ```rust
1063    /// # #[cfg(feature = "deploy")] {
1064    /// # use hydro_lang::prelude::*;
1065    /// # use futures::StreamExt;
1066    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1067    /// let tick = process.tick();
1068    /// // ticks are lazy by default, forces the second tick to run
1069    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1070    ///
1071    /// let batch_first_tick = process
1072    ///   .source_iter(q!(vec![1, 2, 3, 4]))
1073    ///   .batch(&tick, nondet!(/** test */));
1074    /// let batch_second_tick = process
1075    ///   .source_iter(q!(vec![5, 6, 7, 8]))
1076    ///   .batch(&tick, nondet!(/** test */))
1077    ///   .defer_tick(); // appears on the second tick
1078    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1079    /// batch_first_tick.chain(batch_second_tick)
1080    ///   .filter_if_some(some_on_first_tick)
1081    ///   .all_ticks()
1082    /// # }, |mut stream| async move {
1083    /// // [1, 2, 3, 4]
1084    /// # for w in vec![1, 2, 3, 4] {
1085    /// #     assert_eq!(stream.next().await.unwrap(), w);
1086    /// # }
1087    /// # }));
1088    /// # }
1089    /// ```
1090    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1091    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1092        self.filter_if(signal.is_some())
1093    }
1094
1095    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
1096    ///
1097    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
1098    /// some local state.
1099    ///
1100    /// # Example
1101    /// ```rust
1102    /// # #[cfg(feature = "deploy")] {
1103    /// # use hydro_lang::prelude::*;
1104    /// # use futures::StreamExt;
1105    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1106    /// let tick = process.tick();
1107    /// // ticks are lazy by default, forces the second tick to run
1108    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1109    ///
1110    /// let batch_first_tick = process
1111    ///   .source_iter(q!(vec![1, 2, 3, 4]))
1112    ///   .batch(&tick, nondet!(/** test */));
1113    /// let batch_second_tick = process
1114    ///   .source_iter(q!(vec![5, 6, 7, 8]))
1115    ///   .batch(&tick, nondet!(/** test */))
1116    ///   .defer_tick(); // appears on the second tick
1117    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1118    /// batch_first_tick.chain(batch_second_tick)
1119    ///   .filter_if_none(some_on_first_tick)
1120    ///   .all_ticks()
1121    /// # }, |mut stream| async move {
1122    /// // [5, 6, 7, 8]
1123    /// # for w in vec![5, 6, 7, 8] {
1124    /// #     assert_eq!(stream.next().await.unwrap(), w);
1125    /// # }
1126    /// # }));
1127    /// # }
1128    /// ```
1129    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1130    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1131        self.filter_if(other.is_none())
1132    }
1133
1134    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams,
1135    /// returning all tupled pairs.
1136    ///
1137    /// When the right side is [`Bounded`], it is accumulated first and the left side streams
1138    /// through, preserving the left side's ordering. When both sides are [`Unbounded`], a
1139    /// symmetric hash join is used and ordering is [`NoOrder`].
1140    ///
1141    /// # Example
1142    /// ```rust
1143    /// # #[cfg(feature = "deploy")] {
1144    /// # use hydro_lang::prelude::*;
1145    /// # use std::collections::HashSet;
1146    /// # use futures::StreamExt;
1147    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1148    /// let tick = process.tick();
1149    /// let stream1 = process.source_iter(q!(vec![1, 2]));
1150    /// let stream2 = process.source_iter(q!(vec!['a', 'b']));
1151    /// stream1.cross_product(stream2)
1152    /// # }, |mut stream| async move {
1153    /// // (1, 'a'), (1, 'b'), (2, 'a'), (2, 'b') in any order
1154    /// # let expected = HashSet::from([(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]);
1155    /// # stream.map(|i| assert!(expected.contains(&i)));
1156    /// # }));
1157    /// # }
1158    pub fn cross_product<T2, B2: Boundedness, O2: Ordering, R2: Retries>(
1159        self,
1160        other: Stream<T2, L, B2, O2, R2>,
1161    ) -> Stream<(T, T2), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
1162    where
1163        T: Clone,
1164        T2: Clone,
1165        R: MinRetries<R2>,
1166    {
1167        self.map(q!(|v| ((), v)))
1168            .join(other.map(q!(|v| ((), v))))
1169            .map(q!(|((), (v1, v2))| (v1, v2)))
1170    }
1171
1172    /// Takes one stream as input and filters out any duplicate occurrences. The output
1173    /// contains all unique values from the input.
1174    ///
1175    /// # Example
1176    /// ```rust
1177    /// # #[cfg(feature = "deploy")] {
1178    /// # use hydro_lang::prelude::*;
1179    /// # use futures::StreamExt;
1180    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1181    /// let tick = process.tick();
1182    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
1183    /// # }, |mut stream| async move {
1184    /// # for w in vec![1, 2, 3, 4] {
1185    /// #     assert_eq!(stream.next().await.unwrap(), w);
1186    /// # }
1187    /// # }));
1188    /// # }
1189    /// ```
1190    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1191    where
1192        T: Eq + Hash,
1193    {
1194        Stream::new(
1195            self.location.clone(),
1196            HydroNode::Unique {
1197                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1198                metadata: self
1199                    .location
1200                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1201            },
1202        )
1203    }
1204
1205    /// Outputs everything in this stream that is *not* contained in the `other` stream.
1206    ///
1207    /// The `other` stream must be [`Bounded`], since this function will wait until
1208    /// all its elements are available before producing any output.
1209    /// # Example
1210    /// ```rust
1211    /// # #[cfg(feature = "deploy")] {
1212    /// # use hydro_lang::prelude::*;
1213    /// # use futures::StreamExt;
1214    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1215    /// let tick = process.tick();
1216    /// let stream = process
1217    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
1218    ///   .batch(&tick, nondet!(/** test */));
1219    /// let batch = process
1220    ///   .source_iter(q!(vec![1, 2]))
1221    ///   .batch(&tick, nondet!(/** test */));
1222    /// stream.filter_not_in(batch).all_ticks()
1223    /// # }, |mut stream| async move {
1224    /// # for w in vec![3, 4] {
1225    /// #     assert_eq!(stream.next().await.unwrap(), w);
1226    /// # }
1227    /// # }));
1228    /// # }
1229    /// ```
1230    pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1231    where
1232        T: Eq + Hash,
1233        B2: IsBounded,
1234    {
1235        check_matching_location(&self.location, &other.location);
1236
1237        Stream::new(
1238            self.location.clone(),
1239            HydroNode::Difference {
1240                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1241                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1242                metadata: self
1243                    .location
1244                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1245            },
1246        )
1247    }
1248
1249    /// An operator which allows you to "inspect" each element of a stream without
1250    /// modifying it. The closure `f` is called on a reference to each item. This is
1251    /// mainly useful for debugging, and should not be used to generate side-effects.
1252    ///
1253    /// # Example
1254    /// ```rust
1255    /// # #[cfg(feature = "deploy")] {
1256    /// # use hydro_lang::prelude::*;
1257    /// # use futures::StreamExt;
1258    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1259    /// let nums = process.source_iter(q!(vec![1, 2]));
1260    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1261    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1262    /// # }, |mut stream| async move {
1263    /// # for w in vec![1, 2] {
1264    /// #     assert_eq!(stream.next().await.unwrap(), w);
1265    /// # }
1266    /// # }));
1267    /// # }
1268    /// ```
1269    pub fn inspect<F, C, Idemp, const WAS_MUT: bool>(
1270        self,
1271        f: impl IntoQuotedMut<'a, F, L::DropConsistency, StreamMapFuncAlgebra<C, Idemp>>,
1272    ) -> Self
1273    where
1274        F: FnMut(&T) + 'a,
1275        C: ValidMutBorrowCommutativityFor<F, T, (), O, WAS_MUT>,
1276        Idemp: ValidMutBorrowIdempotenceFor<F, T, (), R, WAS_MUT>,
1277    {
1278        let f = crate::handoff_ref::with_ref_capture(|| {
1279            let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location.drop_consistency());
1280            proof.register_proof(&expr);
1281            expr.into()
1282        });
1283
1284        Stream::new(
1285            self.location.clone(),
1286            HydroNode::Inspect {
1287                f,
1288                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1289                metadata: self.location.new_node_metadata(Self::collection_kind()),
1290            },
1291        )
1292    }
1293
1294    /// Executes the provided closure for every element in this stream.
1295    ///
1296    /// Because the closure may have side effects, the stream must have deterministic order
1297    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
1298    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
1299    /// [`Stream::assume_retries`] with an explanation for why this is the case.
1300    ///
1301    /// The closure may capture singletons via `by_ref()` or `by_mut()`. No commutativity
1302    /// or idempotence proofs are needed because the `TotalOrder + ExactlyOnce` requirements
1303    /// already guarantee deterministic execution.
1304    pub fn for_each<F: FnMut(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1305    where
1306        O: IsOrdered,
1307        R: IsExactlyOnce,
1308    {
1309        let f = crate::handoff_ref::with_ref_capture(|| f.splice_fnmut1_ctx(&self.location).into());
1310        self.location
1311            .flow_state()
1312            .borrow_mut()
1313            .push_root(HydroRoot::ForEach {
1314                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1315                f,
1316                op_metadata: HydroIrOpMetadata::new(),
1317            });
1318    }
1319
1320    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1321    /// TCP socket to some other server. You should _not_ use this API for interacting with
1322    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1323    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1324    /// interaction with asynchronous sinks.
1325    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1326    where
1327        O: IsOrdered,
1328        R: IsExactlyOnce,
1329        S: 'a + futures::Sink<T> + Unpin,
1330    {
1331        self.location
1332            .flow_state()
1333            .borrow_mut()
1334            .push_root(HydroRoot::DestSink {
1335                sink: sink.splice_typed_ctx(&self.location).into(),
1336                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1337                op_metadata: HydroIrOpMetadata::new(),
1338            });
1339    }
1340
1341    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1342    ///
1343    /// # Example
1344    /// ```rust
1345    /// # #[cfg(feature = "deploy")] {
1346    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1347    /// # use futures::StreamExt;
1348    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1349    /// let tick = process.tick();
1350    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1351    /// numbers.enumerate()
1352    /// # }, |mut stream| async move {
1353    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1354    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1355    /// #     assert_eq!(stream.next().await.unwrap(), w);
1356    /// # }
1357    /// # }));
1358    /// # }
1359    /// ```
1360    pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1361    where
1362        O: IsOrdered,
1363        R: IsExactlyOnce,
1364    {
1365        Stream::new(
1366            self.location.clone(),
1367            HydroNode::Enumerate {
1368                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1369                metadata: self.location.new_node_metadata(Stream::<
1370                    (usize, T),
1371                    L,
1372                    B,
1373                    TotalOrder,
1374                    ExactlyOnce,
1375                >::collection_kind()),
1376            },
1377        )
1378    }
1379
1380    /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1381    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1382    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1383    ///
1384    /// Depending on the input stream guarantees, the closure may need to be commutative
1385    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1386    ///
1387    /// # Example
1388    /// ```rust
1389    /// # #[cfg(feature = "deploy")] {
1390    /// # use hydro_lang::prelude::*;
1391    /// # use futures::StreamExt;
1392    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1393    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1394    /// words
1395    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1396    ///     .into_stream()
1397    /// # }, |mut stream| async move {
1398    /// // "HELLOWORLD"
1399    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1400    /// # }));
1401    /// # }
1402    /// ```
1403    pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
1404        self,
1405        init: impl IntoQuotedMut<'a, I, L>,
1406        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1407    ) -> Singleton<A, L, B2>
1408    where
1409        I: Fn() -> A + 'a,
1410        F: 'a + Fn(&mut A, T),
1411        C: ValidCommutativityFor<O>,
1412        Idemp: ValidIdempotenceFor<R>,
1413        B: ApplyMonotoneStream<M, B2>,
1414    {
1415        let init = init.splice_fn0_ctx(&self.location).into();
1416        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1417        proof.register_proof(&comb);
1418
1419        // Only assume_retries (for idempotence), not assume_ordering.
1420        // The fold hook in the simulator handles ordering non-determinism directly.
1421        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1422        let retried: Stream<T, L::DropConsistency, B, O, ExactlyOnce> = self.assume_retries(nondet);
1423
1424        let core = HydroNode::Fold {
1425            init,
1426            acc: comb.into(),
1427            input: Box::new(retried.ir_node.replace(HydroNode::Placeholder)),
1428            metadata: retried
1429                .location
1430                .new_node_metadata(Singleton::<A, L::DropConsistency, B2>::collection_kind()),
1431            // we do not guarantee consistency at this point because if the algebraic properties
1432            // do not hold in practice, replica consistency may fail to be maintained, so we
1433            // would like the simulator to assert consistency; in the future, this will be dynamic
1434            // based on the proof mechanism
1435        };
1436
1437        Singleton::new(retried.location.clone(), core)
1438            .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1439    }
1440
1441    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1442    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1443    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1444    /// reference, so that it can be modified in place.
1445    ///
1446    /// Depending on the input stream guarantees, the closure may need to be commutative
1447    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1448    ///
1449    /// # Example
1450    /// ```rust
1451    /// # #[cfg(feature = "deploy")] {
1452    /// # use hydro_lang::prelude::*;
1453    /// # use futures::StreamExt;
1454    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1455    /// let bools = process.source_iter(q!(vec![false, true, false]));
1456    /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1457    /// # }, |mut stream| async move {
1458    /// // true
1459    /// # assert_eq!(stream.next().await.unwrap(), true);
1460    /// # }));
1461    /// # }
1462    /// ```
1463    pub fn reduce<F, C, Idemp>(
1464        self,
1465        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1466    ) -> Optional<T, L, B>
1467    where
1468        F: Fn(&mut T, T) + 'a,
1469        C: ValidCommutativityFor<O>,
1470        Idemp: ValidIdempotenceFor<R>,
1471    {
1472        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1473        proof.register_proof(&f);
1474
1475        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1476        let ordered_etc: Stream<T, L::DropConsistency, B> =
1477            self.assume_retries(nondet).assume_ordering(nondet);
1478
1479        let core = HydroNode::Reduce {
1480            f: f.into(),
1481            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1482            metadata: ordered_etc
1483                .location
1484                .new_node_metadata(Optional::<T, L::DropConsistency, B>::collection_kind()),
1485        };
1486
1487        Optional::new(ordered_etc.location.clone(), core)
1488            .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1489    }
1490
1491    /// Computes the maximum element in the stream as an [`Optional`], which
1492    /// will be empty until the first element in the input arrives.
1493    ///
1494    /// # Example
1495    /// ```rust
1496    /// # #[cfg(feature = "deploy")] {
1497    /// # use hydro_lang::prelude::*;
1498    /// # use futures::StreamExt;
1499    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1500    /// let tick = process.tick();
1501    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1502    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1503    /// batch.max().all_ticks()
1504    /// # }, |mut stream| async move {
1505    /// // 4
1506    /// # assert_eq!(stream.next().await.unwrap(), 4);
1507    /// # }));
1508    /// # }
1509    /// ```
1510    pub fn max(self) -> Optional<T, L, B>
1511    where
1512        T: Ord,
1513    {
1514        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1515            .assume_ordering_trusted_bounded::<TotalOrder>(
1516                nondet!(/** max is commutative, but order affects intermediates */),
1517            )
1518            .reduce(q!(|curr, new| {
1519                if new > *curr {
1520                    *curr = new;
1521                }
1522            }))
1523    }
1524
1525    /// Computes the minimum element in the stream as an [`Optional`], which
1526    /// will be empty until the first element in the input arrives.
1527    ///
1528    /// # Example
1529    /// ```rust
1530    /// # #[cfg(feature = "deploy")] {
1531    /// # use hydro_lang::prelude::*;
1532    /// # use futures::StreamExt;
1533    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1534    /// let tick = process.tick();
1535    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1536    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1537    /// batch.min().all_ticks()
1538    /// # }, |mut stream| async move {
1539    /// // 1
1540    /// # assert_eq!(stream.next().await.unwrap(), 1);
1541    /// # }));
1542    /// # }
1543    /// ```
1544    pub fn min(self) -> Optional<T, L, B>
1545    where
1546        T: Ord,
1547    {
1548        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1549            .assume_ordering_trusted_bounded::<TotalOrder>(
1550                nondet!(/** max is commutative, but order affects intermediates */),
1551            )
1552            .reduce(q!(|curr, new| {
1553                if new < *curr {
1554                    *curr = new;
1555                }
1556            }))
1557    }
1558
1559    /// Computes the first element in the stream as an [`Optional`], which
1560    /// will be empty until the first element in the input arrives.
1561    ///
1562    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1563    /// re-ordering of elements may cause the first element to change.
1564    ///
1565    /// # Example
1566    /// ```rust
1567    /// # #[cfg(feature = "deploy")] {
1568    /// # use hydro_lang::prelude::*;
1569    /// # use futures::StreamExt;
1570    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1571    /// let tick = process.tick();
1572    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1573    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1574    /// batch.first().all_ticks()
1575    /// # }, |mut stream| async move {
1576    /// // 1
1577    /// # assert_eq!(stream.next().await.unwrap(), 1);
1578    /// # }));
1579    /// # }
1580    /// ```
1581    pub fn first(self) -> Optional<T, L, B>
1582    where
1583        O: IsOrdered,
1584    {
1585        self.make_totally_ordered()
1586            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1587            .generator(q!(|| ()), q!(|_, item| Generate::Return(item)))
1588            .reduce(q!(|_, _| {}))
1589    }
1590
1591    /// Computes the last element in the stream as an [`Optional`], which
1592    /// will be empty until an element in the input arrives.
1593    ///
1594    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1595    /// re-ordering of elements may cause the last element to change.
1596    ///
1597    /// # Example
1598    /// ```rust
1599    /// # #[cfg(feature = "deploy")] {
1600    /// # use hydro_lang::prelude::*;
1601    /// # use futures::StreamExt;
1602    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1603    /// let tick = process.tick();
1604    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1605    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1606    /// batch.last().all_ticks()
1607    /// # }, |mut stream| async move {
1608    /// // 4
1609    /// # assert_eq!(stream.next().await.unwrap(), 4);
1610    /// # }));
1611    /// # }
1612    /// ```
1613    pub fn last(self) -> Optional<T, L, B>
1614    where
1615        O: IsOrdered,
1616    {
1617        self.make_totally_ordered()
1618            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1619            .reduce(q!(|curr, new| *curr = new))
1620    }
1621
1622    /// Returns a stream containing at most the first `n` elements of the input stream,
1623    /// preserving the original order. Similar to `LIMIT` in SQL.
1624    ///
1625    /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1626    /// retries, since the result depends on the order and cardinality of elements.
1627    ///
1628    /// # Example
1629    /// ```rust
1630    /// # #[cfg(feature = "deploy")] {
1631    /// # use hydro_lang::prelude::*;
1632    /// # use futures::StreamExt;
1633    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1634    /// let numbers = process.source_iter(q!(vec![10, 20, 30, 40, 50]));
1635    /// numbers.limit(q!(3))
1636    /// # }, |mut stream| async move {
1637    /// // 10, 20, 30
1638    /// # for w in vec![10, 20, 30] {
1639    /// #     assert_eq!(stream.next().await.unwrap(), w);
1640    /// # }
1641    /// # }));
1642    /// # }
1643    /// ```
1644    pub fn limit(
1645        self,
1646        n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1647    ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
1648    where
1649        O: IsOrdered,
1650        R: IsExactlyOnce,
1651    {
1652        self.generator(
1653            q!(|| 0usize),
1654            q!(move |count, item| {
1655                if *count == n {
1656                    Generate::Break
1657                } else {
1658                    *count += 1;
1659                    if *count == n {
1660                        Generate::Return(item)
1661                    } else {
1662                        Generate::Yield(item)
1663                    }
1664                }
1665            }),
1666        )
1667    }
1668
1669    /// Collects all the elements of this stream into a single [`Vec`] element.
1670    ///
1671    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1672    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1673    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1674    /// the vector at an arbitrary point in time.
1675    ///
1676    /// # Example
1677    /// ```rust
1678    /// # #[cfg(feature = "deploy")] {
1679    /// # use hydro_lang::prelude::*;
1680    /// # use futures::StreamExt;
1681    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1682    /// let tick = process.tick();
1683    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1684    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1685    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1686    /// # }, |mut stream| async move {
1687    /// // [ vec![1, 2, 3, 4] ]
1688    /// # for w in vec![vec![1, 2, 3, 4]] {
1689    /// #     assert_eq!(stream.next().await.unwrap(), w);
1690    /// # }
1691    /// # }));
1692    /// # }
1693    /// ```
1694    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1695    where
1696        O: IsOrdered,
1697        R: IsExactlyOnce,
1698    {
1699        self.make_totally_ordered().make_exactly_once().fold(
1700            q!(|| vec![]),
1701            q!(|acc, v| {
1702                acc.push(v);
1703            }),
1704        )
1705    }
1706
1707    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1708    /// and emitting each intermediate result.
1709    ///
1710    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1711    /// containing all intermediate accumulated values. The scan operation can also terminate early
1712    /// by returning `None`.
1713    ///
1714    /// The function takes a mutable reference to the accumulator and the current element, and returns
1715    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1716    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1717    ///
1718    /// # Examples
1719    ///
1720    /// Basic usage - running sum:
1721    /// ```rust
1722    /// # #[cfg(feature = "deploy")] {
1723    /// # use hydro_lang::prelude::*;
1724    /// # use futures::StreamExt;
1725    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1726    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1727    ///     q!(|| 0),
1728    ///     q!(|acc, x| {
1729    ///         *acc += x;
1730    ///         Some(*acc)
1731    ///     }),
1732    /// )
1733    /// # }, |mut stream| async move {
1734    /// // Output: 1, 3, 6, 10
1735    /// # for w in vec![1, 3, 6, 10] {
1736    /// #     assert_eq!(stream.next().await.unwrap(), w);
1737    /// # }
1738    /// # }));
1739    /// # }
1740    /// ```
1741    ///
1742    /// Early termination example:
1743    /// ```rust
1744    /// # #[cfg(feature = "deploy")] {
1745    /// # use hydro_lang::prelude::*;
1746    /// # use futures::StreamExt;
1747    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1748    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1749    ///     q!(|| 1),
1750    ///     q!(|state, x| {
1751    ///         *state = *state * x;
1752    ///         if *state > 6 {
1753    ///             None // Terminate the stream
1754    ///         } else {
1755    ///             Some(-*state)
1756    ///         }
1757    ///     }),
1758    /// )
1759    /// # }, |mut stream| async move {
1760    /// // Output: -1, -2, -6
1761    /// # for w in vec![-1, -2, -6] {
1762    /// #     assert_eq!(stream.next().await.unwrap(), w);
1763    /// # }
1764    /// # }));
1765    /// # }
1766    /// ```
1767    pub fn scan<A, U, I, F>(
1768        self,
1769        init: impl IntoQuotedMut<'a, I, L>,
1770        f: impl IntoQuotedMut<'a, F, L>,
1771    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1772    where
1773        O: IsOrdered,
1774        R: IsExactlyOnce,
1775        I: Fn() -> A + 'a,
1776        F: Fn(&mut A, T) -> Option<U> + 'a,
1777    {
1778        let init = init.splice_fn0_ctx(&self.location).into();
1779        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1780
1781        Stream::new(
1782            self.location.clone(),
1783            HydroNode::Scan {
1784                init,
1785                acc: f,
1786                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1787                metadata: self.location.new_node_metadata(
1788                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1789                ),
1790            },
1791        )
1792    }
1793
1794    /// Async version of [`Stream::scan`]. Applies an async function to each element of the
1795    /// stream, maintaining an internal state (accumulator) and emitting the values returned
1796    /// by the function.
1797    ///
1798    /// The closure runs synchronously (so it can mutate the accumulator), then returns a
1799    /// future. The future is polled to completion. If it resolves to `Some`, the value is
1800    /// emitted. If it resolves to `None`, the item is filtered out.
1801    ///
1802    /// # Examples
1803    ///
1804    /// ```rust
1805    /// # #[cfg(feature = "deploy")] {
1806    /// # use hydro_lang::prelude::*;
1807    /// # use futures::StreamExt;
1808    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1809    /// process
1810    ///     .source_iter(q!(vec![1, 2, 3, 4]))
1811    ///     .scan_async_blocking(
1812    ///         q!(|| 0),
1813    ///         q!(|acc, x| {
1814    ///             *acc += x;
1815    ///             let val = *acc;
1816    ///             async move { Some(val) }
1817    ///         }),
1818    ///     )
1819    /// # }, |mut stream| async move {
1820    /// // Output: 1, 3, 6, 10
1821    /// # for w in vec![1, 3, 6, 10] {
1822    /// #     assert_eq!(stream.next().await.unwrap(), w);
1823    /// # }
1824    /// # }));
1825    /// # }
1826    /// ```
1827    pub fn scan_async_blocking<A, U, I, F, Fut>(
1828        self,
1829        init: impl IntoQuotedMut<'a, I, L>,
1830        f: impl IntoQuotedMut<'a, F, L>,
1831    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1832    where
1833        O: IsOrdered,
1834        R: IsExactlyOnce,
1835        I: Fn() -> A + 'a,
1836        F: Fn(&mut A, T) -> Fut + 'a,
1837        Fut: Future<Output = Option<U>> + 'a,
1838    {
1839        let init = init.splice_fn0_ctx(&self.location).into();
1840        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1841
1842        Stream::new(
1843            self.location.clone(),
1844            HydroNode::ScanAsyncBlocking {
1845                init,
1846                acc: f,
1847                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1848                metadata: self.location.new_node_metadata(
1849                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1850                ),
1851            },
1852        )
1853    }
1854
1855    /// Iteratively processes the elements of the stream using a state machine that can yield
1856    /// elements as it processes its inputs. This is designed to mirror the unstable generator
1857    /// syntax in Rust, without requiring special syntax.
1858    ///
1859    /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
1860    /// state. The second argument defines the processing logic, taking in a mutable reference
1861    /// to the state and the value to be processed. It emits a [`Generate`] value, whose
1862    /// variants define what is emitted and whether further inputs should be processed.
1863    ///
1864    /// # Example
1865    /// ```rust
1866    /// # #[cfg(feature = "deploy")] {
1867    /// # use hydro_lang::prelude::*;
1868    /// # use futures::StreamExt;
1869    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1870    /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
1871    ///     q!(|| 0),
1872    ///     q!(|acc, x| {
1873    ///         *acc += x;
1874    ///         if *acc > 100 {
1875    ///             hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
1876    ///         } else if *acc % 2 == 0 {
1877    ///             hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
1878    ///         } else {
1879    ///             hydro_lang::live_collections::keyed_stream::Generate::Continue
1880    ///         }
1881    ///     }),
1882    /// )
1883    /// # }, |mut stream| async move {
1884    /// // Output: "even", "done!"
1885    /// # let mut results = Vec::new();
1886    /// # for _ in 0..2 {
1887    /// #     results.push(stream.next().await.unwrap());
1888    /// # }
1889    /// # results.sort();
1890    /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
1891    /// # }));
1892    /// # }
1893    /// ```
1894    pub fn generator<A, U, I, F>(
1895        self,
1896        init: impl IntoQuotedMut<'a, I, L> + Copy,
1897        f: impl IntoQuotedMut<'a, F, L> + Copy,
1898    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1899    where
1900        O: IsOrdered,
1901        R: IsExactlyOnce,
1902        I: Fn() -> A + 'a,
1903        F: Fn(&mut A, T) -> Generate<U> + 'a,
1904    {
1905        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1906        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1907
1908        let this = self.make_totally_ordered().make_exactly_once();
1909
1910        // State is Option<Option<A>>:
1911        //   None = not yet initialized
1912        //   Some(Some(a)) = active with state a
1913        //   Some(None) = terminated
1914        let scan_init = q!(|| None)
1915            .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
1916            .into();
1917        let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
1918            if state.is_none() {
1919                *state = Some(Some(init()));
1920            }
1921            match state {
1922                Some(Some(state_value)) => match f(state_value, v) {
1923                    Generate::Yield(out) => Some(Some(out)),
1924                    Generate::Return(out) => {
1925                        *state = Some(None);
1926                        Some(Some(out))
1927                    }
1928                    // Unlike KeyedStream, we can terminate the scan directly on
1929                    // Break/Return because there is only one state (no other keys
1930                    // that still need processing).
1931                    Generate::Break => None,
1932                    Generate::Continue => Some(None),
1933                },
1934                // State is Some(None) after Return; terminate the scan.
1935                _ => None,
1936            }
1937        })
1938        .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
1939        .into();
1940
1941        let scan_node = HydroNode::Scan {
1942            init: scan_init,
1943            acc: scan_f,
1944            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
1945            metadata: this.location.new_node_metadata(Stream::<
1946                Option<U>,
1947                L,
1948                B,
1949                TotalOrder,
1950                ExactlyOnce,
1951            >::collection_kind()),
1952        };
1953
1954        let flatten_f = q!(|d| d)
1955            .splice_fn1_ctx::<Option<U>, _>(&this.location)
1956            .into();
1957        let flatten_node = HydroNode::FlatMap {
1958            f: flatten_f,
1959            input: Box::new(scan_node),
1960            metadata: this
1961                .location
1962                .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
1963        };
1964
1965        Stream::new(this.location.clone(), flatten_node)
1966    }
1967
1968    /// Given a time interval, returns a stream corresponding to samples taken from the
1969    /// stream roughly at that interval. The output will have elements in the same order
1970    /// as the input, but with arbitrary elements skipped between samples. There is also
1971    /// no guarantee on the exact timing of the samples.
1972    ///
1973    /// # Non-Determinism
1974    /// The output stream is non-deterministic in which elements are sampled, since this
1975    /// is controlled by a clock.
1976    pub fn sample_every(
1977        self,
1978        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1979        nondet: NonDet,
1980    ) -> Stream<T, L::DropConsistency, Unbounded, O, AtLeastOnce>
1981    where
1982        L: TopLevel<'a>,
1983    {
1984        let samples = self.location.source_interval(interval);
1985
1986        let tick = self.location.tick();
1987        self.batch(&tick, nondet)
1988            .filter_if(samples.batch(&tick, nondet).first().is_some())
1989            .all_ticks()
1990            .weaken_retries()
1991    }
1992
1993    /// Given a timeout duration, returns an [`Optional`]  which will have a value if the
1994    /// stream has not emitted a value since that duration.
1995    ///
1996    /// # Non-Determinism
1997    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
1998    /// samples take place, timeouts may be non-deterministically generated or missed,
1999    /// and the notification of the timeout may be delayed as well. There is also no
2000    /// guarantee on how long the [`Optional`] will have a value after the timeout is
2001    /// detected based on when the next sample is taken.
2002    pub fn timeout(
2003        self,
2004        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L::DropConsistency>> + Copy + 'a,
2005        nondet: NonDet,
2006    ) -> Optional<(), L::DropConsistency, Unbounded>
2007    where
2008        L: TopLevel<'a>,
2009    {
2010        let tick = self.location.tick();
2011
2012        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
2013            q!(|| None),
2014            q!(
2015                |latest, _| {
2016                    *latest = Some(Instant::now());
2017                },
2018                commutative = manual_proof!(/** TODO */)
2019            ),
2020        );
2021
2022        latest_received
2023            .snapshot(&tick, nondet)
2024            .filter_map(q!(move |latest_received| {
2025                if let Some(latest_received) = latest_received {
2026                    if Instant::now().duration_since(latest_received) > duration {
2027                        Some(())
2028                    } else {
2029                        None
2030                    }
2031                } else {
2032                    Some(())
2033                }
2034            }))
2035            .latest()
2036    }
2037
2038    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2039    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2040    ///
2041    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2042    /// processed before an acknowledgement is emitted.
2043    pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
2044        let id = self.location.flow_state().borrow_mut().next_clock_id();
2045        let out_location = Atomic {
2046            tick: Tick {
2047                id,
2048                l: self.location.clone(),
2049            },
2050        };
2051        Stream::new(
2052            out_location.clone(),
2053            HydroNode::BeginAtomic {
2054                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2055                metadata: out_location
2056                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2057            },
2058        )
2059    }
2060
2061    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2062    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2063    /// the order of the input. The output stream will execute in the [`Tick`] that was
2064    /// used to create the atomic section.
2065    ///
2066    /// # Non-Determinism
2067    /// The batch boundaries are non-deterministic and may change across executions.
2068    pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
2069        self,
2070        tick: &Tick<L2>,
2071        _nondet: NonDet,
2072    ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
2073        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2074        Stream::new(
2075            tick.drop_consistency(),
2076            HydroNode::Batch {
2077                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2078                metadata: tick
2079                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2080            },
2081        )
2082    }
2083
2084    /// An operator which allows you to "name" a `HydroNode`.
2085    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
2086    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
2087        {
2088            let mut node = self.ir_node.borrow_mut();
2089            let metadata = node.metadata_mut();
2090            metadata.tag = Some(name.to_owned());
2091        }
2092        self
2093    }
2094
2095    /// Turns this [`Stream`] into a [`Optional`], under the invariant assumption that there is at
2096    /// most one element. If this invariant is broken, the program may exhibit undefined behavior,
2097    /// so uses must be carefully vetted.
2098    pub(crate) fn cast_at_most_one_element(self) -> Optional<T, L, B>
2099    where
2100        B: IsBounded,
2101    {
2102        Optional::new(
2103            self.location.clone(),
2104            HydroNode::Cast {
2105                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2106                metadata: self
2107                    .location
2108                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
2109            },
2110        )
2111    }
2112
2113    pub(crate) fn use_ordering_type<O2: Ordering>(self) -> Stream<T, L, B, O2, R> {
2114        if O::ORDERING_KIND == O2::ORDERING_KIND {
2115            Stream::new(
2116                self.location.clone(),
2117                self.ir_node.replace(HydroNode::Placeholder),
2118            )
2119        } else {
2120            panic!(
2121                "Runtime ordering {:?} did not match requested cast {:?}.",
2122                O::ORDERING_KIND,
2123                O2::ORDERING_KIND
2124            )
2125        }
2126    }
2127
2128    /// Explicitly "casts" the stream to a type with a different ordering
2129    /// guarantee. Useful in unsafe code where the ordering cannot be proven
2130    /// by the type-system.
2131    ///
2132    /// # Non-Determinism
2133    /// This function is used as an escape hatch, and any mistakes in the
2134    /// provided ordering guarantee will propagate into the guarantees
2135    /// for the rest of the program.
2136    pub fn assume_ordering<O2: Ordering>(
2137        self,
2138        _nondet: NonDet,
2139    ) -> Stream<T, L::DropConsistency, B, O2, R> {
2140        if O::ORDERING_KIND == O2::ORDERING_KIND {
2141            self.use_ordering_type().weaken_consistency()
2142        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2143            // We can always weaken the ordering guarantee
2144            let target_location = self.location().drop_consistency();
2145            Stream::new(
2146                target_location.clone(),
2147                HydroNode::Cast {
2148                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2149                    metadata: target_location
2150                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2151                },
2152            )
2153        } else {
2154            let target_location = self.location().drop_consistency();
2155            Stream::new(
2156                target_location.clone(),
2157                HydroNode::ObserveNonDet {
2158                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2159                    trusted: false,
2160                    metadata: target_location
2161                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2162                },
2163            )
2164        }
2165    }
2166
2167    // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
2168    // intermediate states will not be revealed
2169    fn assume_ordering_trusted_bounded<O2: Ordering>(
2170        self,
2171        nondet: NonDet,
2172    ) -> Stream<T, L, B, O2, R> {
2173        if B::BOUNDED {
2174            self.assume_ordering_trusted(nondet)
2175        } else {
2176            let self_location = self.location.clone();
2177            let inner: Stream<T, L::DropConsistency, B, O2, R> = self.assume_ordering(nondet);
2178            Stream::new(self_location, inner.ir_node.replace(HydroNode::Placeholder))
2179        }
2180    }
2181
2182    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2183    // is not observable
2184    pub(crate) fn assume_ordering_trusted<O2: Ordering>(
2185        self,
2186        _nondet: NonDet,
2187    ) -> Stream<T, L, B, O2, R> {
2188        if O::ORDERING_KIND == O2::ORDERING_KIND {
2189            self.use_ordering_type()
2190        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2191            // We can always weaken the ordering guarantee
2192            Stream::new(
2193                self.location.clone(),
2194                HydroNode::Cast {
2195                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2196                    metadata: self
2197                        .location
2198                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2199                },
2200            )
2201        } else {
2202            Stream::new(
2203                self.location.clone(),
2204                HydroNode::ObserveNonDet {
2205                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2206                    trusted: true,
2207                    metadata: self
2208                        .location
2209                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2210                },
2211            )
2212        }
2213    }
2214
2215    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
2216    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
2217    /// which is always safe because that is the weakest possible guarantee.
2218    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
2219        self.weaken_ordering::<NoOrder>()
2220    }
2221
2222    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
2223    /// enforcing that `O2` is weaker than the input ordering guarantee.
2224    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
2225        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
2226        self.assume_ordering_trusted::<O2>(nondet)
2227    }
2228
2229    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
2230    /// implies that `O == TotalOrder`.
2231    pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
2232    where
2233        O: IsOrdered,
2234    {
2235        self.assume_ordering_trusted(nondet!(/** no-op */))
2236    }
2237
2238    /// Explicitly "casts" the stream to a type with a different retries
2239    /// guarantee. Useful in unsafe code where the lack of retries cannot
2240    /// be proven by the type-system.
2241    ///
2242    /// # Non-Determinism
2243    /// This function is used as an escape hatch, and any mistakes in the
2244    /// provided retries guarantee will propagate into the guarantees
2245    /// for the rest of the program.
2246    pub fn assume_retries<R2: Retries>(
2247        self,
2248        _nondet: NonDet,
2249    ) -> Stream<T, L::DropConsistency, B, O, R2> {
2250        if R::RETRIES_KIND == R2::RETRIES_KIND {
2251            Stream::new(
2252                self.location.drop_consistency(),
2253                self.ir_node.replace(HydroNode::Placeholder),
2254            )
2255        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2256            // We can always weaken the retries guarantee
2257            let target_location = self.location.drop_consistency();
2258            Stream::new(
2259                target_location.clone(),
2260                HydroNode::Cast {
2261                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2262                    metadata: target_location
2263                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2264                },
2265            )
2266        } else {
2267            let target_location = self.location.drop_consistency();
2268            Stream::new(
2269                target_location.clone(),
2270                HydroNode::ObserveNonDet {
2271                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2272                    trusted: false,
2273                    metadata: target_location
2274                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2275                },
2276            )
2277        }
2278    }
2279
2280    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2281    // is not observable
2282    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
2283        if R::RETRIES_KIND == R2::RETRIES_KIND {
2284            Stream::new(
2285                self.location.clone(),
2286                self.ir_node.replace(HydroNode::Placeholder),
2287            )
2288        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2289            // We can always weaken the retries guarantee
2290            Stream::new(
2291                self.location.clone(),
2292                HydroNode::Cast {
2293                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2294                    metadata: self
2295                        .location
2296                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2297                },
2298            )
2299        } else {
2300            Stream::new(
2301                self.location.clone(),
2302                HydroNode::ObserveNonDet {
2303                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2304                    trusted: true,
2305                    metadata: self
2306                        .location
2307                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2308                },
2309            )
2310        }
2311    }
2312
2313    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
2314    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
2315    /// which is always safe because that is the weakest possible guarantee.
2316    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
2317        self.weaken_retries::<AtLeastOnce>()
2318    }
2319
2320    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
2321    /// enforcing that `R2` is weaker than the input retries guarantee.
2322    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2323        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
2324        self.assume_retries_trusted::<R2>(nondet)
2325    }
2326
2327    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
2328    /// implies that `R == ExactlyOnce`.
2329    pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
2330    where
2331        R: IsExactlyOnce,
2332    {
2333        self.assume_retries_trusted(nondet!(/** no-op */))
2334    }
2335
2336    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
2337    /// implies that `B == Bounded`.
2338    pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2339    where
2340        B: IsBounded,
2341    {
2342        self.weaken_boundedness()
2343    }
2344
2345    /// Weakens the boundedness guarantee to an arbitrary boundedness `B2`, given that `B: IsBounded`,
2346    /// which implies that `B == Bounded`.
2347    pub fn weaken_boundedness<B2: Boundedness>(self) -> Stream<T, L, B2, O, R> {
2348        if B::BOUNDED == B2::BOUNDED {
2349            Stream::new(
2350                self.location.clone(),
2351                self.ir_node.replace(HydroNode::Placeholder),
2352            )
2353        } else {
2354            // We can always weaken the boundedness
2355            Stream::new(
2356                self.location.clone(),
2357                HydroNode::Cast {
2358                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2359                    metadata: self
2360                        .location
2361                        .new_node_metadata(Stream::<T, L, B2, O, R>::collection_kind()),
2362                },
2363            )
2364        }
2365    }
2366}
2367
2368impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
2369where
2370    L: Location<'a>,
2371{
2372    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
2373    ///
2374    /// # Example
2375    /// ```rust
2376    /// # #[cfg(feature = "deploy")] {
2377    /// # use hydro_lang::prelude::*;
2378    /// # use futures::StreamExt;
2379    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2380    /// process.source_iter(q!(&[1, 2, 3])).cloned()
2381    /// # }, |mut stream| async move {
2382    /// // 1, 2, 3
2383    /// # for w in vec![1, 2, 3] {
2384    /// #     assert_eq!(stream.next().await.unwrap(), w);
2385    /// # }
2386    /// # }));
2387    /// # }
2388    /// ```
2389    pub fn cloned(self) -> Stream<T, L, B, O, R>
2390    where
2391        T: Clone,
2392    {
2393        self.map(q!(|d| d.clone()))
2394    }
2395}
2396
2397impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2398where
2399    L: Location<'a>,
2400{
2401    /// Computes the number of elements in the stream as a [`Singleton`].
2402    ///
2403    /// # Example
2404    /// ```rust
2405    /// # #[cfg(feature = "deploy")] {
2406    /// # use hydro_lang::prelude::*;
2407    /// # use futures::StreamExt;
2408    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2409    /// let tick = process.tick();
2410    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2411    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2412    /// batch.count().all_ticks()
2413    /// # }, |mut stream| async move {
2414    /// // 4
2415    /// # assert_eq!(stream.next().await.unwrap(), 4);
2416    /// # }));
2417    /// # }
2418    /// ```
2419    pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
2420        self.assume_ordering_trusted::<TotalOrder>(nondet!(
2421            /// Order does not affect eventual count, and also does not affect intermediate states.
2422        ))
2423        .fold(
2424            q!(|| 0usize),
2425            q!(
2426                |count, _| *count += 1,
2427                monotone = manual_proof!(/** += 1 is monotone */)
2428            ),
2429        )
2430    }
2431}
2432
2433impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2434    /// Produces a new stream that merges the elements of the two input streams.
2435    /// The result has [`NoOrder`] because the order of merging is not guaranteed.
2436    ///
2437    /// Currently, both input streams must be [`Unbounded`]. When the streams are
2438    /// [`Bounded`], you can use [`Stream::chain`] instead.
2439    ///
2440    /// # Example
2441    /// ```rust
2442    /// # #[cfg(feature = "deploy")] {
2443    /// # use hydro_lang::prelude::*;
2444    /// # use futures::StreamExt;
2445    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2446    /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
2447    /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
2448    /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
2449    /// # }, |mut stream| async move {
2450    /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
2451    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2452    /// #     assert_eq!(stream.next().await.unwrap(), w);
2453    /// # }
2454    /// # }));
2455    /// # }
2456    /// ```
2457    pub fn merge_unordered<O2: Ordering, R2: Retries>(
2458        self,
2459        other: Stream<T, L, Unbounded, O2, R2>,
2460    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2461    where
2462        R: MinRetries<R2>,
2463    {
2464        Stream::new(
2465            self.location.clone(),
2466            HydroNode::Chain {
2467                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2468                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2469                metadata: self.location.new_node_metadata(Stream::<
2470                    T,
2471                    L,
2472                    Unbounded,
2473                    NoOrder,
2474                    <R as MinRetries<R2>>::Min,
2475                >::collection_kind()),
2476            },
2477        )
2478    }
2479
2480    /// Deprecated: use [`Stream::merge_unordered`] instead.
2481    #[deprecated(note = "use `merge_unordered` instead")]
2482    pub fn interleave<O2: Ordering, R2: Retries>(
2483        self,
2484        other: Stream<T, L, Unbounded, O2, R2>,
2485    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2486    where
2487        R: MinRetries<R2>,
2488    {
2489        self.merge_unordered(other)
2490    }
2491}
2492
2493impl<'a, T, L: Location<'a>, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R> {
2494    /// Produces a new stream that combines the elements of the two input streams,
2495    /// preserving the relative order of elements within each input.
2496    ///
2497    /// # Non-Determinism
2498    /// The order in which elements *across* the two streams will be interleaved is
2499    /// non-deterministic, so the order of elements will vary across runs. If the output
2500    /// order is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic
2501    /// but emits an unordered stream. For deterministic first-then-second ordering on
2502    /// bounded streams, use [`Stream::chain`].
2503    ///
2504    /// # Example
2505    /// ```rust
2506    /// # #[cfg(feature = "deploy")] {
2507    /// # use hydro_lang::prelude::*;
2508    /// # use futures::StreamExt;
2509    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2510    /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
2511    /// # process.source_iter(q!(vec![1, 3])).into();
2512    /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
2513    /// # }, |mut stream| async move {
2514    /// // 1, 3 and 2, 4 in some order, preserving the original local order
2515    /// # for w in vec![1, 3, 2, 4] {
2516    /// #     assert_eq!(stream.next().await.unwrap(), w);
2517    /// # }
2518    /// # }));
2519    /// # }
2520    /// ```
2521    pub fn merge_ordered<R2: Retries>(
2522        self,
2523        other: Stream<T, L, B, TotalOrder, R2>,
2524        _nondet: NonDet,
2525    ) -> Stream<T, L::DropConsistency, B, TotalOrder, <R as MinRetries<R2>>::Min>
2526    where
2527        R: MinRetries<R2>,
2528    {
2529        let target_location = self.location().drop_consistency();
2530        Stream::new(
2531            target_location.clone(),
2532            HydroNode::MergeOrdered {
2533                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2534                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2535                metadata: target_location.new_node_metadata(Stream::<
2536                    T,
2537                    L::DropConsistency,
2538                    B,
2539                    TotalOrder,
2540                    <R as MinRetries<R2>>::Min,
2541                >::collection_kind()),
2542            },
2543        )
2544    }
2545}
2546
2547impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2548where
2549    L: Location<'a>,
2550{
2551    /// Produces a new stream that emits the input elements in sorted order.
2552    ///
2553    /// The input stream can have any ordering guarantee, but the output stream
2554    /// will have a [`TotalOrder`] guarantee. This operator will block until all
2555    /// elements in the input stream are available, so it requires the input stream
2556    /// to be [`Bounded`].
2557    ///
2558    /// # Example
2559    /// ```rust
2560    /// # #[cfg(feature = "deploy")] {
2561    /// # use hydro_lang::prelude::*;
2562    /// # use futures::StreamExt;
2563    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2564    /// let tick = process.tick();
2565    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
2566    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2567    /// batch.sort().all_ticks()
2568    /// # }, |mut stream| async move {
2569    /// // 1, 2, 3, 4
2570    /// # for w in (1..5) {
2571    /// #     assert_eq!(stream.next().await.unwrap(), w);
2572    /// # }
2573    /// # }));
2574    /// # }
2575    /// ```
2576    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2577    where
2578        B: IsBounded,
2579        T: Ord,
2580    {
2581        let this = self.make_bounded();
2582        Stream::new(
2583            this.location.clone(),
2584            HydroNode::Sort {
2585                input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2586                metadata: this
2587                    .location
2588                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2589            },
2590        )
2591    }
2592
2593    /// Produces a new stream that first emits the elements of the `self` stream,
2594    /// and then emits the elements of the `other` stream. The output stream has
2595    /// a [`TotalOrder`] guarantee if and only if both input streams have a
2596    /// [`TotalOrder`] guarantee.
2597    ///
2598    /// Currently, both input streams must be [`Bounded`]. This operator will block
2599    /// on the first stream until all its elements are available. In a future version,
2600    /// we will relax the requirement on the `other` stream.
2601    ///
2602    /// # Example
2603    /// ```rust
2604    /// # #[cfg(feature = "deploy")] {
2605    /// # use hydro_lang::prelude::*;
2606    /// # use futures::StreamExt;
2607    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2608    /// let tick = process.tick();
2609    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2610    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2611    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2612    /// # }, |mut stream| async move {
2613    /// // 2, 3, 4, 5, 1, 2, 3, 4
2614    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2615    /// #     assert_eq!(stream.next().await.unwrap(), w);
2616    /// # }
2617    /// # }));
2618    /// # }
2619    /// ```
2620    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2621        self,
2622        other: Stream<T, L, B2, O2, R2>,
2623    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2624    where
2625        B: IsBounded,
2626        O: MinOrder<O2>,
2627        R: MinRetries<R2>,
2628    {
2629        check_matching_location(&self.location, &other.location);
2630
2631        Stream::new(
2632            self.location.clone(),
2633            HydroNode::Chain {
2634                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2635                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2636                metadata: self.location.new_node_metadata(Stream::<
2637                    T,
2638                    L,
2639                    B2,
2640                    <O as MinOrder<O2>>::Min,
2641                    <R as MinRetries<R2>>::Min,
2642                >::collection_kind()),
2643            },
2644        )
2645    }
2646
2647    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
2648    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
2649    /// because this is compiled into a nested loop.
2650    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>, R2: Retries>(
2651        self,
2652        other: Stream<T2, L, Bounded, O2, R2>,
2653    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, <R as MinRetries<R2>>::Min>
2654    where
2655        B: IsBounded,
2656        T: Clone,
2657        T2: Clone,
2658        R: MinRetries<R2>,
2659    {
2660        let this = self.make_bounded();
2661        check_matching_location(&this.location, &other.location);
2662
2663        Stream::new(
2664            this.location.clone(),
2665            HydroNode::CrossProduct {
2666                left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2667                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2668                metadata: this.location.new_node_metadata(Stream::<
2669                    (T, T2),
2670                    L,
2671                    Bounded,
2672                    <O2 as MinOrder<O>>::Min,
2673                    <R as MinRetries<R2>>::Min,
2674                >::collection_kind()),
2675            },
2676        )
2677    }
2678
2679    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
2680    /// `self` used as the values for *each* key.
2681    ///
2682    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
2683    /// values. For example, it can be used to send the same set of elements to several cluster
2684    /// members, if the membership information is available as a [`KeyedSingleton`].
2685    ///
2686    /// # Example
2687    /// ```rust
2688    /// # #[cfg(feature = "deploy")] {
2689    /// # use hydro_lang::prelude::*;
2690    /// # use futures::StreamExt;
2691    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2692    /// # let tick = process.tick();
2693    /// let keyed_singleton = // { 1: (), 2: () }
2694    /// # process
2695    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
2696    /// #     .into_keyed()
2697    /// #     .batch(&tick, nondet!(/** test */))
2698    /// #     .first();
2699    /// let stream = // [ "a", "b" ]
2700    /// # process
2701    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2702    /// #     .batch(&tick, nondet!(/** test */));
2703    /// stream.repeat_with_keys(keyed_singleton)
2704    /// # .entries().all_ticks()
2705    /// # }, |mut stream| async move {
2706    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2707    /// # let mut results = Vec::new();
2708    /// # for _ in 0..4 {
2709    /// #     results.push(stream.next().await.unwrap());
2710    /// # }
2711    /// # results.sort();
2712    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2713    /// # }));
2714    /// # }
2715    /// ```
2716    pub fn repeat_with_keys<K, V2>(
2717        self,
2718        keys: KeyedSingleton<K, V2, L, Bounded>,
2719    ) -> KeyedStream<K, T, L, Bounded, O, R>
2720    where
2721        B: IsBounded,
2722        K: Clone,
2723        T: Clone,
2724    {
2725        keys.keys()
2726            .assume_ordering_trusted::<TotalOrder>(
2727                nondet!(/** keyed stream does not depend on ordering of keys */),
2728            )
2729            .cross_product_nested_loop(self.make_bounded())
2730            .into_keyed()
2731    }
2732
2733    /// Consumes a stream of `Future<T>`, resolving each future while blocking subgraph
2734    /// execution until all results are available. The output order is based on when futures
2735    /// complete, and may be different than the input order.
2736    ///
2737    /// Unlike [`Stream::resolve_futures`], which allows the subgraph to continue executing
2738    /// while futures are pending, this variant blocks until the futures resolve.
2739    ///
2740    /// # Example
2741    /// ```rust
2742    /// # #[cfg(feature = "deploy")] {
2743    /// # use std::collections::HashSet;
2744    /// # use futures::StreamExt;
2745    /// # use hydro_lang::prelude::*;
2746    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2747    /// process
2748    ///     .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2749    ///     .map(q!(|x| async move {
2750    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2751    ///         x
2752    ///     }))
2753    ///     .resolve_futures_blocking()
2754    /// #   },
2755    /// #   |mut stream| async move {
2756    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2757    /// #       let mut output = HashSet::new();
2758    /// #       for _ in 1..10 {
2759    /// #           output.insert(stream.next().await.unwrap());
2760    /// #       }
2761    /// #       assert_eq!(
2762    /// #           output,
2763    /// #           HashSet::<i32>::from_iter(1..10)
2764    /// #       );
2765    /// #   },
2766    /// # ));
2767    /// # }
2768    /// ```
2769    pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2770    where
2771        T: Future,
2772    {
2773        Stream::new(
2774            self.location.clone(),
2775            HydroNode::ResolveFuturesBlocking {
2776                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2777                metadata: self
2778                    .location
2779                    .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2780            },
2781        )
2782    }
2783
2784    /// Returns a [`Singleton`] containing `true` if the stream has no elements, or `false` otherwise.
2785    ///
2786    /// # Example
2787    /// ```rust
2788    /// # #[cfg(feature = "deploy")] {
2789    /// # use hydro_lang::prelude::*;
2790    /// # use futures::StreamExt;
2791    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2792    /// let tick = process.tick();
2793    /// let empty: Stream<i32, _, Bounded> = process
2794    ///   .source_iter(q!(Vec::<i32>::new()))
2795    ///   .batch(&tick, nondet!(/** test */));
2796    /// empty.is_empty().all_ticks()
2797    /// # }, |mut stream| async move {
2798    /// // true
2799    /// # assert_eq!(stream.next().await.unwrap(), true);
2800    /// # }));
2801    /// # }
2802    /// ```
2803    #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
2804    pub fn is_empty(self) -> Singleton<bool, L, Bounded>
2805    where
2806        B: IsBounded,
2807    {
2808        self.make_bounded()
2809            .assume_ordering_trusted::<TotalOrder>(
2810                nondet!(/** is_empty intermediates unaffected by order */),
2811            )
2812            .first()
2813            .is_none()
2814    }
2815}
2816
2817impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2818where
2819    L: Location<'a>,
2820{
2821    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2822    /// by equi-joining the two streams on the key attribute `K`.
2823    ///
2824    /// When the right-hand side is [`Bounded`], the join accumulates the right side first
2825    /// and streams the left side through, preserving the left side's ordering. When both
2826    /// sides are [`Unbounded`], a symmetric hash join is used and ordering is [`NoOrder`].
2827    ///
2828    /// # Example
2829    /// ```rust
2830    /// # #[cfg(feature = "deploy")] {
2831    /// # use hydro_lang::prelude::*;
2832    /// # use std::collections::HashSet;
2833    /// # use futures::StreamExt;
2834    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2835    /// let tick = process.tick();
2836    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2837    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2838    /// stream1.join(stream2)
2839    /// # }, |mut stream| async move {
2840    /// // (1, ('a', 'x')), (2, ('b', 'y'))
2841    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2842    /// # stream.map(|i| assert!(expected.contains(&i)));
2843    /// # }));
2844    /// # }
2845    pub fn join<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2846        self,
2847        n: Stream<(K, V2), L, B2, O2, R2>,
2848    ) -> Stream<(K, (V1, V2)), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
2849    where
2850        K: Eq + Hash + Clone,
2851        R: MinRetries<R2>,
2852        V1: Clone,
2853        V2: Clone,
2854    {
2855        check_matching_location(&self.location, &n.location);
2856
2857        let ir_node = if B2::BOUNDED {
2858            HydroNode::JoinHalf {
2859                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2860                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2861                metadata: self.location.new_node_metadata(Stream::<
2862                    (K, (V1, V2)),
2863                    L,
2864                    B,
2865                    B2::PreserveOrderIfBounded<O>,
2866                    <R as MinRetries<R2>>::Min,
2867                >::collection_kind()),
2868            }
2869        } else {
2870            HydroNode::Join {
2871                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2872                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2873                metadata: self.location.new_node_metadata(Stream::<
2874                    (K, (V1, V2)),
2875                    L,
2876                    B,
2877                    B2::PreserveOrderIfBounded<O>,
2878                    <R as MinRetries<R2>>::Min,
2879                >::collection_kind()),
2880            }
2881        };
2882
2883        Stream::new(self.location.clone(), ir_node)
2884    }
2885
2886    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2887    /// computes the anti-join of the items in the input -- i.e. returns
2888    /// unique items in the first input that do not have a matching key
2889    /// in the second input.
2890    ///
2891    /// # Example
2892    /// ```rust
2893    /// # #[cfg(feature = "deploy")] {
2894    /// # use hydro_lang::prelude::*;
2895    /// # use futures::StreamExt;
2896    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2897    /// let tick = process.tick();
2898    /// let stream = process
2899    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2900    ///   .batch(&tick, nondet!(/** test */));
2901    /// let batch = process
2902    ///   .source_iter(q!(vec![1, 2]))
2903    ///   .batch(&tick, nondet!(/** test */));
2904    /// stream.anti_join(batch).all_ticks()
2905    /// # }, |mut stream| async move {
2906    /// # for w in vec![(3, 'c'), (4, 'd')] {
2907    /// #     assert_eq!(stream.next().await.unwrap(), w);
2908    /// # }
2909    /// # }));
2910    /// # }
2911    pub fn anti_join<O2: Ordering, R2: Retries>(
2912        self,
2913        n: Stream<K, L, Bounded, O2, R2>,
2914    ) -> Stream<(K, V1), L, B, O, R>
2915    where
2916        K: Eq + Hash,
2917    {
2918        check_matching_location(&self.location, &n.location);
2919
2920        Stream::new(
2921            self.location.clone(),
2922            HydroNode::AntiJoin {
2923                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2924                neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2925                metadata: self
2926                    .location
2927                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2928            },
2929        )
2930    }
2931}
2932
2933impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2934    Stream<(K, V), L, B, O, R>
2935{
2936    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2937    /// is used as the key and the second element is added to the entries associated with that key.
2938    ///
2939    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2940    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2941    /// performing grouped aggregations, but also for more precise ordering guarantees such as
2942    /// total ordering _within_ each group but no ordering _across_ groups.
2943    ///
2944    /// # Example
2945    /// ```rust
2946    /// # #[cfg(feature = "deploy")] {
2947    /// # use hydro_lang::prelude::*;
2948    /// # use futures::StreamExt;
2949    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2950    /// process
2951    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2952    ///     .into_keyed()
2953    /// #   .entries()
2954    /// # }, |mut stream| async move {
2955    /// // { 1: [2, 3], 2: [4] }
2956    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2957    /// #     assert_eq!(stream.next().await.unwrap(), w);
2958    /// # }
2959    /// # }));
2960    /// # }
2961    /// ```
2962    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2963        KeyedStream::new(
2964            self.location.clone(),
2965            HydroNode::Cast {
2966                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2967                metadata: self
2968                    .location
2969                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2970            },
2971        )
2972    }
2973}
2974
2975impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2976where
2977    K: Eq + Hash,
2978    L: Location<'a>,
2979{
2980    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2981    /// # Example
2982    /// ```rust
2983    /// # #[cfg(feature = "deploy")] {
2984    /// # use hydro_lang::prelude::*;
2985    /// # use futures::StreamExt;
2986    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2987    /// let tick = process.tick();
2988    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2989    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2990    /// batch.keys().all_ticks()
2991    /// # }, |mut stream| async move {
2992    /// // 1, 2
2993    /// # assert_eq!(stream.next().await.unwrap(), 1);
2994    /// # assert_eq!(stream.next().await.unwrap(), 2);
2995    /// # }));
2996    /// # }
2997    /// ```
2998    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2999        self.into_keyed()
3000            .fold(
3001                q!(|| ()),
3002                q!(
3003                    |_, _| {},
3004                    commutative = manual_proof!(/** values are ignored */),
3005                    idempotent = manual_proof!(/** values are ignored */)
3006                ),
3007            )
3008            .keys()
3009    }
3010}
3011
3012impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
3013where
3014    L: Location<'a>,
3015{
3016    /// Returns a stream corresponding to the latest batch of elements being atomically
3017    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
3018    /// the order of the input.
3019    ///
3020    /// # Non-Determinism
3021    /// The batch boundaries are non-deterministic and may change across executions.
3022    pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
3023        self,
3024        tick: &Tick<L2>,
3025        _nondet: NonDet,
3026    ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
3027        Stream::new(
3028            tick.drop_consistency(),
3029            HydroNode::Batch {
3030                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3031                metadata: tick
3032                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3033            },
3034        )
3035    }
3036
3037    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
3038    /// See [`Stream::atomic`] for more details.
3039    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
3040        Stream::new(
3041            self.location.tick.l.clone(),
3042            HydroNode::EndAtomic {
3043                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3044                metadata: self
3045                    .location
3046                    .tick
3047                    .l
3048                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
3049            },
3050        )
3051    }
3052}
3053
3054impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
3055where
3056    L: TopLevel<'a>,
3057    F: Future<Output = T>,
3058{
3059    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
3060    /// Future outputs are produced as available, regardless of input arrival order.
3061    ///
3062    /// # Example
3063    /// ```rust
3064    /// # #[cfg(feature = "deploy")] {
3065    /// # use std::collections::HashSet;
3066    /// # use futures::StreamExt;
3067    /// # use hydro_lang::prelude::*;
3068    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3069    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
3070    ///     .map(q!(|x| async move {
3071    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
3072    ///         x
3073    ///     }))
3074    ///     .resolve_futures()
3075    /// #   },
3076    /// #   |mut stream| async move {
3077    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
3078    /// #       let mut output = HashSet::new();
3079    /// #       for _ in 1..10 {
3080    /// #           output.insert(stream.next().await.unwrap());
3081    /// #       }
3082    /// #       assert_eq!(
3083    /// #           output,
3084    /// #           HashSet::<i32>::from_iter(1..10)
3085    /// #       );
3086    /// #   },
3087    /// # ));
3088    /// # }
3089    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
3090        Stream::new(
3091            self.location.clone(),
3092            HydroNode::ResolveFutures {
3093                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3094                metadata: self
3095                    .location
3096                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
3097            },
3098        )
3099    }
3100
3101    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
3102    /// Future outputs are produced in the same order as the input stream.
3103    ///
3104    /// # Example
3105    /// ```rust
3106    /// # #[cfg(feature = "deploy")] {
3107    /// # use std::collections::HashSet;
3108    /// # use futures::StreamExt;
3109    /// # use hydro_lang::prelude::*;
3110    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3111    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
3112    ///     .map(q!(|x| async move {
3113    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
3114    ///         x
3115    ///     }))
3116    ///     .resolve_futures_ordered()
3117    /// #   },
3118    /// #   |mut stream| async move {
3119    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
3120    /// #       let mut output = Vec::new();
3121    /// #       for _ in 1..10 {
3122    /// #           output.push(stream.next().await.unwrap());
3123    /// #       }
3124    /// #       assert_eq!(
3125    /// #           output,
3126    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
3127    /// #       );
3128    /// #   },
3129    /// # ));
3130    /// # }
3131    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
3132        Stream::new(
3133            self.location.clone(),
3134            HydroNode::ResolveFuturesOrdered {
3135                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3136                metadata: self
3137                    .location
3138                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3139            },
3140        )
3141    }
3142}
3143
3144impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
3145where
3146    L: Location<'a>,
3147{
3148    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
3149    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
3150    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
3151        Stream::new(
3152            self.location.outer().clone(),
3153            HydroNode::YieldConcat {
3154                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3155                metadata: self
3156                    .location
3157                    .outer()
3158                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3159            },
3160        )
3161    }
3162
3163    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
3164    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
3165    ///
3166    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
3167    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
3168    /// stream's [`Tick`] context.
3169    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
3170        let out_location = Atomic {
3171            tick: self.location.clone(),
3172        };
3173
3174        Stream::new(
3175            out_location.clone(),
3176            HydroNode::YieldConcat {
3177                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3178                metadata: out_location
3179                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
3180            },
3181        )
3182    }
3183
3184    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
3185    /// such as `fold` retrain their memory across ticks rather than resetting across batches of
3186    /// input.
3187    ///
3188    /// This API is particularly useful for stateful computation on batches of data, such as
3189    /// maintaining an accumulated state that is up to date with the current batch.
3190    ///
3191    /// # Example
3192    /// ```rust
3193    /// # #[cfg(feature = "deploy")] {
3194    /// # use hydro_lang::prelude::*;
3195    /// # use futures::StreamExt;
3196    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3197    /// let tick = process.tick();
3198    /// # // ticks are lazy by default, forces the second tick to run
3199    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3200    /// # let batch_first_tick = process
3201    /// #   .source_iter(q!(vec![1, 2, 3, 4]))
3202    /// #  .batch(&tick, nondet!(/** test */));
3203    /// # let batch_second_tick = process
3204    /// #   .source_iter(q!(vec![5, 6, 7]))
3205    /// #   .batch(&tick, nondet!(/** test */))
3206    /// #   .defer_tick(); // appears on the second tick
3207    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
3208    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
3209    ///
3210    /// input.batch(&tick, nondet!(/** test */))
3211    ///     .across_ticks(|s| s.count()).all_ticks()
3212    /// # }, |mut stream| async move {
3213    /// // [4, 7]
3214    /// assert_eq!(stream.next().await.unwrap(), 4);
3215    /// assert_eq!(stream.next().await.unwrap(), 7);
3216    /// # }));
3217    /// # }
3218    /// ```
3219    pub fn across_ticks<Out: BatchAtomic<'a>>(
3220        self,
3221        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
3222    ) -> Out::Batched {
3223        thunk(self.all_ticks_atomic()).batched_atomic()
3224    }
3225
3226    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
3227    /// always has the elements of `self` at tick `T - 1`.
3228    ///
3229    /// At tick `0`, the output stream is empty, since there is no previous tick.
3230    ///
3231    /// This operator enables stateful iterative processing with ticks, by sending data from one
3232    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
3233    ///
3234    /// # Example
3235    /// ```rust
3236    /// # #[cfg(feature = "deploy")] {
3237    /// # use hydro_lang::prelude::*;
3238    /// # use futures::StreamExt;
3239    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3240    /// let tick = process.tick();
3241    /// // ticks are lazy by default, forces the second tick to run
3242    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3243    ///
3244    /// let batch_first_tick = process
3245    ///   .source_iter(q!(vec![1, 2, 3, 4]))
3246    ///   .batch(&tick, nondet!(/** test */));
3247    /// let batch_second_tick = process
3248    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
3249    ///   .batch(&tick, nondet!(/** test */))
3250    ///   .defer_tick(); // appears on the second tick
3251    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
3252    ///
3253    /// changes_across_ticks.clone().filter_not_in(
3254    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
3255    /// ).all_ticks()
3256    /// # }, |mut stream| async move {
3257    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
3258    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
3259    /// #     assert_eq!(stream.next().await.unwrap(), w);
3260    /// # }
3261    /// # }));
3262    /// # }
3263    /// ```
3264    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
3265        Stream::new(
3266            self.location.clone(),
3267            HydroNode::DeferTick {
3268                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3269                metadata: self
3270                    .location
3271                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3272            },
3273        )
3274    }
3275}
3276
3277#[cfg(test)]
3278mod tests {
3279    #[cfg(feature = "deploy")]
3280    use futures::{SinkExt, StreamExt};
3281    #[cfg(feature = "deploy")]
3282    use hydro_deploy::Deployment;
3283    #[cfg(feature = "deploy")]
3284    use serde::{Deserialize, Serialize};
3285    #[cfg(any(feature = "deploy", feature = "sim"))]
3286    use stageleft::q;
3287
3288    #[cfg(any(feature = "deploy", feature = "sim"))]
3289    use crate::compile::builder::FlowBuilder;
3290    #[cfg(feature = "deploy")]
3291    use crate::live_collections::sliced::sliced;
3292    #[cfg(feature = "deploy")]
3293    use crate::live_collections::stream::ExactlyOnce;
3294    #[cfg(feature = "sim")]
3295    use crate::live_collections::stream::NoOrder;
3296    #[cfg(any(feature = "deploy", feature = "sim"))]
3297    use crate::live_collections::stream::TotalOrder;
3298    #[cfg(any(feature = "deploy", feature = "sim"))]
3299    use crate::location::Location;
3300    #[cfg(feature = "sim")]
3301    use crate::networking::TCP;
3302    #[cfg(any(feature = "deploy", feature = "sim"))]
3303    use crate::nondet::nondet;
3304
3305    mod backtrace_chained_ops;
3306
3307    #[cfg(feature = "deploy")]
3308    struct P1 {}
3309    #[cfg(feature = "deploy")]
3310    struct P2 {}
3311
3312    #[cfg(feature = "deploy")]
3313    #[derive(Serialize, Deserialize, Debug)]
3314    struct SendOverNetwork {
3315        n: u32,
3316    }
3317
3318    #[cfg(feature = "deploy")]
3319    #[tokio::test]
3320    async fn first_ten_distributed() {
3321        use crate::networking::TCP;
3322
3323        let mut deployment = Deployment::new();
3324
3325        let mut flow = FlowBuilder::new();
3326        let first_node = flow.process::<P1>();
3327        let second_node = flow.process::<P2>();
3328        let external = flow.external::<P2>();
3329
3330        let numbers = first_node.source_iter(q!(0..10));
3331        let out_port = numbers
3332            .map(q!(|n| SendOverNetwork { n }))
3333            .send(&second_node, TCP.fail_stop().bincode())
3334            .send_bincode_external(&external);
3335
3336        let nodes = flow
3337            .with_process(&first_node, deployment.Localhost())
3338            .with_process(&second_node, deployment.Localhost())
3339            .with_external(&external, deployment.Localhost())
3340            .deploy(&mut deployment);
3341
3342        deployment.deploy().await.unwrap();
3343
3344        let mut external_out = nodes.connect(out_port).await;
3345
3346        deployment.start().await.unwrap();
3347
3348        for i in 0..10 {
3349            assert_eq!(external_out.next().await.unwrap().n, i);
3350        }
3351    }
3352
3353    #[cfg(feature = "deploy")]
3354    #[tokio::test]
3355    async fn first_cardinality() {
3356        let mut deployment = Deployment::new();
3357
3358        let mut flow = FlowBuilder::new();
3359        let node = flow.process::<()>();
3360        let external = flow.external::<()>();
3361
3362        let node_tick = node.tick();
3363        let count = node_tick
3364            .singleton(q!([1, 2, 3]))
3365            .into_stream()
3366            .flatten_ordered()
3367            .first()
3368            .into_stream()
3369            .count()
3370            .all_ticks()
3371            .send_bincode_external(&external);
3372
3373        let nodes = flow
3374            .with_process(&node, deployment.Localhost())
3375            .with_external(&external, deployment.Localhost())
3376            .deploy(&mut deployment);
3377
3378        deployment.deploy().await.unwrap();
3379
3380        let mut external_out = nodes.connect(count).await;
3381
3382        deployment.start().await.unwrap();
3383
3384        assert_eq!(external_out.next().await.unwrap(), 1);
3385    }
3386
3387    #[cfg(feature = "deploy")]
3388    #[tokio::test]
3389    async fn unbounded_reduce_remembers_state() {
3390        let mut deployment = Deployment::new();
3391
3392        let mut flow = FlowBuilder::new();
3393        let node = flow.process::<()>();
3394        let external = flow.external::<()>();
3395
3396        let (input_port, input) = node.source_external_bincode(&external);
3397        let out = input
3398            .reduce(q!(|acc, v| *acc += v))
3399            .sample_eager(nondet!(/** test */))
3400            .send_bincode_external(&external);
3401
3402        let nodes = flow
3403            .with_process(&node, deployment.Localhost())
3404            .with_external(&external, deployment.Localhost())
3405            .deploy(&mut deployment);
3406
3407        deployment.deploy().await.unwrap();
3408
3409        let mut external_in = nodes.connect(input_port).await;
3410        let mut external_out = nodes.connect(out).await;
3411
3412        deployment.start().await.unwrap();
3413
3414        external_in.send(1).await.unwrap();
3415        assert_eq!(external_out.next().await.unwrap(), 1);
3416
3417        external_in.send(2).await.unwrap();
3418        assert_eq!(external_out.next().await.unwrap(), 3);
3419    }
3420
3421    #[cfg(feature = "deploy")]
3422    #[tokio::test]
3423    async fn top_level_bounded_cross_singleton() {
3424        let mut deployment = Deployment::new();
3425
3426        let mut flow = FlowBuilder::new();
3427        let node = flow.process::<()>();
3428        let external = flow.external::<()>();
3429
3430        let (input_port, input) =
3431            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3432
3433        let out = input
3434            .cross_singleton(
3435                node.source_iter(q!(vec![1, 2, 3]))
3436                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
3437            )
3438            .send_bincode_external(&external);
3439
3440        let nodes = flow
3441            .with_process(&node, deployment.Localhost())
3442            .with_external(&external, deployment.Localhost())
3443            .deploy(&mut deployment);
3444
3445        deployment.deploy().await.unwrap();
3446
3447        let mut external_in = nodes.connect(input_port).await;
3448        let mut external_out = nodes.connect(out).await;
3449
3450        deployment.start().await.unwrap();
3451
3452        external_in.send(1).await.unwrap();
3453        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3454
3455        external_in.send(2).await.unwrap();
3456        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3457    }
3458
3459    #[cfg(feature = "deploy")]
3460    #[tokio::test]
3461    async fn top_level_bounded_reduce_cardinality() {
3462        let mut deployment = Deployment::new();
3463
3464        let mut flow = FlowBuilder::new();
3465        let node = flow.process::<()>();
3466        let external = flow.external::<()>();
3467
3468        let (input_port, input) =
3469            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3470
3471        let out = sliced! {
3472            let input = use(input, nondet!(/** test */));
3473            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
3474            input.cross_singleton(v.into_stream().count())
3475        }
3476        .send_bincode_external(&external);
3477
3478        let nodes = flow
3479            .with_process(&node, deployment.Localhost())
3480            .with_external(&external, deployment.Localhost())
3481            .deploy(&mut deployment);
3482
3483        deployment.deploy().await.unwrap();
3484
3485        let mut external_in = nodes.connect(input_port).await;
3486        let mut external_out = nodes.connect(out).await;
3487
3488        deployment.start().await.unwrap();
3489
3490        external_in.send(1).await.unwrap();
3491        assert_eq!(external_out.next().await.unwrap(), (1, 1));
3492
3493        external_in.send(2).await.unwrap();
3494        assert_eq!(external_out.next().await.unwrap(), (2, 1));
3495    }
3496
3497    #[cfg(feature = "deploy")]
3498    #[tokio::test]
3499    async fn top_level_bounded_into_singleton_cardinality() {
3500        let mut deployment = Deployment::new();
3501
3502        let mut flow = FlowBuilder::new();
3503        let node = flow.process::<()>();
3504        let external = flow.external::<()>();
3505
3506        let (input_port, input) =
3507            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3508
3509        let out = sliced! {
3510            let input = use(input, nondet!(/** test */));
3511            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
3512            input.cross_singleton(v.into_stream().count())
3513        }
3514        .send_bincode_external(&external);
3515
3516        let nodes = flow
3517            .with_process(&node, deployment.Localhost())
3518            .with_external(&external, deployment.Localhost())
3519            .deploy(&mut deployment);
3520
3521        deployment.deploy().await.unwrap();
3522
3523        let mut external_in = nodes.connect(input_port).await;
3524        let mut external_out = nodes.connect(out).await;
3525
3526        deployment.start().await.unwrap();
3527
3528        external_in.send(1).await.unwrap();
3529        assert_eq!(external_out.next().await.unwrap(), (1, 1));
3530
3531        external_in.send(2).await.unwrap();
3532        assert_eq!(external_out.next().await.unwrap(), (2, 1));
3533    }
3534
3535    #[cfg(feature = "deploy")]
3536    #[tokio::test]
3537    async fn atomic_fold_replays_each_tick() {
3538        let mut deployment = Deployment::new();
3539
3540        let mut flow = FlowBuilder::new();
3541        let node = flow.process::<()>();
3542        let external = flow.external::<()>();
3543
3544        let (input_port, input) =
3545            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3546        let tick = node.tick();
3547
3548        let out = input
3549            .batch(&tick, nondet!(/** test */))
3550            .cross_singleton(
3551                node.source_iter(q!(vec![1, 2, 3]))
3552                    .atomic()
3553                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
3554                    .snapshot_atomic(&tick, nondet!(/** test */)),
3555            )
3556            .all_ticks()
3557            .send_bincode_external(&external);
3558
3559        let nodes = flow
3560            .with_process(&node, deployment.Localhost())
3561            .with_external(&external, deployment.Localhost())
3562            .deploy(&mut deployment);
3563
3564        deployment.deploy().await.unwrap();
3565
3566        let mut external_in = nodes.connect(input_port).await;
3567        let mut external_out = nodes.connect(out).await;
3568
3569        deployment.start().await.unwrap();
3570
3571        external_in.send(1).await.unwrap();
3572        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3573
3574        external_in.send(2).await.unwrap();
3575        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3576    }
3577
3578    #[cfg(feature = "deploy")]
3579    #[tokio::test]
3580    async fn unbounded_scan_remembers_state() {
3581        let mut deployment = Deployment::new();
3582
3583        let mut flow = FlowBuilder::new();
3584        let node = flow.process::<()>();
3585        let external = flow.external::<()>();
3586
3587        let (input_port, input) = node.source_external_bincode(&external);
3588        let out = input
3589            .scan(
3590                q!(|| 0),
3591                q!(|acc, v| {
3592                    *acc += v;
3593                    Some(*acc)
3594                }),
3595            )
3596            .send_bincode_external(&external);
3597
3598        let nodes = flow
3599            .with_process(&node, deployment.Localhost())
3600            .with_external(&external, deployment.Localhost())
3601            .deploy(&mut deployment);
3602
3603        deployment.deploy().await.unwrap();
3604
3605        let mut external_in = nodes.connect(input_port).await;
3606        let mut external_out = nodes.connect(out).await;
3607
3608        deployment.start().await.unwrap();
3609
3610        external_in.send(1).await.unwrap();
3611        assert_eq!(external_out.next().await.unwrap(), 1);
3612
3613        external_in.send(2).await.unwrap();
3614        assert_eq!(external_out.next().await.unwrap(), 3);
3615    }
3616
3617    #[cfg(feature = "deploy")]
3618    #[tokio::test]
3619    async fn unbounded_enumerate_remembers_state() {
3620        let mut deployment = Deployment::new();
3621
3622        let mut flow = FlowBuilder::new();
3623        let node = flow.process::<()>();
3624        let external = flow.external::<()>();
3625
3626        let (input_port, input) = node.source_external_bincode(&external);
3627        let out = input.enumerate().send_bincode_external(&external);
3628
3629        let nodes = flow
3630            .with_process(&node, deployment.Localhost())
3631            .with_external(&external, deployment.Localhost())
3632            .deploy(&mut deployment);
3633
3634        deployment.deploy().await.unwrap();
3635
3636        let mut external_in = nodes.connect(input_port).await;
3637        let mut external_out = nodes.connect(out).await;
3638
3639        deployment.start().await.unwrap();
3640
3641        external_in.send(1).await.unwrap();
3642        assert_eq!(external_out.next().await.unwrap(), (0, 1));
3643
3644        external_in.send(2).await.unwrap();
3645        assert_eq!(external_out.next().await.unwrap(), (1, 2));
3646    }
3647
3648    #[cfg(feature = "deploy")]
3649    #[tokio::test]
3650    async fn unbounded_unique_remembers_state() {
3651        let mut deployment = Deployment::new();
3652
3653        let mut flow = FlowBuilder::new();
3654        let node = flow.process::<()>();
3655        let external = flow.external::<()>();
3656
3657        let (input_port, input) =
3658            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3659        let out = input.unique().send_bincode_external(&external);
3660
3661        let nodes = flow
3662            .with_process(&node, deployment.Localhost())
3663            .with_external(&external, deployment.Localhost())
3664            .deploy(&mut deployment);
3665
3666        deployment.deploy().await.unwrap();
3667
3668        let mut external_in = nodes.connect(input_port).await;
3669        let mut external_out = nodes.connect(out).await;
3670
3671        deployment.start().await.unwrap();
3672
3673        external_in.send(1).await.unwrap();
3674        assert_eq!(external_out.next().await.unwrap(), 1);
3675
3676        external_in.send(2).await.unwrap();
3677        assert_eq!(external_out.next().await.unwrap(), 2);
3678
3679        external_in.send(1).await.unwrap();
3680        external_in.send(3).await.unwrap();
3681        assert_eq!(external_out.next().await.unwrap(), 3);
3682    }
3683
3684    #[cfg(feature = "sim")]
3685    #[test]
3686    #[should_panic]
3687    fn sim_batch_nondet_size() {
3688        let mut flow = FlowBuilder::new();
3689        let node = flow.process::<()>();
3690
3691        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3692
3693        let tick = node.tick();
3694        let out_recv = input
3695            .batch(&tick, nondet!(/** test */))
3696            .count()
3697            .all_ticks()
3698            .sim_output();
3699
3700        flow.sim().exhaustive(async || {
3701            in_send.send(());
3702            in_send.send(());
3703            in_send.send(());
3704
3705            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3706        });
3707    }
3708
3709    #[cfg(feature = "sim")]
3710    #[test]
3711    fn sim_batch_preserves_order() {
3712        let mut flow = FlowBuilder::new();
3713        let node = flow.process::<()>();
3714
3715        let (in_send, input) = node.sim_input();
3716
3717        let tick = node.tick();
3718        let out_recv = input
3719            .batch(&tick, nondet!(/** test */))
3720            .all_ticks()
3721            .sim_output();
3722
3723        flow.sim().exhaustive(async || {
3724            in_send.send(1);
3725            in_send.send(2);
3726            in_send.send(3);
3727
3728            out_recv.assert_yields_only([1, 2, 3]).await;
3729        });
3730    }
3731
3732    #[cfg(feature = "sim")]
3733    #[test]
3734    #[should_panic]
3735    fn sim_batch_unordered_shuffles() {
3736        let mut flow = FlowBuilder::new();
3737        let node = flow.process::<()>();
3738
3739        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3740
3741        let tick = node.tick();
3742        let batch = input.batch(&tick, nondet!(/** test */));
3743        let out_recv = batch
3744            .clone()
3745            .min()
3746            .zip(batch.max())
3747            .all_ticks()
3748            .sim_output();
3749
3750        flow.sim().exhaustive(async || {
3751            in_send.send_many_unordered([1, 2, 3]);
3752
3753            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3754                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3755            }
3756        });
3757    }
3758
3759    #[cfg(feature = "sim")]
3760    #[test]
3761    fn sim_batch_unordered_shuffles_count() {
3762        let mut flow = FlowBuilder::new();
3763        let node = flow.process::<()>();
3764
3765        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3766
3767        let tick = node.tick();
3768        let batch = input.batch(&tick, nondet!(/** test */));
3769        let out_recv = batch.all_ticks().sim_output();
3770
3771        let instance_count = flow.sim().exhaustive(async || {
3772            in_send.send_many_unordered([1, 2, 3, 4]);
3773            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3774        });
3775
3776        assert_eq!(
3777            instance_count,
3778            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3779        )
3780    }
3781
3782    #[cfg(feature = "sim")]
3783    #[test]
3784    #[should_panic]
3785    fn sim_observe_order_batched() {
3786        let mut flow = FlowBuilder::new();
3787        let node = flow.process::<()>();
3788
3789        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3790
3791        let tick = node.tick();
3792        let batch = input.batch(&tick, nondet!(/** test */));
3793        let out_recv = batch
3794            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3795            .all_ticks()
3796            .sim_output();
3797
3798        flow.sim().exhaustive(async || {
3799            in_send.send_many_unordered([1, 2, 3, 4]);
3800            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3801        });
3802    }
3803
3804    #[cfg(feature = "sim")]
3805    #[test]
3806    fn sim_observe_order_batched_count() {
3807        let mut flow = FlowBuilder::new();
3808        let node = flow.process::<()>();
3809
3810        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3811
3812        let tick = node.tick();
3813        let batch = input.batch(&tick, nondet!(/** test */));
3814        let out_recv = batch
3815            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3816            .all_ticks()
3817            .sim_output();
3818
3819        let instance_count = flow.sim().exhaustive(async || {
3820            in_send.send_many_unordered([1, 2, 3, 4]);
3821            let _ = out_recv.collect::<Vec<_>>().await;
3822        });
3823
3824        assert_eq!(
3825            instance_count,
3826            192 // 4! * 2^{4 - 1}
3827        )
3828    }
3829
3830    #[cfg(feature = "sim")]
3831    #[test]
3832    fn sim_unordered_count_instance_count() {
3833        let mut flow = FlowBuilder::new();
3834        let node = flow.process::<()>();
3835
3836        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3837
3838        let tick = node.tick();
3839        let out_recv = input
3840            .count()
3841            .snapshot(&tick, nondet!(/** test */))
3842            .all_ticks()
3843            .sim_output();
3844
3845        let instance_count = flow.sim().exhaustive(async || {
3846            in_send.send_many_unordered([1, 2, 3, 4]);
3847            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3848        });
3849
3850        assert_eq!(
3851            instance_count,
3852            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3853        )
3854    }
3855
3856    #[cfg(feature = "sim")]
3857    #[test]
3858    fn sim_top_level_assume_ordering() {
3859        let mut flow = FlowBuilder::new();
3860        let node = flow.process::<()>();
3861
3862        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3863
3864        let out_recv = input
3865            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3866            .sim_output();
3867
3868        let instance_count = flow.sim().exhaustive(async || {
3869            in_send.send_many_unordered([1, 2, 3]);
3870            let mut out = out_recv.collect::<Vec<_>>().await;
3871            out.sort();
3872            assert_eq!(out, vec![1, 2, 3]);
3873        });
3874
3875        assert_eq!(instance_count, 6)
3876    }
3877
3878    #[cfg(feature = "sim")]
3879    #[test]
3880    fn sim_top_level_assume_ordering_cycle_back() {
3881        let mut flow = FlowBuilder::new();
3882        let node = flow.process::<()>();
3883        let node2 = flow.process::<()>();
3884
3885        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3886
3887        let (complete_cycle_back, cycle_back) =
3888            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3889        let ordered = input
3890            .merge_unordered(cycle_back)
3891            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3892        complete_cycle_back.complete(
3893            ordered
3894                .clone()
3895                .map(q!(|v| v + 1))
3896                .filter(q!(|v| v % 2 == 1))
3897                .send(&node2, TCP.fail_stop().bincode())
3898                .send(&node, TCP.fail_stop().bincode()),
3899        );
3900
3901        let out_recv = ordered.sim_output();
3902
3903        let mut saw = false;
3904        let instance_count = flow.sim().exhaustive(async || {
3905            in_send.send_many_unordered([0, 2]);
3906            let out = out_recv.collect::<Vec<_>>().await;
3907
3908            if out.starts_with(&[0, 1, 2]) {
3909                saw = true;
3910            }
3911        });
3912
3913        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3914        assert_eq!(instance_count, 6);
3915    }
3916
3917    #[cfg(feature = "sim")]
3918    #[test]
3919    fn sim_top_level_assume_ordering_cycle_back_tick() {
3920        let mut flow = FlowBuilder::new();
3921        let node = flow.process::<()>();
3922        let node2 = flow.process::<()>();
3923
3924        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3925
3926        let (complete_cycle_back, cycle_back) =
3927            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3928        let ordered = input
3929            .merge_unordered(cycle_back)
3930            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3931        complete_cycle_back.complete(
3932            ordered
3933                .clone()
3934                .batch(&node.tick(), nondet!(/** test */))
3935                .all_ticks()
3936                .map(q!(|v| v + 1))
3937                .filter(q!(|v| v % 2 == 1))
3938                .send(&node2, TCP.fail_stop().bincode())
3939                .send(&node, TCP.fail_stop().bincode()),
3940        );
3941
3942        let out_recv = ordered.sim_output();
3943
3944        let mut saw = false;
3945        let instance_count = flow.sim().exhaustive(async || {
3946            in_send.send_many_unordered([0, 2]);
3947            let out = out_recv.collect::<Vec<_>>().await;
3948
3949            if out.starts_with(&[0, 1, 2]) {
3950                saw = true;
3951            }
3952        });
3953
3954        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3955        assert_eq!(instance_count, 58);
3956    }
3957
3958    #[cfg(feature = "sim")]
3959    #[test]
3960    fn sim_top_level_assume_ordering_multiple() {
3961        let mut flow = FlowBuilder::new();
3962        let node = flow.process::<()>();
3963        let node2 = flow.process::<()>();
3964
3965        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3966        let (_, input2) = node.sim_input::<_, NoOrder, _>();
3967
3968        let (complete_cycle_back, cycle_back) =
3969            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3970        let input1_ordered = input
3971            .clone()
3972            .merge_unordered(cycle_back)
3973            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3974        let foo = input1_ordered
3975            .clone()
3976            .map(q!(|v| v + 3))
3977            .weaken_ordering::<NoOrder>()
3978            .merge_unordered(input2)
3979            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3980
3981        complete_cycle_back.complete(
3982            foo.filter(q!(|v| *v == 3))
3983                .send(&node2, TCP.fail_stop().bincode())
3984                .send(&node, TCP.fail_stop().bincode()),
3985        );
3986
3987        let out_recv = input1_ordered.sim_output();
3988
3989        let mut saw = false;
3990        let instance_count = flow.sim().exhaustive(async || {
3991            in_send.send_many_unordered([0, 1]);
3992            let out = out_recv.collect::<Vec<_>>().await;
3993
3994            if out.starts_with(&[0, 3, 1]) {
3995                saw = true;
3996            }
3997        });
3998
3999        assert!(saw, "did not see an instance with 0, 3, 1 in order");
4000        assert_eq!(instance_count, 24);
4001    }
4002
4003    #[cfg(feature = "sim")]
4004    #[test]
4005    fn sim_atomic_assume_ordering_cycle_back() {
4006        let mut flow = FlowBuilder::new();
4007        let node = flow.process::<()>();
4008        let node2 = flow.process::<()>();
4009
4010        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
4011
4012        let (complete_cycle_back, cycle_back) =
4013            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
4014        let ordered = input
4015            .merge_unordered(cycle_back)
4016            .atomic()
4017            .assume_ordering::<TotalOrder>(nondet!(/** test */))
4018            .end_atomic();
4019        complete_cycle_back.complete(
4020            ordered
4021                .clone()
4022                .map(q!(|v| v + 1))
4023                .filter(q!(|v| v % 2 == 1))
4024                .send(&node2, TCP.fail_stop().bincode())
4025                .send(&node, TCP.fail_stop().bincode()),
4026        );
4027
4028        let out_recv = ordered.sim_output();
4029
4030        let instance_count = flow.sim().exhaustive(async || {
4031            in_send.send_many_unordered([0, 2]);
4032            let out = out_recv.collect::<Vec<_>>().await;
4033            assert_eq!(out.len(), 4);
4034        });
4035        assert_eq!(instance_count, 22);
4036    }
4037
4038    #[cfg(feature = "deploy")]
4039    #[tokio::test]
4040    async fn partition_evens_odds() {
4041        let mut deployment = Deployment::new();
4042
4043        let mut flow = FlowBuilder::new();
4044        let node = flow.process::<()>();
4045        let external = flow.external::<()>();
4046
4047        let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
4048        let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
4049        let evens_port = evens.send_bincode_external(&external);
4050        let odds_port = odds.send_bincode_external(&external);
4051
4052        let nodes = flow
4053            .with_process(&node, deployment.Localhost())
4054            .with_external(&external, deployment.Localhost())
4055            .deploy(&mut deployment);
4056
4057        deployment.deploy().await.unwrap();
4058
4059        let mut evens_out = nodes.connect(evens_port).await;
4060        let mut odds_out = nodes.connect(odds_port).await;
4061
4062        deployment.start().await.unwrap();
4063
4064        let mut even_results = Vec::new();
4065        for _ in 0..3 {
4066            even_results.push(evens_out.next().await.unwrap());
4067        }
4068        even_results.sort();
4069        assert_eq!(even_results, vec![2, 4, 6]);
4070
4071        let mut odd_results = Vec::new();
4072        for _ in 0..3 {
4073            odd_results.push(odds_out.next().await.unwrap());
4074        }
4075        odd_results.sort();
4076        assert_eq!(odd_results, vec![1, 3, 5]);
4077    }
4078
4079    #[cfg(feature = "deploy")]
4080    #[tokio::test]
4081    async fn unconsumed_inspect_still_runs() {
4082        use crate::deploy::DeployCrateWrapper;
4083
4084        let mut deployment = Deployment::new();
4085
4086        let mut flow = FlowBuilder::new();
4087        let node = flow.process::<()>();
4088
4089        // The return value of .inspect() is intentionally dropped.
4090        // Before the Null-root fix, this would silently do nothing.
4091        node.source_iter(q!(0..5))
4092            .inspect(q!(|x| println!("inspect: {}", x)));
4093
4094        let nodes = flow
4095            .with_process(&node, deployment.Localhost())
4096            .deploy(&mut deployment);
4097
4098        deployment.deploy().await.unwrap();
4099
4100        let mut stdout = nodes.get_process(&node).stdout();
4101
4102        deployment.start().await.unwrap();
4103
4104        let mut lines = Vec::new();
4105        for _ in 0..5 {
4106            lines.push(stdout.recv().await.unwrap());
4107        }
4108        lines.sort();
4109        assert_eq!(
4110            lines,
4111            vec![
4112                "inspect: 0",
4113                "inspect: 1",
4114                "inspect: 2",
4115                "inspect: 3",
4116                "inspect: 4",
4117            ]
4118        );
4119    }
4120
4121    #[cfg(feature = "sim")]
4122    #[test]
4123    fn sim_limit() {
4124        let mut flow = FlowBuilder::new();
4125        let node = flow.process::<()>();
4126
4127        let (in_send, input) = node.sim_input();
4128
4129        let out_recv = input.limit(q!(3)).sim_output();
4130
4131        flow.sim().exhaustive(async || {
4132            in_send.send(1);
4133            in_send.send(2);
4134            in_send.send(3);
4135            in_send.send(4);
4136            in_send.send(5);
4137
4138            out_recv.assert_yields_only([1, 2, 3]).await;
4139        });
4140    }
4141
4142    #[cfg(feature = "sim")]
4143    #[test]
4144    fn sim_limit_zero() {
4145        let mut flow = FlowBuilder::new();
4146        let node = flow.process::<()>();
4147
4148        let (in_send, input) = node.sim_input();
4149
4150        let out_recv = input.limit(q!(0)).sim_output();
4151
4152        flow.sim().exhaustive(async || {
4153            in_send.send(1);
4154            in_send.send(2);
4155
4156            out_recv.assert_yields_only::<i32, _>([]).await;
4157        });
4158    }
4159
4160    #[cfg(feature = "sim")]
4161    #[test]
4162    fn sim_merge_ordered() {
4163        let mut flow = FlowBuilder::new();
4164        let node = flow.process::<()>();
4165
4166        let (in_send, input) = node.sim_input();
4167        let (in_send2, input2) = node.sim_input();
4168
4169        let out_recv = input
4170            .merge_ordered(input2, nondet!(/** test */))
4171            .sim_output();
4172
4173        let mut saw_out_of_order = false;
4174        let instances = flow.sim().exhaustive(async || {
4175            in_send.send(1);
4176            in_send.send(2);
4177            in_send2.send(3);
4178            in_send2.send(4);
4179
4180            let out = out_recv.collect::<Vec<_>>().await;
4181
4182            if out == [1, 3, 2, 4] {
4183                saw_out_of_order = true;
4184            }
4185
4186            // Assert ordering preservation: elements from each input must
4187            // appear in their original relative order.
4188            let mut first_elements = out.iter().filter(|v| **v <= 2).copied().collect::<Vec<_>>();
4189            let mut second_elements = out.iter().filter(|v| **v > 2).copied().collect::<Vec<_>>();
4190            assert_eq!(
4191                first_elements,
4192                vec![1, 2],
4193                "first input order violated: {:?}",
4194                out
4195            );
4196            assert_eq!(
4197                second_elements,
4198                vec![3, 4],
4199                "second input order violated: {:?}",
4200                out
4201            );
4202
4203            first_elements.append(&mut second_elements);
4204            first_elements.sort();
4205            assert_eq!(first_elements, vec![1, 2, 3, 4]);
4206        });
4207
4208        assert!(saw_out_of_order);
4209        assert_eq!(instances, 6);
4210    }
4211
4212    /// Tests that merge_ordered passes through elements when only one input
4213    /// has data.
4214    #[cfg(feature = "sim")]
4215    #[test]
4216    fn sim_merge_ordered_one_empty() {
4217        let mut flow = FlowBuilder::new();
4218        let node = flow.process::<()>();
4219
4220        let (in_send, input) = node.sim_input();
4221        let (_in_send2, input2) = node.sim_input();
4222
4223        let out_recv = input
4224            .merge_ordered(input2, nondet!(/** test */))
4225            .sim_output();
4226
4227        let instances = flow.sim().exhaustive(async || {
4228            in_send.send(1);
4229            in_send.send(2);
4230
4231            let out = out_recv.collect::<Vec<_>>().await;
4232            assert_eq!(out, vec![1, 2]);
4233        });
4234
4235        // Only one possible interleaving when one input is empty
4236        assert_eq!(instances, 1);
4237    }
4238
4239    /// Tests that merge_ordered correctly handles feedback cycles.
4240    /// An element output from merge_ordered is filtered and cycled back to
4241    /// one of its inputs. The one-at-a-time release must allow the cycled-back
4242    /// element to arrive and potentially be emitted before elements still
4243    /// waiting on the other input.
4244    #[cfg(feature = "sim")]
4245    #[test]
4246    fn sim_merge_ordered_cycle_back() {
4247        let mut flow = FlowBuilder::new();
4248        let node = flow.process::<()>();
4249
4250        let (in_send, input) = node.sim_input();
4251
4252        // Create a forward ref for the cycle back
4253        let (complete_cycle_back, cycle_back) =
4254            node.forward_ref::<super::Stream<_, _, _, TotalOrder>>();
4255
4256        // merge_ordered: input (external) with cycle_back
4257        let merged = input.merge_ordered(cycle_back, nondet!(/** test */));
4258
4259        // Cycle back: elements equal to 1 get mapped to 10 and fed back
4260        complete_cycle_back.complete(merged.clone().filter(q!(|v| *v == 1)).map(q!(|v| v * 10)));
4261
4262        let out_recv = merged.sim_output();
4263
4264        // Send 1 and 2. Element 1 should cycle back as 10.
4265        // Valid orderings must have 1 before 10 (since 10 depends on 1).
4266        let mut saw_cycle_before_second = false;
4267        flow.sim().exhaustive(async || {
4268            in_send.send(1);
4269            in_send.send(2);
4270
4271            let out = out_recv.collect::<Vec<_>>().await;
4272
4273            // 10 must always come after 1 (causal dependency)
4274            let pos_1 = out.iter().position(|v| *v == 1).unwrap();
4275            let pos_10 = out.iter().position(|v| *v == 10).unwrap();
4276            assert!(pos_1 < pos_10, "causal order violated: {:?}", out);
4277
4278            // Check if we see [1, 10, 2] — the cycled element beats the second input
4279            if out == [1, 10, 2] {
4280                saw_cycle_before_second = true;
4281            }
4282
4283            let mut sorted = out;
4284            sorted.sort();
4285            assert_eq!(sorted, vec![1, 2, 10]);
4286        });
4287
4288        assert!(
4289            saw_cycle_before_second,
4290            "never saw the cycled element arrive before the second input element"
4291        );
4292    }
4293
4294    /// Tests that merge_ordered correctly interleaves when one input has a
4295    /// delayed element. With a: [1, _delay_, 2] and b: [3, 4], the delayed
4296    /// element 2 should be able to appear after b's elements.
4297    #[cfg(feature = "sim")]
4298    #[test]
4299    fn sim_merge_ordered_delayed() {
4300        let mut flow = FlowBuilder::new();
4301        let node = flow.process::<()>();
4302
4303        let (in_send, input) = node.sim_input();
4304        let (in_send2, input2) = node.sim_input();
4305
4306        let out_recv = input
4307            .merge_ordered(input2, nondet!(/** test */))
4308            .sim_output();
4309
4310        let mut saw_delayed_interleaving = false;
4311        flow.sim().exhaustive(async || {
4312            // Send 1 from a, and 3, 4 from b
4313            in_send.send(1);
4314            in_send2.send(3);
4315            in_send2.send(4);
4316
4317            // Collect what's available so far
4318            let first_batch = out_recv.collect::<Vec<_>>().await;
4319
4320            // Now send the delayed element 2 from a
4321            in_send.send(2);
4322            let second_batch = out_recv.collect::<Vec<_>>().await;
4323
4324            let mut all: Vec<_> = first_batch
4325                .iter()
4326                .chain(second_batch.iter())
4327                .copied()
4328                .collect();
4329
4330            // Check if we saw [1, 3, 4, 2] — the delayed interleaving
4331            if all == [1, 3, 4, 2] {
4332                saw_delayed_interleaving = true;
4333            }
4334
4335            all.sort();
4336            assert_eq!(all, vec![1, 2, 3, 4]);
4337        });
4338
4339        assert!(saw_delayed_interleaving);
4340    }
4341
4342    /// Deploy test: merge_ordered with a delayed element on one input.
4343    /// Sends a=1, b=3, b=4, then after receiving those, sends a=2.
4344    /// Expects to see [1, 3, 4] first, then [2] — demonstrating that
4345    /// both inputs are pulled and the delayed element arrives later.
4346    #[cfg(feature = "deploy")]
4347    #[tokio::test]
4348    async fn deploy_merge_ordered_delayed() {
4349        let mut deployment = Deployment::new();
4350
4351        let mut flow = FlowBuilder::new();
4352        let node = flow.process::<()>();
4353        let external = flow.external::<()>();
4354
4355        let (input_a_port, input_a) = node.source_external_bincode(&external);
4356        let (input_b_port, input_b) = node.source_external_bincode(&external);
4357
4358        let out = input_a
4359            .assume_ordering(nondet!(/** test */))
4360            .merge_ordered(
4361                input_b.assume_ordering(nondet!(/** test */)),
4362                nondet!(/** test */),
4363            )
4364            .send_bincode_external(&external);
4365
4366        let nodes = flow
4367            .with_process(&node, deployment.Localhost())
4368            .with_external(&external, deployment.Localhost())
4369            .deploy(&mut deployment);
4370
4371        deployment.deploy().await.unwrap();
4372
4373        let mut ext_a = nodes.connect(input_a_port).await;
4374        let mut ext_b = nodes.connect(input_b_port).await;
4375        let mut ext_out = nodes.connect(out).await;
4376
4377        deployment.start().await.unwrap();
4378
4379        // Send a=1, b=3, b=4
4380        ext_a.send(1).await.unwrap();
4381        ext_b.send(3).await.unwrap();
4382        ext_b.send(4).await.unwrap();
4383
4384        // Collect the first 3 elements
4385        let mut received = Vec::new();
4386        for _ in 0..3 {
4387            received.push(ext_out.next().await.unwrap());
4388        }
4389
4390        // Now send the delayed a=2
4391        ext_a.send(2).await.unwrap();
4392        received.push(ext_out.next().await.unwrap());
4393
4394        // All elements should be present
4395        received.sort();
4396        assert_eq!(received, vec![1, 2, 3, 4]);
4397    }
4398
4399    #[cfg(feature = "deploy")]
4400    #[tokio::test]
4401    async fn monotone_fold_threshold() {
4402        use crate::properties::manual_proof;
4403
4404        let mut deployment = Deployment::new();
4405
4406        let mut flow = FlowBuilder::new();
4407        let node = flow.process::<()>();
4408        let external = flow.external::<()>();
4409
4410        let in_unbounded: super::Stream<_, _> =
4411            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4412        let sum = in_unbounded.fold(
4413            q!(|| 0),
4414            q!(
4415                |sum, v| {
4416                    *sum += v;
4417                },
4418                monotone = manual_proof!(/** test */)
4419            ),
4420        );
4421
4422        let threshold_out = sum
4423            .threshold_greater_or_equal(node.singleton(q!(7)))
4424            .send_bincode_external(&external);
4425
4426        let nodes = flow
4427            .with_process(&node, deployment.Localhost())
4428            .with_external(&external, deployment.Localhost())
4429            .deploy(&mut deployment);
4430
4431        deployment.deploy().await.unwrap();
4432
4433        let mut threshold_out = nodes.connect(threshold_out).await;
4434
4435        deployment.start().await.unwrap();
4436
4437        assert_eq!(threshold_out.next().await.unwrap(), 7);
4438    }
4439
4440    #[cfg(feature = "deploy")]
4441    #[tokio::test]
4442    async fn monotone_count_threshold() {
4443        let mut deployment = Deployment::new();
4444
4445        let mut flow = FlowBuilder::new();
4446        let node = flow.process::<()>();
4447        let external = flow.external::<()>();
4448
4449        let in_unbounded: super::Stream<_, _> =
4450            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4451        let sum = in_unbounded.count();
4452
4453        let threshold_out = sum
4454            .threshold_greater_or_equal(node.singleton(q!(3)))
4455            .send_bincode_external(&external);
4456
4457        let nodes = flow
4458            .with_process(&node, deployment.Localhost())
4459            .with_external(&external, deployment.Localhost())
4460            .deploy(&mut deployment);
4461
4462        deployment.deploy().await.unwrap();
4463
4464        let mut threshold_out = nodes.connect(threshold_out).await;
4465
4466        deployment.start().await.unwrap();
4467
4468        assert_eq!(threshold_out.next().await.unwrap(), 3);
4469    }
4470
4471    #[cfg(feature = "deploy")]
4472    #[tokio::test]
4473    async fn monotone_map_order_preserving_threshold() {
4474        use crate::properties::manual_proof;
4475
4476        let mut deployment = Deployment::new();
4477
4478        let mut flow = FlowBuilder::new();
4479        let node = flow.process::<()>();
4480        let external = flow.external::<()>();
4481
4482        let in_unbounded: super::Stream<_, _> =
4483            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4484        let sum = in_unbounded.fold(
4485            q!(|| 0),
4486            q!(
4487                |sum, v| {
4488                    *sum += v;
4489                },
4490                monotone = manual_proof!(/** test */)
4491            ),
4492        );
4493
4494        // map with order_preserving should preserve monotonicity
4495        let doubled = sum.map(q!(
4496            |v| v * 2,
4497            order_preserving = manual_proof!(/** doubling preserves order */)
4498        ));
4499
4500        let threshold_out = doubled
4501            .threshold_greater_or_equal(node.singleton(q!(14)))
4502            .send_bincode_external(&external);
4503
4504        let nodes = flow
4505            .with_process(&node, deployment.Localhost())
4506            .with_external(&external, deployment.Localhost())
4507            .deploy(&mut deployment);
4508
4509        deployment.deploy().await.unwrap();
4510
4511        let mut threshold_out = nodes.connect(threshold_out).await;
4512
4513        deployment.start().await.unwrap();
4514
4515        assert_eq!(threshold_out.next().await.unwrap(), 14);
4516    }
4517
4518    // === Compile-time type tests for join/cross_product ordering ===
4519
4520    #[cfg(any(feature = "deploy", feature = "sim"))]
4521    mod join_ordering_type_tests {
4522        use crate::live_collections::boundedness::{Bounded, Unbounded};
4523        use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
4524        use crate::location::{Location, Process};
4525
4526        #[expect(dead_code, reason = "compile-time type test")]
4527        fn join_unbounded_with_bounded_preserves_order<'a>(
4528            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4529            right: Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4530        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4531            left.join(right)
4532        }
4533
4534        #[expect(dead_code, reason = "compile-time type test")]
4535        fn join_unbounded_with_unbounded_is_no_order<'a>(
4536            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4537            right: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4538        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4539            left.join(right)
4540        }
4541
4542        #[expect(dead_code, reason = "compile-time type test")]
4543        fn join_bounded_with_bounded_preserves_order<'a, L: Location<'a>>(
4544            left: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4545            right: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4546        ) -> Stream<(i32, (char, char)), L, Bounded, TotalOrder, ExactlyOnce> {
4547            left.join(right)
4548        }
4549
4550        #[expect(dead_code, reason = "compile-time type test")]
4551        fn join_unbounded_noorder_with_bounded<'a>(
4552            left: Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce>,
4553            right: Stream<(i32, char), Process<'a>, Bounded, NoOrder, ExactlyOnce>,
4554        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4555            left.join(right)
4556        }
4557
4558        // === Compile-time type tests for cross_product ordering ===
4559
4560        #[expect(dead_code, reason = "compile-time type test")]
4561        fn cross_product_unbounded_with_bounded_preserves_order<'a>(
4562            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4563            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4564        ) -> Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4565            left.cross_product(right)
4566        }
4567
4568        #[expect(dead_code, reason = "compile-time type test")]
4569        fn cross_product_bounded_with_bounded_preserves_order<'a>(
4570            left: Stream<i32, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4571            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4572        ) -> Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce> {
4573            left.cross_product(right)
4574        }
4575
4576        #[expect(dead_code, reason = "compile-time type test")]
4577        fn cross_product_unbounded_with_unbounded_is_no_order<'a>(
4578            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4579            right: Stream<char, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4580        ) -> Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4581            left.cross_product(right)
4582        }
4583    } // mod join_ordering_type_tests
4584
4585    // === Runtime correctness tests for bounded join/cross_product ===
4586
4587    #[cfg(feature = "sim")]
4588    #[test]
4589    fn cross_product_mixed_boundedness_correctness() {
4590        use stageleft::q;
4591
4592        use crate::compile::builder::FlowBuilder;
4593        use crate::nondet::nondet;
4594
4595        let mut flow = FlowBuilder::new();
4596        let process = flow.process::<()>();
4597        let tick = process.tick();
4598
4599        let left = process.source_iter(q!(vec![1, 2]));
4600        let right = process
4601            .source_iter(q!(vec!['a', 'b']))
4602            .batch(&tick, nondet!(/** test */))
4603            .all_ticks();
4604
4605        let out = left.cross_product(right).sim_output();
4606
4607        flow.sim().exhaustive(async || {
4608            out.assert_yields_only_unordered(vec![(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')])
4609                .await;
4610        });
4611    }
4612
4613    #[cfg(feature = "sim")]
4614    #[test]
4615    fn join_mixed_boundedness_correctness() {
4616        use stageleft::q;
4617
4618        use crate::compile::builder::FlowBuilder;
4619        use crate::nondet::nondet;
4620
4621        let mut flow = FlowBuilder::new();
4622        let process = flow.process::<()>();
4623        let tick = process.tick();
4624
4625        let left = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
4626        let right = process
4627            .source_iter(q!(vec![(1, 'x'), (2, 'y')]))
4628            .batch(&tick, nondet!(/** test */))
4629            .all_ticks();
4630
4631        let out = left.join(right).sim_output();
4632
4633        flow.sim().exhaustive(async || {
4634            out.assert_yields_only_unordered(vec![(1, ('a', 'x')), (2, ('b', 'y'))])
4635                .await;
4636        });
4637    }
4638
4639    #[cfg(feature = "sim")]
4640    #[test]
4641    fn sim_merge_unordered_independent_atomics() {
4642        let mut flow = FlowBuilder::new();
4643        let node = flow.process::<()>();
4644
4645        let (in1_send, input1) = node.sim_input::<_, TotalOrder, _>();
4646        let (in2_send, input2) = node.sim_input::<_, TotalOrder, _>();
4647
4648        let out = input1
4649            .atomic()
4650            .merge_unordered(input2.atomic())
4651            .end_atomic()
4652            .sim_output();
4653
4654        flow.sim().exhaustive(async || {
4655            in1_send.send(1);
4656            in2_send.send(2);
4657
4658            out.assert_yields_only_unordered(vec![1, 2]).await;
4659        });
4660    }
4661
4662    #[cfg(feature = "deploy")]
4663    #[tokio::test]
4664    async fn test_stream_ref() {
4665        let mut deployment = Deployment::new();
4666
4667        let mut flow = FlowBuilder::new();
4668        let external = flow.external::<()>();
4669        let p1 = flow.process::<()>();
4670
4671        // Create a bounded stream (source_iter is bounded within a tick)
4672        let my_stream = p1.source_iter(q!(1..=5i32));
4673
4674        let stream_ref = my_stream.by_ref();
4675
4676        // Use the stream ref to get the vec's length
4677        let out_port = p1
4678            .source_iter(q!([()]))
4679            .map(q!(|_| stream_ref.len() as i32))
4680            .send_bincode_external(&external);
4681
4682        // Also consume the stream via pipe
4683        my_stream.for_each(q!(|_| {}));
4684
4685        let nodes = flow
4686            .with_default_optimize()
4687            .with_process(&p1, deployment.Localhost())
4688            .with_external(&external, deployment.Localhost())
4689            .deploy(&mut deployment);
4690
4691        deployment.deploy().await.unwrap();
4692
4693        let mut out_recv = nodes.connect(out_port).await;
4694
4695        deployment.start().await.unwrap();
4696
4697        let result = out_recv.next().await.unwrap();
4698        // stream has 5 elements
4699        assert_eq!(result, 5);
4700    }
4701
4702    #[cfg(feature = "deploy")]
4703    #[tokio::test]
4704    async fn test_stream_ref_contents() {
4705        let mut deployment = Deployment::new();
4706
4707        let mut flow = FlowBuilder::new();
4708        let external = flow.external::<()>();
4709        let p1 = flow.process::<()>();
4710
4711        // Create a bounded stream
4712        let my_stream = p1.source_iter(q!(1..=3i32));
4713
4714        let stream_ref = my_stream.by_ref();
4715
4716        // Sum the referenced vec's contents
4717        let out_port = p1
4718            .source_iter(q!([()]))
4719            .map(q!(|_| stream_ref.iter().sum::<i32>()))
4720            .send_bincode_external(&external);
4721
4722        my_stream.for_each(q!(|_| {}));
4723
4724        let nodes = flow
4725            .with_default_optimize()
4726            .with_process(&p1, deployment.Localhost())
4727            .with_external(&external, deployment.Localhost())
4728            .deploy(&mut deployment);
4729
4730        deployment.deploy().await.unwrap();
4731
4732        let mut out_recv = nodes.connect(out_port).await;
4733
4734        deployment.start().await.unwrap();
4735
4736        let result = out_recv.next().await.unwrap();
4737        // sum of 1+2+3 = 6
4738        assert_eq!(result, 6);
4739    }
4740
4741    #[cfg(feature = "deploy")]
4742    #[tokio::test]
4743    async fn test_stream_ref_no_consumer() {
4744        let mut deployment = Deployment::new();
4745
4746        let mut flow = FlowBuilder::new();
4747        let external = flow.external::<()>();
4748        let p1 = flow.process::<()>();
4749
4750        // Create a bounded stream — no pipe consumer, only ref
4751        let my_stream = p1.source_iter(q!(1..=4i32));
4752
4753        let stream_ref = my_stream.by_ref();
4754
4755        let out_port = p1
4756            .source_iter(q!([()]))
4757            .map(q!(|_| stream_ref.len() as i32))
4758            .send_bincode_external(&external);
4759
4760        let nodes = flow
4761            .with_default_optimize()
4762            .with_process(&p1, deployment.Localhost())
4763            .with_external(&external, deployment.Localhost())
4764            .deploy(&mut deployment);
4765
4766        deployment.deploy().await.unwrap();
4767
4768        let mut out_recv = nodes.connect(out_port).await;
4769
4770        deployment.start().await.unwrap();
4771
4772        let result = out_recv.next().await.unwrap();
4773        assert_eq!(result, 4);
4774    }
4775
4776    #[cfg(feature = "deploy")]
4777    #[tokio::test]
4778    async fn test_stream_mut() {
4779        let mut deployment = Deployment::new();
4780
4781        let mut flow = FlowBuilder::new();
4782        let external = flow.external::<()>();
4783        let p1 = flow.process::<()>();
4784
4785        // Create a bounded stream
4786        let my_stream = p1.source_iter(q!(1..=5i32));
4787
4788        let stream_mut = my_stream.by_mut();
4789
4790        // Mutably reference the buffer to retain only items > 3
4791        let out_port = p1
4792            .source_iter(q!([()]))
4793            .map(q!(|_| {
4794                stream_mut.retain(|x| *x > 3);
4795                stream_mut.len() as i32
4796            }))
4797            .send_bincode_external(&external);
4798
4799        my_stream.for_each(q!(|_| {}));
4800
4801        let nodes = flow
4802            .with_default_optimize()
4803            .with_process(&p1, deployment.Localhost())
4804            .with_external(&external, deployment.Localhost())
4805            .deploy(&mut deployment);
4806
4807        deployment.deploy().await.unwrap();
4808
4809        let mut out_recv = nodes.connect(out_port).await;
4810
4811        deployment.start().await.unwrap();
4812
4813        let result = out_recv.next().await.unwrap();
4814        // After retain(> 3): [4, 5] => len = 2
4815        assert_eq!(result, 2);
4816    }
4817
4818    /// A map with a mut singleton ref on an unordered input should produce > 1
4819    /// simulation instance because the ordering of elements through the mut closure
4820    /// is non-deterministic.
4821    #[cfg(feature = "sim")]
4822    #[test]
4823    fn sim_map_with_mut_on_unordered_explores_multiple_states() {
4824        use crate::live_collections::sliced::sliced;
4825        use crate::live_collections::stream::ExactlyOnce;
4826        use crate::properties::manual_proof;
4827
4828        let mut flow = FlowBuilder::new();
4829        let node = flow.process::<()>();
4830
4831        let (trigger_send, trigger) = node.sim_input::<i32, TotalOrder, ExactlyOnce>();
4832
4833        let out_recv = sliced! {
4834            let batch = use(trigger, nondet!(/** test */));
4835            let counter = batch.location().source_iter(q!(vec![0i32]))
4836                .fold(q!(|| 0i32), q!(|acc, v| *acc += v));
4837            let counter_mut = counter.by_mut();
4838            let items = batch.location().source_iter(q!(vec![1i32, 2])).weaken_ordering::<NoOrder>();
4839            items.map(q!(
4840                |x| {
4841                    *counter_mut += x;
4842                    *counter_mut
4843                },
4844                commutative = manual_proof!(/** test */)
4845            ))
4846        }
4847        .sim_output();
4848
4849        let count = flow.sim().exhaustive(async || {
4850            trigger_send.send(1);
4851            let _all: Vec<i32> = out_recv.collect_sorted().await;
4852        });
4853
4854        assert_eq!(
4855            count, 2,
4856            "Expected 2 simulation instances due to mut on unordered input, got {}",
4857            count
4858        );
4859    }
4860
4861    /// A map with a mut singleton ref on a top-level unordered input should produce > 1
4862    /// simulation instance. Currently panics because observe_nondet doesn't support
4863    /// top-level bounded inputs yet.
4864    #[cfg(feature = "sim")]
4865    #[test]
4866    #[ignore = "observe_nondet not yet supported for top-level bounded inputs (https://github.com/hydro-project/hydro/issues/2950)"]
4867    fn sim_map_with_mut_on_unordered_top_level() {
4868        use crate::properties::manual_proof;
4869
4870        let mut flow = FlowBuilder::new();
4871        let node = flow.process::<()>();
4872
4873        let counter = node
4874            .source_iter(q!(vec![0i32]))
4875            .fold(q!(|| 0i32), q!(|acc, v| *acc += v));
4876        let counter_mut = counter.by_mut();
4877
4878        let out_recv = node
4879            .source_iter(q!(vec![1i32, 2]))
4880            .weaken_ordering::<NoOrder>()
4881            .map(q!(
4882                |x| {
4883                    *counter_mut += x;
4884                    *counter_mut
4885                },
4886                commutative = manual_proof!(/** test */)
4887            ))
4888            .assume_ordering::<TotalOrder>(nondet!(/** test */))
4889            .sim_output();
4890
4891        counter.into_stream().for_each(q!(|_| {}));
4892
4893        let count = flow.sim().exhaustive(async || {
4894            let _all: Vec<i32> = out_recv.collect().await;
4895        });
4896
4897        assert_eq!(
4898            count, 2,
4899            "Expected 2 simulation instances due to mut on unordered input, got {}",
4900            count
4901        );
4902    }
4903}