1use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::singleton::SingletonBound;
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::{Atomic, DeferTick};
30use crate::location::{Location, Tick, TopLevel, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::prelude::manual_proof;
34use crate::properties::{
35 AggFuncAlgebra, ApplyMonotoneStream, StreamMapFuncAlgebra, ValidCommutativityFor,
36 ValidIdempotenceFor, ValidMutBorrowCommutativityFor, ValidMutBorrowIdempotenceFor,
37 ValidMutCommutativityFor, ValidMutIdempotenceFor,
38};
39
40pub mod networking;
41
42#[sealed::sealed]
44pub trait Ordering:
45 MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
46{
47 const ORDERING_KIND: StreamOrder;
49}
50
51pub enum TotalOrder {}
55
56#[sealed::sealed]
57impl Ordering for TotalOrder {
58 const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
59}
60
61pub enum NoOrder {}
67
68#[sealed::sealed]
69impl Ordering for NoOrder {
70 const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
71}
72
73#[sealed::sealed]
77pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
78#[sealed::sealed]
79impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
80
81#[sealed::sealed]
83pub trait MinOrder<Other: ?Sized> {
84 type Min: Ordering;
86}
87
88#[sealed::sealed]
89impl<O: Ordering> MinOrder<O> for TotalOrder {
90 type Min = O;
91}
92
93#[sealed::sealed]
94impl<O: Ordering> MinOrder<O> for NoOrder {
95 type Min = NoOrder;
96}
97
98#[sealed::sealed]
100pub trait Retries:
101 MinRetries<Self, Min = Self>
102 + MinRetries<ExactlyOnce, Min = Self>
103 + MinRetries<AtLeastOnce, Min = AtLeastOnce>
104{
105 const RETRIES_KIND: StreamRetry;
107}
108
109pub enum ExactlyOnce {}
112
113#[sealed::sealed]
114impl Retries for ExactlyOnce {
115 const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
116}
117
118pub enum AtLeastOnce {}
121
122#[sealed::sealed]
123impl Retries for AtLeastOnce {
124 const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
125}
126
127#[sealed::sealed]
131pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
132#[sealed::sealed]
133impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
134
135#[sealed::sealed]
137pub trait MinRetries<Other: ?Sized> {
138 type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
140}
141
142#[sealed::sealed]
143impl<R: Retries> MinRetries<R> for ExactlyOnce {
144 type Min = R;
145}
146
147#[sealed::sealed]
148impl<R: Retries> MinRetries<R> for AtLeastOnce {
149 type Min = AtLeastOnce;
150}
151
152#[sealed::sealed]
153#[diagnostic::on_unimplemented(
154 message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
155 label = "required here",
156 note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
157)]
158pub trait IsOrdered: Ordering {}
160
161#[sealed::sealed]
162#[diagnostic::do_not_recommend]
163impl IsOrdered for TotalOrder {}
164
165#[sealed::sealed]
166#[diagnostic::on_unimplemented(
167 message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
168 label = "required here",
169 note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
170)]
171pub trait IsExactlyOnce: Retries {}
173
174#[sealed::sealed]
175#[diagnostic::do_not_recommend]
176impl IsExactlyOnce for ExactlyOnce {}
177
178pub struct Stream<
198 Type,
199 Loc,
200 Bound: Boundedness = Unbounded,
201 Order: Ordering = TotalOrder,
202 Retry: Retries = ExactlyOnce,
203> {
204 pub(crate) location: Loc,
205 pub(crate) ir_node: RefCell<HydroNode>,
206 pub(crate) flow_state: FlowState,
207
208 _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
209}
210
211impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
212 fn drop(&mut self) {
213 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
214 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
215 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
216 input: Box::new(ir_node),
217 op_metadata: HydroIrOpMetadata::new(),
218 });
219 }
220 }
221}
222
223impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
224 for Stream<T, L, Unbounded, O, R>
225where
226 L: Location<'a>,
227{
228 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
229 let new_meta = stream
230 .location
231 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
232
233 Stream {
234 location: stream.location.clone(),
235 flow_state: stream.flow_state.clone(),
236 ir_node: RefCell::new(HydroNode::Cast {
237 inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
238 metadata: new_meta,
239 }),
240 _phantom: PhantomData,
241 }
242 }
243}
244
245impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
246 for Stream<T, L, B, NoOrder, R>
247where
248 L: Location<'a>,
249{
250 fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
251 stream.weaken_ordering()
252 }
253}
254
255impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
256 for Stream<T, L, B, O, AtLeastOnce>
257where
258 L: Location<'a>,
259{
260 fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
261 stream.weaken_retries()
262 }
263}
264
265impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
266where
267 L: Location<'a>,
268{
269 fn defer_tick(self) -> Self {
270 Stream::defer_tick(self)
271 }
272}
273
274impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
275 for Stream<T, Tick<L>, Bounded, O, R>
276where
277 L: Location<'a>,
278{
279 type Location = Tick<L>;
280
281 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
282 Stream::new(
283 location.clone(),
284 HydroNode::CycleSource {
285 cycle_id,
286 metadata: location.new_node_metadata(Self::collection_kind()),
287 },
288 )
289 }
290}
291
292impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
293 for Stream<T, Tick<L>, Bounded, O, R>
294where
295 L: Location<'a>,
296{
297 type Location = Tick<L>;
298
299 fn location(&self) -> &Self::Location {
300 self.location()
301 }
302
303 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
304 let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
305 location.clone(),
306 HydroNode::DeferTick {
307 input: Box::new(HydroNode::CycleSource {
308 cycle_id,
309 metadata: location.new_node_metadata(Self::collection_kind()),
310 }),
311 metadata: location.new_node_metadata(Self::collection_kind()),
312 },
313 );
314
315 from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
316 }
317}
318
319impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
320 for Stream<T, Tick<L>, Bounded, O, R>
321where
322 L: Location<'a>,
323{
324 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
325 assert_eq!(
326 Location::id(&self.location),
327 expected_location,
328 "locations do not match"
329 );
330 self.location
331 .flow_state()
332 .borrow_mut()
333 .push_root(HydroRoot::CycleSink {
334 cycle_id,
335 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
336 op_metadata: HydroIrOpMetadata::new(),
337 });
338 }
339}
340
341impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
342 for Stream<T, L, B, O, R>
343where
344 L: Location<'a>,
345{
346 type Location = L;
347
348 fn create_source(cycle_id: CycleId, location: L) -> Self {
349 Stream::new(
350 location.clone(),
351 HydroNode::CycleSource {
352 cycle_id,
353 metadata: location.new_node_metadata(Self::collection_kind()),
354 },
355 )
356 }
357}
358
359impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
360 for Stream<T, L, B, O, R>
361where
362 L: Location<'a>,
363{
364 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
365 assert_eq!(
366 Location::id(&self.location),
367 expected_location,
368 "locations do not match"
369 );
370 self.location
371 .flow_state()
372 .borrow_mut()
373 .push_root(HydroRoot::CycleSink {
374 cycle_id,
375 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
376 op_metadata: HydroIrOpMetadata::new(),
377 });
378 }
379}
380
381impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
382where
383 T: Clone,
384 L: Location<'a>,
385{
386 fn clone(&self) -> Self {
387 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
388 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
389 *self.ir_node.borrow_mut() = HydroNode::Tee {
390 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
391 metadata: self.location.new_node_metadata(Self::collection_kind()),
392 };
393 }
394
395 let HydroNode::Tee { inner, metadata } = &*self.ir_node.borrow() else {
396 unreachable!()
397 };
398 Stream {
399 location: self.location.clone(),
400 flow_state: self.flow_state.clone(),
401 ir_node: HydroNode::Tee {
402 inner: SharedNode(inner.0.clone()),
403 metadata: metadata.clone(),
404 }
405 .into(),
406 _phantom: PhantomData,
407 }
408 }
409}
410
411impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
412where
413 L: Location<'a>,
414{
415 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
416 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
417 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
418
419 let flow_state = location.flow_state().clone();
420 Stream {
421 location,
422 flow_state,
423 ir_node: RefCell::new(ir_node),
424 _phantom: PhantomData,
425 }
426 }
427
428 pub fn location(&self) -> &L {
430 &self.location
431 }
432
433 pub fn by_ref(&self) -> crate::handoff_ref::StreamRef<'a, '_, T, L>
438 where
439 B: IsBounded,
440 {
441 crate::handoff_ref::StreamRef::new(&self.ir_node)
442 }
443
444 pub fn by_mut(&self) -> crate::handoff_ref::StreamMut<'a, '_, T, L>
447 where
448 B: IsBounded,
449 {
450 crate::handoff_ref::StreamMut::new(&self.ir_node)
451 }
452
453 pub fn weaken_consistency(self) -> Stream<T, L::DropConsistency, B, O, R>
456 where
457 L: Location<'a>,
458 {
459 if L::consistency()
460 .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
461 {
462 Stream::new(
464 self.location.drop_consistency(),
465 self.ir_node.replace(HydroNode::Placeholder),
466 )
467 } else {
468 Stream::new(
469 self.location.drop_consistency(),
470 HydroNode::Cast {
471 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
472 metadata: self.location.drop_consistency().new_node_metadata(Stream::<
473 T,
474 L::DropConsistency,
475 B,
476 O,
477 R,
478 >::collection_kind(
479 )),
480 },
481 )
482 }
483 }
484
485 pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
489 self,
490 _proof: impl crate::properties::ConsistencyProof,
491 ) -> Stream<T, L2, B, O, R>
492 where
493 L: Location<'a>,
494 {
495 if L::consistency() == L2::consistency() {
496 Stream::new(
497 self.location.with_consistency_of(),
498 self.ir_node.replace(HydroNode::Placeholder),
499 )
500 } else {
501 Stream::new(
502 self.location.with_consistency_of(),
503 HydroNode::AssertIsConsistent {
504 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
505 trusted: false,
506 metadata: self
507 .location
508 .clone()
509 .with_consistency_of::<L2>()
510 .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
511 },
512 )
513 }
514 }
515
516 pub(crate) fn assert_has_consistency_of_trusted<
517 L2: Location<'a, DropConsistency = L::DropConsistency>,
518 >(
519 self,
520 _proof: impl crate::properties::ConsistencyProof,
521 ) -> Stream<T, L2, B, O, R>
522 where
523 L: Location<'a>,
524 {
525 if L::consistency() == L2::consistency() {
526 Stream::new(
527 self.location.with_consistency_of(),
528 self.ir_node.replace(HydroNode::Placeholder),
529 )
530 } else {
531 Stream::new(
532 self.location.with_consistency_of(),
533 HydroNode::AssertIsConsistent {
534 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
535 trusted: true,
536 metadata: self
537 .location
538 .clone()
539 .with_consistency_of::<L2>()
540 .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
541 },
542 )
543 }
544 }
545
546 pub(crate) fn collection_kind() -> CollectionKind {
547 CollectionKind::Stream {
548 bound: B::BOUND_KIND,
549 order: O::ORDERING_KIND,
550 retry: R::RETRIES_KIND,
551 element_type: quote_type::<T>().into(),
552 }
553 }
554
555 pub fn map<U, F, C, I, const WAS_MUT: bool>(
575 self,
576 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, I>>,
577 ) -> Stream<U, L, B, O, R>
578 where
579 F: FnMut(T) -> U + 'a,
580 C: ValidMutCommutativityFor<F, T, U, O, WAS_MUT>,
581 I: ValidMutIdempotenceFor<F, T, U, R, WAS_MUT>,
582 {
583 let f = crate::handoff_ref::with_ref_capture(|| {
584 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
585 proof.register_proof(&expr);
586 expr.into()
587 });
588 Stream::new(
589 self.location.clone(),
590 HydroNode::Map {
591 f,
592 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
593 metadata: self
594 .location
595 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
596 },
597 )
598 }
599
600 pub fn flat_map_ordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
625 self,
626 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
627 ) -> Stream<U, L, B, O, R>
628 where
629 I: IntoIterator<Item = U>,
630 F: FnMut(T) -> I + 'a,
631 C: ValidMutCommutativityFor<F, T, I, O, WAS_MUT>,
632 Idemp: ValidMutIdempotenceFor<F, T, I, R, WAS_MUT>,
633 {
634 let f = crate::handoff_ref::with_ref_capture(|| {
635 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
636 proof.register_proof(&expr);
637 expr.into()
638 });
639 Stream::new(
640 self.location.clone(),
641 HydroNode::FlatMap {
642 f,
643 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
644 metadata: self
645 .location
646 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
647 },
648 )
649 }
650
651 pub fn flat_map_unordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
678 self,
679 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
680 ) -> Stream<U, L, B, NoOrder, R>
681 where
682 I: IntoIterator<Item = U>,
683 F: FnMut(T) -> I + 'a,
684 C: ValidMutCommutativityFor<F, T, I, O, WAS_MUT>,
685 Idemp: ValidMutIdempotenceFor<F, T, I, R, WAS_MUT>,
686 {
687 let f = crate::handoff_ref::with_ref_capture(|| {
688 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
689 proof.register_proof(&expr);
690 expr.into()
691 });
692 Stream::new(
693 self.location.clone(),
694 HydroNode::FlatMap {
695 f,
696 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
697 metadata: self
698 .location
699 .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
700 },
701 )
702 }
703
704 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
727 where
728 T: IntoIterator<Item = U>,
729 {
730 self.flat_map_ordered(q!(|d| d))
731 }
732
733 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
760 where
761 T: IntoIterator<Item = U>,
762 {
763 self.flat_map_unordered(q!(|d| d))
764 }
765
766 pub fn flat_map_stream_blocking<U, S, F, C, Idemp, const WAS_MUT: bool>(
770 self,
771 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
772 ) -> Stream<U, L, B, O, R>
773 where
774 S: futures::Stream<Item = U>,
775 F: FnMut(T) -> S + 'a,
776 C: ValidMutCommutativityFor<F, T, S, O, WAS_MUT>,
777 Idemp: ValidMutIdempotenceFor<F, T, S, R, WAS_MUT>,
778 {
779 let f = crate::handoff_ref::with_ref_capture(|| {
780 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
781 proof.register_proof(&expr);
782 expr.into()
783 });
784 Stream::new(
785 self.location.clone(),
786 HydroNode::FlatMapStreamBlocking {
787 f,
788 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
789 metadata: self
790 .location
791 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
792 },
793 )
794 }
795
796 pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
800 where
801 T: futures::Stream<Item = U>,
802 {
803 self.flat_map_stream_blocking(q!(|d| d))
804 }
805
806 pub fn filter<F, C, Idemp, const WAS_MUT: bool>(
831 self,
832 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
833 ) -> Self
834 where
835 F: FnMut(&T) -> bool + 'a,
836 C: ValidMutBorrowCommutativityFor<F, T, bool, O, WAS_MUT>,
837 Idemp: ValidMutBorrowIdempotenceFor<F, T, bool, R, WAS_MUT>,
838 {
839 let f = crate::handoff_ref::with_ref_capture(|| {
840 let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location);
841 proof.register_proof(&expr);
842 expr.into()
843 });
844 Stream::new(
845 self.location.clone(),
846 HydroNode::Filter {
847 f,
848 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
849 metadata: self.location.new_node_metadata(Self::collection_kind()),
850 },
851 )
852 }
853
854 pub fn partition<F, C, Idemp, const WAS_MUT: bool>(
889 self,
890 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
891 ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
892 where
893 F: FnMut(&T) -> bool + 'a,
894 C: ValidMutBorrowCommutativityFor<F, T, bool, O, WAS_MUT>,
895 Idemp: ValidMutBorrowIdempotenceFor<F, T, bool, R, WAS_MUT>,
896 {
897 let f = crate::handoff_ref::with_ref_capture(|| {
898 let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location);
899 proof.register_proof(&expr);
900 expr.into()
901 });
902 let shared = SharedNode(Rc::new(RefCell::new(
903 self.ir_node.replace(HydroNode::Placeholder),
904 )));
905
906 let true_stream = Stream::new(
907 self.location.clone(),
908 HydroNode::Partition {
909 inner: SharedNode(shared.0.clone()),
910 f: f.clone(),
911 is_true: true,
912 metadata: self.location.new_node_metadata(Self::collection_kind()),
913 },
914 );
915
916 let false_stream = Stream::new(
917 self.location.clone(),
918 HydroNode::Partition {
919 inner: SharedNode(shared.0),
920 f,
921 is_true: false,
922 metadata: self.location.new_node_metadata(Self::collection_kind()),
923 },
924 );
925
926 (true_stream, false_stream)
927 }
928
929 pub fn filter_map<U, F, C, Idemp, const WAS_MUT: bool>(
949 self,
950 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
951 ) -> Stream<U, L, B, O, R>
952 where
953 F: FnMut(T) -> Option<U> + 'a,
954 C: ValidMutCommutativityFor<F, T, Option<U>, O, WAS_MUT>,
955 Idemp: ValidMutIdempotenceFor<F, T, Option<U>, R, WAS_MUT>,
956 {
957 let f = crate::handoff_ref::with_ref_capture(|| {
958 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
959 proof.register_proof(&expr);
960 expr.into()
961 });
962 Stream::new(
963 self.location.clone(),
964 HydroNode::FilterMap {
965 f,
966 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
967 metadata: self
968 .location
969 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
970 },
971 )
972 }
973
974 pub fn cross_singleton<O2>(
999 self,
1000 other: impl Into<Optional<O2, L, Bounded>>,
1001 ) -> Stream<(T, O2), L, B, O, R>
1002 where
1003 O2: Clone,
1004 {
1005 let other: Optional<O2, L, Bounded> = other.into();
1006 check_matching_location(&self.location, &other.location);
1007
1008 Stream::new(
1009 self.location.clone(),
1010 HydroNode::CrossSingleton {
1011 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1012 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1013 metadata: self
1014 .location
1015 .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
1016 },
1017 )
1018 }
1019
1020 pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
1052 self.cross_singleton(signal.filter(q!(|b| *b)))
1053 .map(q!(|(d, _)| d))
1054 }
1055
1056 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1091 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1092 self.filter_if(signal.is_some())
1093 }
1094
1095 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1130 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1131 self.filter_if(other.is_none())
1132 }
1133
1134 pub fn cross_product<T2, B2: Boundedness, O2: Ordering, R2: Retries>(
1159 self,
1160 other: Stream<T2, L, B2, O2, R2>,
1161 ) -> Stream<(T, T2), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
1162 where
1163 T: Clone,
1164 T2: Clone,
1165 R: MinRetries<R2>,
1166 {
1167 self.map(q!(|v| ((), v)))
1168 .join(other.map(q!(|v| ((), v))))
1169 .map(q!(|((), (v1, v2))| (v1, v2)))
1170 }
1171
1172 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1191 where
1192 T: Eq + Hash,
1193 {
1194 Stream::new(
1195 self.location.clone(),
1196 HydroNode::Unique {
1197 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1198 metadata: self
1199 .location
1200 .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1201 },
1202 )
1203 }
1204
1205 pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1231 where
1232 T: Eq + Hash,
1233 B2: IsBounded,
1234 {
1235 check_matching_location(&self.location, &other.location);
1236
1237 Stream::new(
1238 self.location.clone(),
1239 HydroNode::Difference {
1240 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1241 neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1242 metadata: self
1243 .location
1244 .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1245 },
1246 )
1247 }
1248
1249 pub fn inspect<F, C, Idemp, const WAS_MUT: bool>(
1270 self,
1271 f: impl IntoQuotedMut<'a, F, L::DropConsistency, StreamMapFuncAlgebra<C, Idemp>>,
1272 ) -> Self
1273 where
1274 F: FnMut(&T) + 'a,
1275 C: ValidMutBorrowCommutativityFor<F, T, (), O, WAS_MUT>,
1276 Idemp: ValidMutBorrowIdempotenceFor<F, T, (), R, WAS_MUT>,
1277 {
1278 let f = crate::handoff_ref::with_ref_capture(|| {
1279 let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location.drop_consistency());
1280 proof.register_proof(&expr);
1281 expr.into()
1282 });
1283
1284 Stream::new(
1285 self.location.clone(),
1286 HydroNode::Inspect {
1287 f,
1288 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1289 metadata: self.location.new_node_metadata(Self::collection_kind()),
1290 },
1291 )
1292 }
1293
1294 pub fn for_each<F: FnMut(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1305 where
1306 O: IsOrdered,
1307 R: IsExactlyOnce,
1308 {
1309 let f = crate::handoff_ref::with_ref_capture(|| f.splice_fnmut1_ctx(&self.location).into());
1310 self.location
1311 .flow_state()
1312 .borrow_mut()
1313 .push_root(HydroRoot::ForEach {
1314 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1315 f,
1316 op_metadata: HydroIrOpMetadata::new(),
1317 });
1318 }
1319
1320 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1326 where
1327 O: IsOrdered,
1328 R: IsExactlyOnce,
1329 S: 'a + futures::Sink<T> + Unpin,
1330 {
1331 self.location
1332 .flow_state()
1333 .borrow_mut()
1334 .push_root(HydroRoot::DestSink {
1335 sink: sink.splice_typed_ctx(&self.location).into(),
1336 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1337 op_metadata: HydroIrOpMetadata::new(),
1338 });
1339 }
1340
1341 pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1361 where
1362 O: IsOrdered,
1363 R: IsExactlyOnce,
1364 {
1365 Stream::new(
1366 self.location.clone(),
1367 HydroNode::Enumerate {
1368 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1369 metadata: self.location.new_node_metadata(Stream::<
1370 (usize, T),
1371 L,
1372 B,
1373 TotalOrder,
1374 ExactlyOnce,
1375 >::collection_kind()),
1376 },
1377 )
1378 }
1379
1380 pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
1404 self,
1405 init: impl IntoQuotedMut<'a, I, L>,
1406 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1407 ) -> Singleton<A, L, B2>
1408 where
1409 I: Fn() -> A + 'a,
1410 F: 'a + Fn(&mut A, T),
1411 C: ValidCommutativityFor<O>,
1412 Idemp: ValidIdempotenceFor<R>,
1413 B: ApplyMonotoneStream<M, B2>,
1414 {
1415 let init = init.splice_fn0_ctx(&self.location).into();
1416 let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1417 proof.register_proof(&comb);
1418
1419 let nondet = nondet!();
1422 let retried: Stream<T, L::DropConsistency, B, O, ExactlyOnce> = self.assume_retries(nondet);
1423
1424 let core = HydroNode::Fold {
1425 init,
1426 acc: comb.into(),
1427 input: Box::new(retried.ir_node.replace(HydroNode::Placeholder)),
1428 metadata: retried
1429 .location
1430 .new_node_metadata(Singleton::<A, L::DropConsistency, B2>::collection_kind()),
1431 };
1436
1437 Singleton::new(retried.location.clone(), core)
1438 .assert_has_consistency_of(manual_proof!())
1439 }
1440
1441 pub fn reduce<F, C, Idemp>(
1464 self,
1465 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1466 ) -> Optional<T, L, B>
1467 where
1468 F: Fn(&mut T, T) + 'a,
1469 C: ValidCommutativityFor<O>,
1470 Idemp: ValidIdempotenceFor<R>,
1471 {
1472 let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1473 proof.register_proof(&f);
1474
1475 let nondet = nondet!();
1476 let ordered_etc: Stream<T, L::DropConsistency, B> =
1477 self.assume_retries(nondet).assume_ordering(nondet);
1478
1479 let core = HydroNode::Reduce {
1480 f: f.into(),
1481 input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1482 metadata: ordered_etc
1483 .location
1484 .new_node_metadata(Optional::<T, L::DropConsistency, B>::collection_kind()),
1485 };
1486
1487 Optional::new(ordered_etc.location.clone(), core)
1488 .assert_has_consistency_of(manual_proof!())
1489 }
1490
1491 pub fn max(self) -> Optional<T, L, B>
1511 where
1512 T: Ord,
1513 {
1514 self.assume_retries_trusted::<ExactlyOnce>(nondet!())
1515 .assume_ordering_trusted_bounded::<TotalOrder>(
1516 nondet!(),
1517 )
1518 .reduce(q!(|curr, new| {
1519 if new > *curr {
1520 *curr = new;
1521 }
1522 }))
1523 }
1524
1525 pub fn min(self) -> Optional<T, L, B>
1545 where
1546 T: Ord,
1547 {
1548 self.assume_retries_trusted::<ExactlyOnce>(nondet!())
1549 .assume_ordering_trusted_bounded::<TotalOrder>(
1550 nondet!(),
1551 )
1552 .reduce(q!(|curr, new| {
1553 if new < *curr {
1554 *curr = new;
1555 }
1556 }))
1557 }
1558
1559 pub fn first(self) -> Optional<T, L, B>
1582 where
1583 O: IsOrdered,
1584 {
1585 self.make_totally_ordered()
1586 .assume_retries_trusted::<ExactlyOnce>(nondet!())
1587 .generator(q!(|| ()), q!(|_, item| Generate::Return(item)))
1588 .reduce(q!(|_, _| {}))
1589 }
1590
1591 pub fn last(self) -> Optional<T, L, B>
1614 where
1615 O: IsOrdered,
1616 {
1617 self.make_totally_ordered()
1618 .assume_retries_trusted::<ExactlyOnce>(nondet!())
1619 .reduce(q!(|curr, new| *curr = new))
1620 }
1621
1622 pub fn limit(
1645 self,
1646 n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1647 ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
1648 where
1649 O: IsOrdered,
1650 R: IsExactlyOnce,
1651 {
1652 self.generator(
1653 q!(|| 0usize),
1654 q!(move |count, item| {
1655 if *count == n {
1656 Generate::Break
1657 } else {
1658 *count += 1;
1659 if *count == n {
1660 Generate::Return(item)
1661 } else {
1662 Generate::Yield(item)
1663 }
1664 }
1665 }),
1666 )
1667 }
1668
1669 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1695 where
1696 O: IsOrdered,
1697 R: IsExactlyOnce,
1698 {
1699 self.make_totally_ordered().make_exactly_once().fold(
1700 q!(|| vec![]),
1701 q!(|acc, v| {
1702 acc.push(v);
1703 }),
1704 )
1705 }
1706
1707 pub fn scan<A, U, I, F>(
1768 self,
1769 init: impl IntoQuotedMut<'a, I, L>,
1770 f: impl IntoQuotedMut<'a, F, L>,
1771 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1772 where
1773 O: IsOrdered,
1774 R: IsExactlyOnce,
1775 I: Fn() -> A + 'a,
1776 F: Fn(&mut A, T) -> Option<U> + 'a,
1777 {
1778 let init = init.splice_fn0_ctx(&self.location).into();
1779 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1780
1781 Stream::new(
1782 self.location.clone(),
1783 HydroNode::Scan {
1784 init,
1785 acc: f,
1786 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1787 metadata: self.location.new_node_metadata(
1788 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1789 ),
1790 },
1791 )
1792 }
1793
1794 pub fn scan_async_blocking<A, U, I, F, Fut>(
1828 self,
1829 init: impl IntoQuotedMut<'a, I, L>,
1830 f: impl IntoQuotedMut<'a, F, L>,
1831 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1832 where
1833 O: IsOrdered,
1834 R: IsExactlyOnce,
1835 I: Fn() -> A + 'a,
1836 F: Fn(&mut A, T) -> Fut + 'a,
1837 Fut: Future<Output = Option<U>> + 'a,
1838 {
1839 let init = init.splice_fn0_ctx(&self.location).into();
1840 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1841
1842 Stream::new(
1843 self.location.clone(),
1844 HydroNode::ScanAsyncBlocking {
1845 init,
1846 acc: f,
1847 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1848 metadata: self.location.new_node_metadata(
1849 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1850 ),
1851 },
1852 )
1853 }
1854
1855 pub fn generator<A, U, I, F>(
1895 self,
1896 init: impl IntoQuotedMut<'a, I, L> + Copy,
1897 f: impl IntoQuotedMut<'a, F, L> + Copy,
1898 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1899 where
1900 O: IsOrdered,
1901 R: IsExactlyOnce,
1902 I: Fn() -> A + 'a,
1903 F: Fn(&mut A, T) -> Generate<U> + 'a,
1904 {
1905 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1906 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1907
1908 let this = self.make_totally_ordered().make_exactly_once();
1909
1910 let scan_init = q!(|| None)
1915 .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
1916 .into();
1917 let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
1918 if state.is_none() {
1919 *state = Some(Some(init()));
1920 }
1921 match state {
1922 Some(Some(state_value)) => match f(state_value, v) {
1923 Generate::Yield(out) => Some(Some(out)),
1924 Generate::Return(out) => {
1925 *state = Some(None);
1926 Some(Some(out))
1927 }
1928 Generate::Break => None,
1932 Generate::Continue => Some(None),
1933 },
1934 _ => None,
1936 }
1937 })
1938 .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
1939 .into();
1940
1941 let scan_node = HydroNode::Scan {
1942 init: scan_init,
1943 acc: scan_f,
1944 input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
1945 metadata: this.location.new_node_metadata(Stream::<
1946 Option<U>,
1947 L,
1948 B,
1949 TotalOrder,
1950 ExactlyOnce,
1951 >::collection_kind()),
1952 };
1953
1954 let flatten_f = q!(|d| d)
1955 .splice_fn1_ctx::<Option<U>, _>(&this.location)
1956 .into();
1957 let flatten_node = HydroNode::FlatMap {
1958 f: flatten_f,
1959 input: Box::new(scan_node),
1960 metadata: this
1961 .location
1962 .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
1963 };
1964
1965 Stream::new(this.location.clone(), flatten_node)
1966 }
1967
1968 pub fn sample_every(
1977 self,
1978 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1979 nondet: NonDet,
1980 ) -> Stream<T, L::DropConsistency, Unbounded, O, AtLeastOnce>
1981 where
1982 L: TopLevel<'a>,
1983 {
1984 let samples = self.location.source_interval(interval);
1985
1986 let tick = self.location.tick();
1987 self.batch(&tick, nondet)
1988 .filter_if(samples.batch(&tick, nondet).first().is_some())
1989 .all_ticks()
1990 .weaken_retries()
1991 }
1992
1993 pub fn timeout(
2003 self,
2004 duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L::DropConsistency>> + Copy + 'a,
2005 nondet: NonDet,
2006 ) -> Optional<(), L::DropConsistency, Unbounded>
2007 where
2008 L: TopLevel<'a>,
2009 {
2010 let tick = self.location.tick();
2011
2012 let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
2013 q!(|| None),
2014 q!(
2015 |latest, _| {
2016 *latest = Some(Instant::now());
2017 },
2018 commutative = manual_proof!()
2019 ),
2020 );
2021
2022 latest_received
2023 .snapshot(&tick, nondet)
2024 .filter_map(q!(move |latest_received| {
2025 if let Some(latest_received) = latest_received {
2026 if Instant::now().duration_since(latest_received) > duration {
2027 Some(())
2028 } else {
2029 None
2030 }
2031 } else {
2032 Some(())
2033 }
2034 }))
2035 .latest()
2036 }
2037
2038 pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
2044 let id = self.location.flow_state().borrow_mut().next_clock_id();
2045 let out_location = Atomic {
2046 tick: Tick {
2047 id,
2048 l: self.location.clone(),
2049 },
2050 };
2051 Stream::new(
2052 out_location.clone(),
2053 HydroNode::BeginAtomic {
2054 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2055 metadata: out_location
2056 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2057 },
2058 )
2059 }
2060
2061 pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
2069 self,
2070 tick: &Tick<L2>,
2071 _nondet: NonDet,
2072 ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
2073 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2074 Stream::new(
2075 tick.drop_consistency(),
2076 HydroNode::Batch {
2077 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2078 metadata: tick
2079 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2080 },
2081 )
2082 }
2083
2084 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
2087 {
2088 let mut node = self.ir_node.borrow_mut();
2089 let metadata = node.metadata_mut();
2090 metadata.tag = Some(name.to_owned());
2091 }
2092 self
2093 }
2094
2095 pub(crate) fn cast_at_most_one_element(self) -> Optional<T, L, B>
2099 where
2100 B: IsBounded,
2101 {
2102 Optional::new(
2103 self.location.clone(),
2104 HydroNode::Cast {
2105 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2106 metadata: self
2107 .location
2108 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
2109 },
2110 )
2111 }
2112
2113 pub(crate) fn use_ordering_type<O2: Ordering>(self) -> Stream<T, L, B, O2, R> {
2114 if O::ORDERING_KIND == O2::ORDERING_KIND {
2115 Stream::new(
2116 self.location.clone(),
2117 self.ir_node.replace(HydroNode::Placeholder),
2118 )
2119 } else {
2120 panic!(
2121 "Runtime ordering {:?} did not match requested cast {:?}.",
2122 O::ORDERING_KIND,
2123 O2::ORDERING_KIND
2124 )
2125 }
2126 }
2127
2128 pub fn assume_ordering<O2: Ordering>(
2137 self,
2138 _nondet: NonDet,
2139 ) -> Stream<T, L::DropConsistency, B, O2, R> {
2140 if O::ORDERING_KIND == O2::ORDERING_KIND {
2141 self.use_ordering_type().weaken_consistency()
2142 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2143 let target_location = self.location().drop_consistency();
2145 Stream::new(
2146 target_location.clone(),
2147 HydroNode::Cast {
2148 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2149 metadata: target_location
2150 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2151 },
2152 )
2153 } else {
2154 let target_location = self.location().drop_consistency();
2155 Stream::new(
2156 target_location.clone(),
2157 HydroNode::ObserveNonDet {
2158 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2159 trusted: false,
2160 metadata: target_location
2161 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2162 },
2163 )
2164 }
2165 }
2166
2167 fn assume_ordering_trusted_bounded<O2: Ordering>(
2170 self,
2171 nondet: NonDet,
2172 ) -> Stream<T, L, B, O2, R> {
2173 if B::BOUNDED {
2174 self.assume_ordering_trusted(nondet)
2175 } else {
2176 let self_location = self.location.clone();
2177 let inner: Stream<T, L::DropConsistency, B, O2, R> = self.assume_ordering(nondet);
2178 Stream::new(self_location, inner.ir_node.replace(HydroNode::Placeholder))
2179 }
2180 }
2181
2182 pub(crate) fn assume_ordering_trusted<O2: Ordering>(
2185 self,
2186 _nondet: NonDet,
2187 ) -> Stream<T, L, B, O2, R> {
2188 if O::ORDERING_KIND == O2::ORDERING_KIND {
2189 self.use_ordering_type()
2190 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2191 Stream::new(
2193 self.location.clone(),
2194 HydroNode::Cast {
2195 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2196 metadata: self
2197 .location
2198 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2199 },
2200 )
2201 } else {
2202 Stream::new(
2203 self.location.clone(),
2204 HydroNode::ObserveNonDet {
2205 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2206 trusted: true,
2207 metadata: self
2208 .location
2209 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2210 },
2211 )
2212 }
2213 }
2214
2215 #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
2216 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
2219 self.weaken_ordering::<NoOrder>()
2220 }
2221
2222 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
2225 let nondet = nondet!();
2226 self.assume_ordering_trusted::<O2>(nondet)
2227 }
2228
2229 pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
2232 where
2233 O: IsOrdered,
2234 {
2235 self.assume_ordering_trusted(nondet!())
2236 }
2237
2238 pub fn assume_retries<R2: Retries>(
2247 self,
2248 _nondet: NonDet,
2249 ) -> Stream<T, L::DropConsistency, B, O, R2> {
2250 if R::RETRIES_KIND == R2::RETRIES_KIND {
2251 Stream::new(
2252 self.location.drop_consistency(),
2253 self.ir_node.replace(HydroNode::Placeholder),
2254 )
2255 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2256 let target_location = self.location.drop_consistency();
2258 Stream::new(
2259 target_location.clone(),
2260 HydroNode::Cast {
2261 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2262 metadata: target_location
2263 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2264 },
2265 )
2266 } else {
2267 let target_location = self.location.drop_consistency();
2268 Stream::new(
2269 target_location.clone(),
2270 HydroNode::ObserveNonDet {
2271 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2272 trusted: false,
2273 metadata: target_location
2274 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2275 },
2276 )
2277 }
2278 }
2279
2280 fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
2283 if R::RETRIES_KIND == R2::RETRIES_KIND {
2284 Stream::new(
2285 self.location.clone(),
2286 self.ir_node.replace(HydroNode::Placeholder),
2287 )
2288 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2289 Stream::new(
2291 self.location.clone(),
2292 HydroNode::Cast {
2293 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2294 metadata: self
2295 .location
2296 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2297 },
2298 )
2299 } else {
2300 Stream::new(
2301 self.location.clone(),
2302 HydroNode::ObserveNonDet {
2303 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2304 trusted: true,
2305 metadata: self
2306 .location
2307 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2308 },
2309 )
2310 }
2311 }
2312
2313 #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
2314 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
2317 self.weaken_retries::<AtLeastOnce>()
2318 }
2319
2320 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2323 let nondet = nondet!();
2324 self.assume_retries_trusted::<R2>(nondet)
2325 }
2326
2327 pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
2330 where
2331 R: IsExactlyOnce,
2332 {
2333 self.assume_retries_trusted(nondet!())
2334 }
2335
2336 pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2339 where
2340 B: IsBounded,
2341 {
2342 self.weaken_boundedness()
2343 }
2344
2345 pub fn weaken_boundedness<B2: Boundedness>(self) -> Stream<T, L, B2, O, R> {
2348 if B::BOUNDED == B2::BOUNDED {
2349 Stream::new(
2350 self.location.clone(),
2351 self.ir_node.replace(HydroNode::Placeholder),
2352 )
2353 } else {
2354 Stream::new(
2356 self.location.clone(),
2357 HydroNode::Cast {
2358 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2359 metadata: self
2360 .location
2361 .new_node_metadata(Stream::<T, L, B2, O, R>::collection_kind()),
2362 },
2363 )
2364 }
2365 }
2366}
2367
2368impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
2369where
2370 L: Location<'a>,
2371{
2372 pub fn cloned(self) -> Stream<T, L, B, O, R>
2390 where
2391 T: Clone,
2392 {
2393 self.map(q!(|d| d.clone()))
2394 }
2395}
2396
2397impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2398where
2399 L: Location<'a>,
2400{
2401 pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
2420 self.assume_ordering_trusted::<TotalOrder>(nondet!(
2421 ))
2423 .fold(
2424 q!(|| 0usize),
2425 q!(
2426 |count, _| *count += 1,
2427 monotone = manual_proof!()
2428 ),
2429 )
2430 }
2431}
2432
2433impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2434 pub fn merge_unordered<O2: Ordering, R2: Retries>(
2458 self,
2459 other: Stream<T, L, Unbounded, O2, R2>,
2460 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2461 where
2462 R: MinRetries<R2>,
2463 {
2464 Stream::new(
2465 self.location.clone(),
2466 HydroNode::Chain {
2467 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2468 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2469 metadata: self.location.new_node_metadata(Stream::<
2470 T,
2471 L,
2472 Unbounded,
2473 NoOrder,
2474 <R as MinRetries<R2>>::Min,
2475 >::collection_kind()),
2476 },
2477 )
2478 }
2479
2480 #[deprecated(note = "use `merge_unordered` instead")]
2482 pub fn interleave<O2: Ordering, R2: Retries>(
2483 self,
2484 other: Stream<T, L, Unbounded, O2, R2>,
2485 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2486 where
2487 R: MinRetries<R2>,
2488 {
2489 self.merge_unordered(other)
2490 }
2491}
2492
2493impl<'a, T, L: Location<'a>, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R> {
2494 pub fn merge_ordered<R2: Retries>(
2522 self,
2523 other: Stream<T, L, B, TotalOrder, R2>,
2524 _nondet: NonDet,
2525 ) -> Stream<T, L::DropConsistency, B, TotalOrder, <R as MinRetries<R2>>::Min>
2526 where
2527 R: MinRetries<R2>,
2528 {
2529 let target_location = self.location().drop_consistency();
2530 Stream::new(
2531 target_location.clone(),
2532 HydroNode::MergeOrdered {
2533 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2534 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2535 metadata: target_location.new_node_metadata(Stream::<
2536 T,
2537 L::DropConsistency,
2538 B,
2539 TotalOrder,
2540 <R as MinRetries<R2>>::Min,
2541 >::collection_kind()),
2542 },
2543 )
2544 }
2545}
2546
2547impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2548where
2549 L: Location<'a>,
2550{
2551 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2577 where
2578 B: IsBounded,
2579 T: Ord,
2580 {
2581 let this = self.make_bounded();
2582 Stream::new(
2583 this.location.clone(),
2584 HydroNode::Sort {
2585 input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2586 metadata: this
2587 .location
2588 .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2589 },
2590 )
2591 }
2592
2593 pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2621 self,
2622 other: Stream<T, L, B2, O2, R2>,
2623 ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2624 where
2625 B: IsBounded,
2626 O: MinOrder<O2>,
2627 R: MinRetries<R2>,
2628 {
2629 check_matching_location(&self.location, &other.location);
2630
2631 Stream::new(
2632 self.location.clone(),
2633 HydroNode::Chain {
2634 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2635 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2636 metadata: self.location.new_node_metadata(Stream::<
2637 T,
2638 L,
2639 B2,
2640 <O as MinOrder<O2>>::Min,
2641 <R as MinRetries<R2>>::Min,
2642 >::collection_kind()),
2643 },
2644 )
2645 }
2646
2647 pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>, R2: Retries>(
2651 self,
2652 other: Stream<T2, L, Bounded, O2, R2>,
2653 ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, <R as MinRetries<R2>>::Min>
2654 where
2655 B: IsBounded,
2656 T: Clone,
2657 T2: Clone,
2658 R: MinRetries<R2>,
2659 {
2660 let this = self.make_bounded();
2661 check_matching_location(&this.location, &other.location);
2662
2663 Stream::new(
2664 this.location.clone(),
2665 HydroNode::CrossProduct {
2666 left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2667 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2668 metadata: this.location.new_node_metadata(Stream::<
2669 (T, T2),
2670 L,
2671 Bounded,
2672 <O2 as MinOrder<O>>::Min,
2673 <R as MinRetries<R2>>::Min,
2674 >::collection_kind()),
2675 },
2676 )
2677 }
2678
2679 pub fn repeat_with_keys<K, V2>(
2717 self,
2718 keys: KeyedSingleton<K, V2, L, Bounded>,
2719 ) -> KeyedStream<K, T, L, Bounded, O, R>
2720 where
2721 B: IsBounded,
2722 K: Clone,
2723 T: Clone,
2724 {
2725 keys.keys()
2726 .assume_ordering_trusted::<TotalOrder>(
2727 nondet!(),
2728 )
2729 .cross_product_nested_loop(self.make_bounded())
2730 .into_keyed()
2731 }
2732
2733 pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2770 where
2771 T: Future,
2772 {
2773 Stream::new(
2774 self.location.clone(),
2775 HydroNode::ResolveFuturesBlocking {
2776 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2777 metadata: self
2778 .location
2779 .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2780 },
2781 )
2782 }
2783
2784 #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
2804 pub fn is_empty(self) -> Singleton<bool, L, Bounded>
2805 where
2806 B: IsBounded,
2807 {
2808 self.make_bounded()
2809 .assume_ordering_trusted::<TotalOrder>(
2810 nondet!(),
2811 )
2812 .first()
2813 .is_none()
2814 }
2815}
2816
2817impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2818where
2819 L: Location<'a>,
2820{
2821 pub fn join<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2846 self,
2847 n: Stream<(K, V2), L, B2, O2, R2>,
2848 ) -> Stream<(K, (V1, V2)), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
2849 where
2850 K: Eq + Hash + Clone,
2851 R: MinRetries<R2>,
2852 V1: Clone,
2853 V2: Clone,
2854 {
2855 check_matching_location(&self.location, &n.location);
2856
2857 let ir_node = if B2::BOUNDED {
2858 HydroNode::JoinHalf {
2859 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2860 right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2861 metadata: self.location.new_node_metadata(Stream::<
2862 (K, (V1, V2)),
2863 L,
2864 B,
2865 B2::PreserveOrderIfBounded<O>,
2866 <R as MinRetries<R2>>::Min,
2867 >::collection_kind()),
2868 }
2869 } else {
2870 HydroNode::Join {
2871 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2872 right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2873 metadata: self.location.new_node_metadata(Stream::<
2874 (K, (V1, V2)),
2875 L,
2876 B,
2877 B2::PreserveOrderIfBounded<O>,
2878 <R as MinRetries<R2>>::Min,
2879 >::collection_kind()),
2880 }
2881 };
2882
2883 Stream::new(self.location.clone(), ir_node)
2884 }
2885
2886 pub fn anti_join<O2: Ordering, R2: Retries>(
2912 self,
2913 n: Stream<K, L, Bounded, O2, R2>,
2914 ) -> Stream<(K, V1), L, B, O, R>
2915 where
2916 K: Eq + Hash,
2917 {
2918 check_matching_location(&self.location, &n.location);
2919
2920 Stream::new(
2921 self.location.clone(),
2922 HydroNode::AntiJoin {
2923 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2924 neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2925 metadata: self
2926 .location
2927 .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2928 },
2929 )
2930 }
2931}
2932
2933impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2934 Stream<(K, V), L, B, O, R>
2935{
2936 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2963 KeyedStream::new(
2964 self.location.clone(),
2965 HydroNode::Cast {
2966 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2967 metadata: self
2968 .location
2969 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2970 },
2971 )
2972 }
2973}
2974
2975impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2976where
2977 K: Eq + Hash,
2978 L: Location<'a>,
2979{
2980 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2999 self.into_keyed()
3000 .fold(
3001 q!(|| ()),
3002 q!(
3003 |_, _| {},
3004 commutative = manual_proof!(),
3005 idempotent = manual_proof!()
3006 ),
3007 )
3008 .keys()
3009 }
3010}
3011
3012impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
3013where
3014 L: Location<'a>,
3015{
3016 pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
3023 self,
3024 tick: &Tick<L2>,
3025 _nondet: NonDet,
3026 ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
3027 Stream::new(
3028 tick.drop_consistency(),
3029 HydroNode::Batch {
3030 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3031 metadata: tick
3032 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3033 },
3034 )
3035 }
3036
3037 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
3040 Stream::new(
3041 self.location.tick.l.clone(),
3042 HydroNode::EndAtomic {
3043 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3044 metadata: self
3045 .location
3046 .tick
3047 .l
3048 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
3049 },
3050 )
3051 }
3052}
3053
3054impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
3055where
3056 L: TopLevel<'a>,
3057 F: Future<Output = T>,
3058{
3059 pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
3090 Stream::new(
3091 self.location.clone(),
3092 HydroNode::ResolveFutures {
3093 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3094 metadata: self
3095 .location
3096 .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
3097 },
3098 )
3099 }
3100
3101 pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
3132 Stream::new(
3133 self.location.clone(),
3134 HydroNode::ResolveFuturesOrdered {
3135 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3136 metadata: self
3137 .location
3138 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3139 },
3140 )
3141 }
3142}
3143
3144impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
3145where
3146 L: Location<'a>,
3147{
3148 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
3151 Stream::new(
3152 self.location.outer().clone(),
3153 HydroNode::YieldConcat {
3154 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3155 metadata: self
3156 .location
3157 .outer()
3158 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3159 },
3160 )
3161 }
3162
3163 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
3170 let out_location = Atomic {
3171 tick: self.location.clone(),
3172 };
3173
3174 Stream::new(
3175 out_location.clone(),
3176 HydroNode::YieldConcat {
3177 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3178 metadata: out_location
3179 .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
3180 },
3181 )
3182 }
3183
3184 pub fn across_ticks<Out: BatchAtomic<'a>>(
3220 self,
3221 thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
3222 ) -> Out::Batched {
3223 thunk(self.all_ticks_atomic()).batched_atomic()
3224 }
3225
3226 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
3265 Stream::new(
3266 self.location.clone(),
3267 HydroNode::DeferTick {
3268 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3269 metadata: self
3270 .location
3271 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3272 },
3273 )
3274 }
3275}
3276
3277#[cfg(test)]
3278mod tests {
3279 #[cfg(feature = "deploy")]
3280 use futures::{SinkExt, StreamExt};
3281 #[cfg(feature = "deploy")]
3282 use hydro_deploy::Deployment;
3283 #[cfg(feature = "deploy")]
3284 use serde::{Deserialize, Serialize};
3285 #[cfg(any(feature = "deploy", feature = "sim"))]
3286 use stageleft::q;
3287
3288 #[cfg(any(feature = "deploy", feature = "sim"))]
3289 use crate::compile::builder::FlowBuilder;
3290 #[cfg(feature = "deploy")]
3291 use crate::live_collections::sliced::sliced;
3292 #[cfg(feature = "deploy")]
3293 use crate::live_collections::stream::ExactlyOnce;
3294 #[cfg(feature = "sim")]
3295 use crate::live_collections::stream::NoOrder;
3296 #[cfg(any(feature = "deploy", feature = "sim"))]
3297 use crate::live_collections::stream::TotalOrder;
3298 #[cfg(any(feature = "deploy", feature = "sim"))]
3299 use crate::location::Location;
3300 #[cfg(feature = "sim")]
3301 use crate::networking::TCP;
3302 #[cfg(any(feature = "deploy", feature = "sim"))]
3303 use crate::nondet::nondet;
3304
3305 mod backtrace_chained_ops;
3306
3307 #[cfg(feature = "deploy")]
3308 struct P1 {}
3309 #[cfg(feature = "deploy")]
3310 struct P2 {}
3311
3312 #[cfg(feature = "deploy")]
3313 #[derive(Serialize, Deserialize, Debug)]
3314 struct SendOverNetwork {
3315 n: u32,
3316 }
3317
3318 #[cfg(feature = "deploy")]
3319 #[tokio::test]
3320 async fn first_ten_distributed() {
3321 use crate::networking::TCP;
3322
3323 let mut deployment = Deployment::new();
3324
3325 let mut flow = FlowBuilder::new();
3326 let first_node = flow.process::<P1>();
3327 let second_node = flow.process::<P2>();
3328 let external = flow.external::<P2>();
3329
3330 let numbers = first_node.source_iter(q!(0..10));
3331 let out_port = numbers
3332 .map(q!(|n| SendOverNetwork { n }))
3333 .send(&second_node, TCP.fail_stop().bincode())
3334 .send_bincode_external(&external);
3335
3336 let nodes = flow
3337 .with_process(&first_node, deployment.Localhost())
3338 .with_process(&second_node, deployment.Localhost())
3339 .with_external(&external, deployment.Localhost())
3340 .deploy(&mut deployment);
3341
3342 deployment.deploy().await.unwrap();
3343
3344 let mut external_out = nodes.connect(out_port).await;
3345
3346 deployment.start().await.unwrap();
3347
3348 for i in 0..10 {
3349 assert_eq!(external_out.next().await.unwrap().n, i);
3350 }
3351 }
3352
3353 #[cfg(feature = "deploy")]
3354 #[tokio::test]
3355 async fn first_cardinality() {
3356 let mut deployment = Deployment::new();
3357
3358 let mut flow = FlowBuilder::new();
3359 let node = flow.process::<()>();
3360 let external = flow.external::<()>();
3361
3362 let node_tick = node.tick();
3363 let count = node_tick
3364 .singleton(q!([1, 2, 3]))
3365 .into_stream()
3366 .flatten_ordered()
3367 .first()
3368 .into_stream()
3369 .count()
3370 .all_ticks()
3371 .send_bincode_external(&external);
3372
3373 let nodes = flow
3374 .with_process(&node, deployment.Localhost())
3375 .with_external(&external, deployment.Localhost())
3376 .deploy(&mut deployment);
3377
3378 deployment.deploy().await.unwrap();
3379
3380 let mut external_out = nodes.connect(count).await;
3381
3382 deployment.start().await.unwrap();
3383
3384 assert_eq!(external_out.next().await.unwrap(), 1);
3385 }
3386
3387 #[cfg(feature = "deploy")]
3388 #[tokio::test]
3389 async fn unbounded_reduce_remembers_state() {
3390 let mut deployment = Deployment::new();
3391
3392 let mut flow = FlowBuilder::new();
3393 let node = flow.process::<()>();
3394 let external = flow.external::<()>();
3395
3396 let (input_port, input) = node.source_external_bincode(&external);
3397 let out = input
3398 .reduce(q!(|acc, v| *acc += v))
3399 .sample_eager(nondet!())
3400 .send_bincode_external(&external);
3401
3402 let nodes = flow
3403 .with_process(&node, deployment.Localhost())
3404 .with_external(&external, deployment.Localhost())
3405 .deploy(&mut deployment);
3406
3407 deployment.deploy().await.unwrap();
3408
3409 let mut external_in = nodes.connect(input_port).await;
3410 let mut external_out = nodes.connect(out).await;
3411
3412 deployment.start().await.unwrap();
3413
3414 external_in.send(1).await.unwrap();
3415 assert_eq!(external_out.next().await.unwrap(), 1);
3416
3417 external_in.send(2).await.unwrap();
3418 assert_eq!(external_out.next().await.unwrap(), 3);
3419 }
3420
3421 #[cfg(feature = "deploy")]
3422 #[tokio::test]
3423 async fn top_level_bounded_cross_singleton() {
3424 let mut deployment = Deployment::new();
3425
3426 let mut flow = FlowBuilder::new();
3427 let node = flow.process::<()>();
3428 let external = flow.external::<()>();
3429
3430 let (input_port, input) =
3431 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3432
3433 let out = input
3434 .cross_singleton(
3435 node.source_iter(q!(vec![1, 2, 3]))
3436 .fold(q!(|| 0), q!(|acc, v| *acc += v)),
3437 )
3438 .send_bincode_external(&external);
3439
3440 let nodes = flow
3441 .with_process(&node, deployment.Localhost())
3442 .with_external(&external, deployment.Localhost())
3443 .deploy(&mut deployment);
3444
3445 deployment.deploy().await.unwrap();
3446
3447 let mut external_in = nodes.connect(input_port).await;
3448 let mut external_out = nodes.connect(out).await;
3449
3450 deployment.start().await.unwrap();
3451
3452 external_in.send(1).await.unwrap();
3453 assert_eq!(external_out.next().await.unwrap(), (1, 6));
3454
3455 external_in.send(2).await.unwrap();
3456 assert_eq!(external_out.next().await.unwrap(), (2, 6));
3457 }
3458
3459 #[cfg(feature = "deploy")]
3460 #[tokio::test]
3461 async fn top_level_bounded_reduce_cardinality() {
3462 let mut deployment = Deployment::new();
3463
3464 let mut flow = FlowBuilder::new();
3465 let node = flow.process::<()>();
3466 let external = flow.external::<()>();
3467
3468 let (input_port, input) =
3469 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3470
3471 let out = sliced! {
3472 let input = use(input, nondet!());
3473 let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!());
3474 input.cross_singleton(v.into_stream().count())
3475 }
3476 .send_bincode_external(&external);
3477
3478 let nodes = flow
3479 .with_process(&node, deployment.Localhost())
3480 .with_external(&external, deployment.Localhost())
3481 .deploy(&mut deployment);
3482
3483 deployment.deploy().await.unwrap();
3484
3485 let mut external_in = nodes.connect(input_port).await;
3486 let mut external_out = nodes.connect(out).await;
3487
3488 deployment.start().await.unwrap();
3489
3490 external_in.send(1).await.unwrap();
3491 assert_eq!(external_out.next().await.unwrap(), (1, 1));
3492
3493 external_in.send(2).await.unwrap();
3494 assert_eq!(external_out.next().await.unwrap(), (2, 1));
3495 }
3496
3497 #[cfg(feature = "deploy")]
3498 #[tokio::test]
3499 async fn top_level_bounded_into_singleton_cardinality() {
3500 let mut deployment = Deployment::new();
3501
3502 let mut flow = FlowBuilder::new();
3503 let node = flow.process::<()>();
3504 let external = flow.external::<()>();
3505
3506 let (input_port, input) =
3507 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3508
3509 let out = sliced! {
3510 let input = use(input, nondet!());
3511 let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!());
3512 input.cross_singleton(v.into_stream().count())
3513 }
3514 .send_bincode_external(&external);
3515
3516 let nodes = flow
3517 .with_process(&node, deployment.Localhost())
3518 .with_external(&external, deployment.Localhost())
3519 .deploy(&mut deployment);
3520
3521 deployment.deploy().await.unwrap();
3522
3523 let mut external_in = nodes.connect(input_port).await;
3524 let mut external_out = nodes.connect(out).await;
3525
3526 deployment.start().await.unwrap();
3527
3528 external_in.send(1).await.unwrap();
3529 assert_eq!(external_out.next().await.unwrap(), (1, 1));
3530
3531 external_in.send(2).await.unwrap();
3532 assert_eq!(external_out.next().await.unwrap(), (2, 1));
3533 }
3534
3535 #[cfg(feature = "deploy")]
3536 #[tokio::test]
3537 async fn atomic_fold_replays_each_tick() {
3538 let mut deployment = Deployment::new();
3539
3540 let mut flow = FlowBuilder::new();
3541 let node = flow.process::<()>();
3542 let external = flow.external::<()>();
3543
3544 let (input_port, input) =
3545 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3546 let tick = node.tick();
3547
3548 let out = input
3549 .batch(&tick, nondet!())
3550 .cross_singleton(
3551 node.source_iter(q!(vec![1, 2, 3]))
3552 .atomic()
3553 .fold(q!(|| 0), q!(|acc, v| *acc += v))
3554 .snapshot_atomic(&tick, nondet!()),
3555 )
3556 .all_ticks()
3557 .send_bincode_external(&external);
3558
3559 let nodes = flow
3560 .with_process(&node, deployment.Localhost())
3561 .with_external(&external, deployment.Localhost())
3562 .deploy(&mut deployment);
3563
3564 deployment.deploy().await.unwrap();
3565
3566 let mut external_in = nodes.connect(input_port).await;
3567 let mut external_out = nodes.connect(out).await;
3568
3569 deployment.start().await.unwrap();
3570
3571 external_in.send(1).await.unwrap();
3572 assert_eq!(external_out.next().await.unwrap(), (1, 6));
3573
3574 external_in.send(2).await.unwrap();
3575 assert_eq!(external_out.next().await.unwrap(), (2, 6));
3576 }
3577
3578 #[cfg(feature = "deploy")]
3579 #[tokio::test]
3580 async fn unbounded_scan_remembers_state() {
3581 let mut deployment = Deployment::new();
3582
3583 let mut flow = FlowBuilder::new();
3584 let node = flow.process::<()>();
3585 let external = flow.external::<()>();
3586
3587 let (input_port, input) = node.source_external_bincode(&external);
3588 let out = input
3589 .scan(
3590 q!(|| 0),
3591 q!(|acc, v| {
3592 *acc += v;
3593 Some(*acc)
3594 }),
3595 )
3596 .send_bincode_external(&external);
3597
3598 let nodes = flow
3599 .with_process(&node, deployment.Localhost())
3600 .with_external(&external, deployment.Localhost())
3601 .deploy(&mut deployment);
3602
3603 deployment.deploy().await.unwrap();
3604
3605 let mut external_in = nodes.connect(input_port).await;
3606 let mut external_out = nodes.connect(out).await;
3607
3608 deployment.start().await.unwrap();
3609
3610 external_in.send(1).await.unwrap();
3611 assert_eq!(external_out.next().await.unwrap(), 1);
3612
3613 external_in.send(2).await.unwrap();
3614 assert_eq!(external_out.next().await.unwrap(), 3);
3615 }
3616
3617 #[cfg(feature = "deploy")]
3618 #[tokio::test]
3619 async fn unbounded_enumerate_remembers_state() {
3620 let mut deployment = Deployment::new();
3621
3622 let mut flow = FlowBuilder::new();
3623 let node = flow.process::<()>();
3624 let external = flow.external::<()>();
3625
3626 let (input_port, input) = node.source_external_bincode(&external);
3627 let out = input.enumerate().send_bincode_external(&external);
3628
3629 let nodes = flow
3630 .with_process(&node, deployment.Localhost())
3631 .with_external(&external, deployment.Localhost())
3632 .deploy(&mut deployment);
3633
3634 deployment.deploy().await.unwrap();
3635
3636 let mut external_in = nodes.connect(input_port).await;
3637 let mut external_out = nodes.connect(out).await;
3638
3639 deployment.start().await.unwrap();
3640
3641 external_in.send(1).await.unwrap();
3642 assert_eq!(external_out.next().await.unwrap(), (0, 1));
3643
3644 external_in.send(2).await.unwrap();
3645 assert_eq!(external_out.next().await.unwrap(), (1, 2));
3646 }
3647
3648 #[cfg(feature = "deploy")]
3649 #[tokio::test]
3650 async fn unbounded_unique_remembers_state() {
3651 let mut deployment = Deployment::new();
3652
3653 let mut flow = FlowBuilder::new();
3654 let node = flow.process::<()>();
3655 let external = flow.external::<()>();
3656
3657 let (input_port, input) =
3658 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3659 let out = input.unique().send_bincode_external(&external);
3660
3661 let nodes = flow
3662 .with_process(&node, deployment.Localhost())
3663 .with_external(&external, deployment.Localhost())
3664 .deploy(&mut deployment);
3665
3666 deployment.deploy().await.unwrap();
3667
3668 let mut external_in = nodes.connect(input_port).await;
3669 let mut external_out = nodes.connect(out).await;
3670
3671 deployment.start().await.unwrap();
3672
3673 external_in.send(1).await.unwrap();
3674 assert_eq!(external_out.next().await.unwrap(), 1);
3675
3676 external_in.send(2).await.unwrap();
3677 assert_eq!(external_out.next().await.unwrap(), 2);
3678
3679 external_in.send(1).await.unwrap();
3680 external_in.send(3).await.unwrap();
3681 assert_eq!(external_out.next().await.unwrap(), 3);
3682 }
3683
3684 #[cfg(feature = "sim")]
3685 #[test]
3686 #[should_panic]
3687 fn sim_batch_nondet_size() {
3688 let mut flow = FlowBuilder::new();
3689 let node = flow.process::<()>();
3690
3691 let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3692
3693 let tick = node.tick();
3694 let out_recv = input
3695 .batch(&tick, nondet!())
3696 .count()
3697 .all_ticks()
3698 .sim_output();
3699
3700 flow.sim().exhaustive(async || {
3701 in_send.send(());
3702 in_send.send(());
3703 in_send.send(());
3704
3705 assert_eq!(out_recv.next().await.unwrap(), 3); });
3707 }
3708
3709 #[cfg(feature = "sim")]
3710 #[test]
3711 fn sim_batch_preserves_order() {
3712 let mut flow = FlowBuilder::new();
3713 let node = flow.process::<()>();
3714
3715 let (in_send, input) = node.sim_input();
3716
3717 let tick = node.tick();
3718 let out_recv = input
3719 .batch(&tick, nondet!())
3720 .all_ticks()
3721 .sim_output();
3722
3723 flow.sim().exhaustive(async || {
3724 in_send.send(1);
3725 in_send.send(2);
3726 in_send.send(3);
3727
3728 out_recv.assert_yields_only([1, 2, 3]).await;
3729 });
3730 }
3731
3732 #[cfg(feature = "sim")]
3733 #[test]
3734 #[should_panic]
3735 fn sim_batch_unordered_shuffles() {
3736 let mut flow = FlowBuilder::new();
3737 let node = flow.process::<()>();
3738
3739 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3740
3741 let tick = node.tick();
3742 let batch = input.batch(&tick, nondet!());
3743 let out_recv = batch
3744 .clone()
3745 .min()
3746 .zip(batch.max())
3747 .all_ticks()
3748 .sim_output();
3749
3750 flow.sim().exhaustive(async || {
3751 in_send.send_many_unordered([1, 2, 3]);
3752
3753 if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3754 panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3755 }
3756 });
3757 }
3758
3759 #[cfg(feature = "sim")]
3760 #[test]
3761 fn sim_batch_unordered_shuffles_count() {
3762 let mut flow = FlowBuilder::new();
3763 let node = flow.process::<()>();
3764
3765 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3766
3767 let tick = node.tick();
3768 let batch = input.batch(&tick, nondet!());
3769 let out_recv = batch.all_ticks().sim_output();
3770
3771 let instance_count = flow.sim().exhaustive(async || {
3772 in_send.send_many_unordered([1, 2, 3, 4]);
3773 out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3774 });
3775
3776 assert_eq!(
3777 instance_count,
3778 75 )
3780 }
3781
3782 #[cfg(feature = "sim")]
3783 #[test]
3784 #[should_panic]
3785 fn sim_observe_order_batched() {
3786 let mut flow = FlowBuilder::new();
3787 let node = flow.process::<()>();
3788
3789 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3790
3791 let tick = node.tick();
3792 let batch = input.batch(&tick, nondet!());
3793 let out_recv = batch
3794 .assume_ordering::<TotalOrder>(nondet!())
3795 .all_ticks()
3796 .sim_output();
3797
3798 flow.sim().exhaustive(async || {
3799 in_send.send_many_unordered([1, 2, 3, 4]);
3800 out_recv.assert_yields_only([1, 2, 3, 4]).await; });
3802 }
3803
3804 #[cfg(feature = "sim")]
3805 #[test]
3806 fn sim_observe_order_batched_count() {
3807 let mut flow = FlowBuilder::new();
3808 let node = flow.process::<()>();
3809
3810 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3811
3812 let tick = node.tick();
3813 let batch = input.batch(&tick, nondet!());
3814 let out_recv = batch
3815 .assume_ordering::<TotalOrder>(nondet!())
3816 .all_ticks()
3817 .sim_output();
3818
3819 let instance_count = flow.sim().exhaustive(async || {
3820 in_send.send_many_unordered([1, 2, 3, 4]);
3821 let _ = out_recv.collect::<Vec<_>>().await;
3822 });
3823
3824 assert_eq!(
3825 instance_count,
3826 192 )
3828 }
3829
3830 #[cfg(feature = "sim")]
3831 #[test]
3832 fn sim_unordered_count_instance_count() {
3833 let mut flow = FlowBuilder::new();
3834 let node = flow.process::<()>();
3835
3836 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3837
3838 let tick = node.tick();
3839 let out_recv = input
3840 .count()
3841 .snapshot(&tick, nondet!())
3842 .all_ticks()
3843 .sim_output();
3844
3845 let instance_count = flow.sim().exhaustive(async || {
3846 in_send.send_many_unordered([1, 2, 3, 4]);
3847 assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3848 });
3849
3850 assert_eq!(
3851 instance_count,
3852 16 )
3854 }
3855
3856 #[cfg(feature = "sim")]
3857 #[test]
3858 fn sim_top_level_assume_ordering() {
3859 let mut flow = FlowBuilder::new();
3860 let node = flow.process::<()>();
3861
3862 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3863
3864 let out_recv = input
3865 .assume_ordering::<TotalOrder>(nondet!())
3866 .sim_output();
3867
3868 let instance_count = flow.sim().exhaustive(async || {
3869 in_send.send_many_unordered([1, 2, 3]);
3870 let mut out = out_recv.collect::<Vec<_>>().await;
3871 out.sort();
3872 assert_eq!(out, vec![1, 2, 3]);
3873 });
3874
3875 assert_eq!(instance_count, 6)
3876 }
3877
3878 #[cfg(feature = "sim")]
3879 #[test]
3880 fn sim_top_level_assume_ordering_cycle_back() {
3881 let mut flow = FlowBuilder::new();
3882 let node = flow.process::<()>();
3883 let node2 = flow.process::<()>();
3884
3885 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3886
3887 let (complete_cycle_back, cycle_back) =
3888 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3889 let ordered = input
3890 .merge_unordered(cycle_back)
3891 .assume_ordering::<TotalOrder>(nondet!());
3892 complete_cycle_back.complete(
3893 ordered
3894 .clone()
3895 .map(q!(|v| v + 1))
3896 .filter(q!(|v| v % 2 == 1))
3897 .send(&node2, TCP.fail_stop().bincode())
3898 .send(&node, TCP.fail_stop().bincode()),
3899 );
3900
3901 let out_recv = ordered.sim_output();
3902
3903 let mut saw = false;
3904 let instance_count = flow.sim().exhaustive(async || {
3905 in_send.send_many_unordered([0, 2]);
3906 let out = out_recv.collect::<Vec<_>>().await;
3907
3908 if out.starts_with(&[0, 1, 2]) {
3909 saw = true;
3910 }
3911 });
3912
3913 assert!(saw, "did not see an instance with 0, 1, 2 in order");
3914 assert_eq!(instance_count, 6);
3915 }
3916
3917 #[cfg(feature = "sim")]
3918 #[test]
3919 fn sim_top_level_assume_ordering_cycle_back_tick() {
3920 let mut flow = FlowBuilder::new();
3921 let node = flow.process::<()>();
3922 let node2 = flow.process::<()>();
3923
3924 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3925
3926 let (complete_cycle_back, cycle_back) =
3927 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3928 let ordered = input
3929 .merge_unordered(cycle_back)
3930 .assume_ordering::<TotalOrder>(nondet!());
3931 complete_cycle_back.complete(
3932 ordered
3933 .clone()
3934 .batch(&node.tick(), nondet!())
3935 .all_ticks()
3936 .map(q!(|v| v + 1))
3937 .filter(q!(|v| v % 2 == 1))
3938 .send(&node2, TCP.fail_stop().bincode())
3939 .send(&node, TCP.fail_stop().bincode()),
3940 );
3941
3942 let out_recv = ordered.sim_output();
3943
3944 let mut saw = false;
3945 let instance_count = flow.sim().exhaustive(async || {
3946 in_send.send_many_unordered([0, 2]);
3947 let out = out_recv.collect::<Vec<_>>().await;
3948
3949 if out.starts_with(&[0, 1, 2]) {
3950 saw = true;
3951 }
3952 });
3953
3954 assert!(saw, "did not see an instance with 0, 1, 2 in order");
3955 assert_eq!(instance_count, 58);
3956 }
3957
3958 #[cfg(feature = "sim")]
3959 #[test]
3960 fn sim_top_level_assume_ordering_multiple() {
3961 let mut flow = FlowBuilder::new();
3962 let node = flow.process::<()>();
3963 let node2 = flow.process::<()>();
3964
3965 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3966 let (_, input2) = node.sim_input::<_, NoOrder, _>();
3967
3968 let (complete_cycle_back, cycle_back) =
3969 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3970 let input1_ordered = input
3971 .clone()
3972 .merge_unordered(cycle_back)
3973 .assume_ordering::<TotalOrder>(nondet!());
3974 let foo = input1_ordered
3975 .clone()
3976 .map(q!(|v| v + 3))
3977 .weaken_ordering::<NoOrder>()
3978 .merge_unordered(input2)
3979 .assume_ordering::<TotalOrder>(nondet!());
3980
3981 complete_cycle_back.complete(
3982 foo.filter(q!(|v| *v == 3))
3983 .send(&node2, TCP.fail_stop().bincode())
3984 .send(&node, TCP.fail_stop().bincode()),
3985 );
3986
3987 let out_recv = input1_ordered.sim_output();
3988
3989 let mut saw = false;
3990 let instance_count = flow.sim().exhaustive(async || {
3991 in_send.send_many_unordered([0, 1]);
3992 let out = out_recv.collect::<Vec<_>>().await;
3993
3994 if out.starts_with(&[0, 3, 1]) {
3995 saw = true;
3996 }
3997 });
3998
3999 assert!(saw, "did not see an instance with 0, 3, 1 in order");
4000 assert_eq!(instance_count, 24);
4001 }
4002
4003 #[cfg(feature = "sim")]
4004 #[test]
4005 fn sim_atomic_assume_ordering_cycle_back() {
4006 let mut flow = FlowBuilder::new();
4007 let node = flow.process::<()>();
4008 let node2 = flow.process::<()>();
4009
4010 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
4011
4012 let (complete_cycle_back, cycle_back) =
4013 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
4014 let ordered = input
4015 .merge_unordered(cycle_back)
4016 .atomic()
4017 .assume_ordering::<TotalOrder>(nondet!())
4018 .end_atomic();
4019 complete_cycle_back.complete(
4020 ordered
4021 .clone()
4022 .map(q!(|v| v + 1))
4023 .filter(q!(|v| v % 2 == 1))
4024 .send(&node2, TCP.fail_stop().bincode())
4025 .send(&node, TCP.fail_stop().bincode()),
4026 );
4027
4028 let out_recv = ordered.sim_output();
4029
4030 let instance_count = flow.sim().exhaustive(async || {
4031 in_send.send_many_unordered([0, 2]);
4032 let out = out_recv.collect::<Vec<_>>().await;
4033 assert_eq!(out.len(), 4);
4034 });
4035 assert_eq!(instance_count, 22);
4036 }
4037
4038 #[cfg(feature = "deploy")]
4039 #[tokio::test]
4040 async fn partition_evens_odds() {
4041 let mut deployment = Deployment::new();
4042
4043 let mut flow = FlowBuilder::new();
4044 let node = flow.process::<()>();
4045 let external = flow.external::<()>();
4046
4047 let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
4048 let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
4049 let evens_port = evens.send_bincode_external(&external);
4050 let odds_port = odds.send_bincode_external(&external);
4051
4052 let nodes = flow
4053 .with_process(&node, deployment.Localhost())
4054 .with_external(&external, deployment.Localhost())
4055 .deploy(&mut deployment);
4056
4057 deployment.deploy().await.unwrap();
4058
4059 let mut evens_out = nodes.connect(evens_port).await;
4060 let mut odds_out = nodes.connect(odds_port).await;
4061
4062 deployment.start().await.unwrap();
4063
4064 let mut even_results = Vec::new();
4065 for _ in 0..3 {
4066 even_results.push(evens_out.next().await.unwrap());
4067 }
4068 even_results.sort();
4069 assert_eq!(even_results, vec![2, 4, 6]);
4070
4071 let mut odd_results = Vec::new();
4072 for _ in 0..3 {
4073 odd_results.push(odds_out.next().await.unwrap());
4074 }
4075 odd_results.sort();
4076 assert_eq!(odd_results, vec![1, 3, 5]);
4077 }
4078
4079 #[cfg(feature = "deploy")]
4080 #[tokio::test]
4081 async fn unconsumed_inspect_still_runs() {
4082 use crate::deploy::DeployCrateWrapper;
4083
4084 let mut deployment = Deployment::new();
4085
4086 let mut flow = FlowBuilder::new();
4087 let node = flow.process::<()>();
4088
4089 node.source_iter(q!(0..5))
4092 .inspect(q!(|x| println!("inspect: {}", x)));
4093
4094 let nodes = flow
4095 .with_process(&node, deployment.Localhost())
4096 .deploy(&mut deployment);
4097
4098 deployment.deploy().await.unwrap();
4099
4100 let mut stdout = nodes.get_process(&node).stdout();
4101
4102 deployment.start().await.unwrap();
4103
4104 let mut lines = Vec::new();
4105 for _ in 0..5 {
4106 lines.push(stdout.recv().await.unwrap());
4107 }
4108 lines.sort();
4109 assert_eq!(
4110 lines,
4111 vec![
4112 "inspect: 0",
4113 "inspect: 1",
4114 "inspect: 2",
4115 "inspect: 3",
4116 "inspect: 4",
4117 ]
4118 );
4119 }
4120
4121 #[cfg(feature = "sim")]
4122 #[test]
4123 fn sim_limit() {
4124 let mut flow = FlowBuilder::new();
4125 let node = flow.process::<()>();
4126
4127 let (in_send, input) = node.sim_input();
4128
4129 let out_recv = input.limit(q!(3)).sim_output();
4130
4131 flow.sim().exhaustive(async || {
4132 in_send.send(1);
4133 in_send.send(2);
4134 in_send.send(3);
4135 in_send.send(4);
4136 in_send.send(5);
4137
4138 out_recv.assert_yields_only([1, 2, 3]).await;
4139 });
4140 }
4141
4142 #[cfg(feature = "sim")]
4143 #[test]
4144 fn sim_limit_zero() {
4145 let mut flow = FlowBuilder::new();
4146 let node = flow.process::<()>();
4147
4148 let (in_send, input) = node.sim_input();
4149
4150 let out_recv = input.limit(q!(0)).sim_output();
4151
4152 flow.sim().exhaustive(async || {
4153 in_send.send(1);
4154 in_send.send(2);
4155
4156 out_recv.assert_yields_only::<i32, _>([]).await;
4157 });
4158 }
4159
4160 #[cfg(feature = "sim")]
4161 #[test]
4162 fn sim_merge_ordered() {
4163 let mut flow = FlowBuilder::new();
4164 let node = flow.process::<()>();
4165
4166 let (in_send, input) = node.sim_input();
4167 let (in_send2, input2) = node.sim_input();
4168
4169 let out_recv = input
4170 .merge_ordered(input2, nondet!())
4171 .sim_output();
4172
4173 let mut saw_out_of_order = false;
4174 let instances = flow.sim().exhaustive(async || {
4175 in_send.send(1);
4176 in_send.send(2);
4177 in_send2.send(3);
4178 in_send2.send(4);
4179
4180 let out = out_recv.collect::<Vec<_>>().await;
4181
4182 if out == [1, 3, 2, 4] {
4183 saw_out_of_order = true;
4184 }
4185
4186 let mut first_elements = out.iter().filter(|v| **v <= 2).copied().collect::<Vec<_>>();
4189 let mut second_elements = out.iter().filter(|v| **v > 2).copied().collect::<Vec<_>>();
4190 assert_eq!(
4191 first_elements,
4192 vec![1, 2],
4193 "first input order violated: {:?}",
4194 out
4195 );
4196 assert_eq!(
4197 second_elements,
4198 vec![3, 4],
4199 "second input order violated: {:?}",
4200 out
4201 );
4202
4203 first_elements.append(&mut second_elements);
4204 first_elements.sort();
4205 assert_eq!(first_elements, vec![1, 2, 3, 4]);
4206 });
4207
4208 assert!(saw_out_of_order);
4209 assert_eq!(instances, 6);
4210 }
4211
4212 #[cfg(feature = "sim")]
4215 #[test]
4216 fn sim_merge_ordered_one_empty() {
4217 let mut flow = FlowBuilder::new();
4218 let node = flow.process::<()>();
4219
4220 let (in_send, input) = node.sim_input();
4221 let (_in_send2, input2) = node.sim_input();
4222
4223 let out_recv = input
4224 .merge_ordered(input2, nondet!())
4225 .sim_output();
4226
4227 let instances = flow.sim().exhaustive(async || {
4228 in_send.send(1);
4229 in_send.send(2);
4230
4231 let out = out_recv.collect::<Vec<_>>().await;
4232 assert_eq!(out, vec![1, 2]);
4233 });
4234
4235 assert_eq!(instances, 1);
4237 }
4238
4239 #[cfg(feature = "sim")]
4245 #[test]
4246 fn sim_merge_ordered_cycle_back() {
4247 let mut flow = FlowBuilder::new();
4248 let node = flow.process::<()>();
4249
4250 let (in_send, input) = node.sim_input();
4251
4252 let (complete_cycle_back, cycle_back) =
4254 node.forward_ref::<super::Stream<_, _, _, TotalOrder>>();
4255
4256 let merged = input.merge_ordered(cycle_back, nondet!());
4258
4259 complete_cycle_back.complete(merged.clone().filter(q!(|v| *v == 1)).map(q!(|v| v * 10)));
4261
4262 let out_recv = merged.sim_output();
4263
4264 let mut saw_cycle_before_second = false;
4267 flow.sim().exhaustive(async || {
4268 in_send.send(1);
4269 in_send.send(2);
4270
4271 let out = out_recv.collect::<Vec<_>>().await;
4272
4273 let pos_1 = out.iter().position(|v| *v == 1).unwrap();
4275 let pos_10 = out.iter().position(|v| *v == 10).unwrap();
4276 assert!(pos_1 < pos_10, "causal order violated: {:?}", out);
4277
4278 if out == [1, 10, 2] {
4280 saw_cycle_before_second = true;
4281 }
4282
4283 let mut sorted = out;
4284 sorted.sort();
4285 assert_eq!(sorted, vec![1, 2, 10]);
4286 });
4287
4288 assert!(
4289 saw_cycle_before_second,
4290 "never saw the cycled element arrive before the second input element"
4291 );
4292 }
4293
4294 #[cfg(feature = "sim")]
4298 #[test]
4299 fn sim_merge_ordered_delayed() {
4300 let mut flow = FlowBuilder::new();
4301 let node = flow.process::<()>();
4302
4303 let (in_send, input) = node.sim_input();
4304 let (in_send2, input2) = node.sim_input();
4305
4306 let out_recv = input
4307 .merge_ordered(input2, nondet!())
4308 .sim_output();
4309
4310 let mut saw_delayed_interleaving = false;
4311 flow.sim().exhaustive(async || {
4312 in_send.send(1);
4314 in_send2.send(3);
4315 in_send2.send(4);
4316
4317 let first_batch = out_recv.collect::<Vec<_>>().await;
4319
4320 in_send.send(2);
4322 let second_batch = out_recv.collect::<Vec<_>>().await;
4323
4324 let mut all: Vec<_> = first_batch
4325 .iter()
4326 .chain(second_batch.iter())
4327 .copied()
4328 .collect();
4329
4330 if all == [1, 3, 4, 2] {
4332 saw_delayed_interleaving = true;
4333 }
4334
4335 all.sort();
4336 assert_eq!(all, vec![1, 2, 3, 4]);
4337 });
4338
4339 assert!(saw_delayed_interleaving);
4340 }
4341
4342 #[cfg(feature = "deploy")]
4347 #[tokio::test]
4348 async fn deploy_merge_ordered_delayed() {
4349 let mut deployment = Deployment::new();
4350
4351 let mut flow = FlowBuilder::new();
4352 let node = flow.process::<()>();
4353 let external = flow.external::<()>();
4354
4355 let (input_a_port, input_a) = node.source_external_bincode(&external);
4356 let (input_b_port, input_b) = node.source_external_bincode(&external);
4357
4358 let out = input_a
4359 .assume_ordering(nondet!())
4360 .merge_ordered(
4361 input_b.assume_ordering(nondet!()),
4362 nondet!(),
4363 )
4364 .send_bincode_external(&external);
4365
4366 let nodes = flow
4367 .with_process(&node, deployment.Localhost())
4368 .with_external(&external, deployment.Localhost())
4369 .deploy(&mut deployment);
4370
4371 deployment.deploy().await.unwrap();
4372
4373 let mut ext_a = nodes.connect(input_a_port).await;
4374 let mut ext_b = nodes.connect(input_b_port).await;
4375 let mut ext_out = nodes.connect(out).await;
4376
4377 deployment.start().await.unwrap();
4378
4379 ext_a.send(1).await.unwrap();
4381 ext_b.send(3).await.unwrap();
4382 ext_b.send(4).await.unwrap();
4383
4384 let mut received = Vec::new();
4386 for _ in 0..3 {
4387 received.push(ext_out.next().await.unwrap());
4388 }
4389
4390 ext_a.send(2).await.unwrap();
4392 received.push(ext_out.next().await.unwrap());
4393
4394 received.sort();
4396 assert_eq!(received, vec![1, 2, 3, 4]);
4397 }
4398
4399 #[cfg(feature = "deploy")]
4400 #[tokio::test]
4401 async fn monotone_fold_threshold() {
4402 use crate::properties::manual_proof;
4403
4404 let mut deployment = Deployment::new();
4405
4406 let mut flow = FlowBuilder::new();
4407 let node = flow.process::<()>();
4408 let external = flow.external::<()>();
4409
4410 let in_unbounded: super::Stream<_, _> =
4411 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4412 let sum = in_unbounded.fold(
4413 q!(|| 0),
4414 q!(
4415 |sum, v| {
4416 *sum += v;
4417 },
4418 monotone = manual_proof!()
4419 ),
4420 );
4421
4422 let threshold_out = sum
4423 .threshold_greater_or_equal(node.singleton(q!(7)))
4424 .send_bincode_external(&external);
4425
4426 let nodes = flow
4427 .with_process(&node, deployment.Localhost())
4428 .with_external(&external, deployment.Localhost())
4429 .deploy(&mut deployment);
4430
4431 deployment.deploy().await.unwrap();
4432
4433 let mut threshold_out = nodes.connect(threshold_out).await;
4434
4435 deployment.start().await.unwrap();
4436
4437 assert_eq!(threshold_out.next().await.unwrap(), 7);
4438 }
4439
4440 #[cfg(feature = "deploy")]
4441 #[tokio::test]
4442 async fn monotone_count_threshold() {
4443 let mut deployment = Deployment::new();
4444
4445 let mut flow = FlowBuilder::new();
4446 let node = flow.process::<()>();
4447 let external = flow.external::<()>();
4448
4449 let in_unbounded: super::Stream<_, _> =
4450 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4451 let sum = in_unbounded.count();
4452
4453 let threshold_out = sum
4454 .threshold_greater_or_equal(node.singleton(q!(3)))
4455 .send_bincode_external(&external);
4456
4457 let nodes = flow
4458 .with_process(&node, deployment.Localhost())
4459 .with_external(&external, deployment.Localhost())
4460 .deploy(&mut deployment);
4461
4462 deployment.deploy().await.unwrap();
4463
4464 let mut threshold_out = nodes.connect(threshold_out).await;
4465
4466 deployment.start().await.unwrap();
4467
4468 assert_eq!(threshold_out.next().await.unwrap(), 3);
4469 }
4470
4471 #[cfg(feature = "deploy")]
4472 #[tokio::test]
4473 async fn monotone_map_order_preserving_threshold() {
4474 use crate::properties::manual_proof;
4475
4476 let mut deployment = Deployment::new();
4477
4478 let mut flow = FlowBuilder::new();
4479 let node = flow.process::<()>();
4480 let external = flow.external::<()>();
4481
4482 let in_unbounded: super::Stream<_, _> =
4483 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4484 let sum = in_unbounded.fold(
4485 q!(|| 0),
4486 q!(
4487 |sum, v| {
4488 *sum += v;
4489 },
4490 monotone = manual_proof!()
4491 ),
4492 );
4493
4494 let doubled = sum.map(q!(
4496 |v| v * 2,
4497 order_preserving = manual_proof!()
4498 ));
4499
4500 let threshold_out = doubled
4501 .threshold_greater_or_equal(node.singleton(q!(14)))
4502 .send_bincode_external(&external);
4503
4504 let nodes = flow
4505 .with_process(&node, deployment.Localhost())
4506 .with_external(&external, deployment.Localhost())
4507 .deploy(&mut deployment);
4508
4509 deployment.deploy().await.unwrap();
4510
4511 let mut threshold_out = nodes.connect(threshold_out).await;
4512
4513 deployment.start().await.unwrap();
4514
4515 assert_eq!(threshold_out.next().await.unwrap(), 14);
4516 }
4517
4518 #[cfg(any(feature = "deploy", feature = "sim"))]
4521 mod join_ordering_type_tests {
4522 use crate::live_collections::boundedness::{Bounded, Unbounded};
4523 use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
4524 use crate::location::{Location, Process};
4525
4526 #[expect(dead_code, reason = "compile-time type test")]
4527 fn join_unbounded_with_bounded_preserves_order<'a>(
4528 left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4529 right: Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4530 ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4531 left.join(right)
4532 }
4533
4534 #[expect(dead_code, reason = "compile-time type test")]
4535 fn join_unbounded_with_unbounded_is_no_order<'a>(
4536 left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4537 right: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4538 ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4539 left.join(right)
4540 }
4541
4542 #[expect(dead_code, reason = "compile-time type test")]
4543 fn join_bounded_with_bounded_preserves_order<'a, L: Location<'a>>(
4544 left: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4545 right: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4546 ) -> Stream<(i32, (char, char)), L, Bounded, TotalOrder, ExactlyOnce> {
4547 left.join(right)
4548 }
4549
4550 #[expect(dead_code, reason = "compile-time type test")]
4551 fn join_unbounded_noorder_with_bounded<'a>(
4552 left: Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce>,
4553 right: Stream<(i32, char), Process<'a>, Bounded, NoOrder, ExactlyOnce>,
4554 ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4555 left.join(right)
4556 }
4557
4558 #[expect(dead_code, reason = "compile-time type test")]
4561 fn cross_product_unbounded_with_bounded_preserves_order<'a>(
4562 left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4563 right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4564 ) -> Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4565 left.cross_product(right)
4566 }
4567
4568 #[expect(dead_code, reason = "compile-time type test")]
4569 fn cross_product_bounded_with_bounded_preserves_order<'a>(
4570 left: Stream<i32, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4571 right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4572 ) -> Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce> {
4573 left.cross_product(right)
4574 }
4575
4576 #[expect(dead_code, reason = "compile-time type test")]
4577 fn cross_product_unbounded_with_unbounded_is_no_order<'a>(
4578 left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4579 right: Stream<char, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4580 ) -> Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4581 left.cross_product(right)
4582 }
4583 } #[cfg(feature = "sim")]
4588 #[test]
4589 fn cross_product_mixed_boundedness_correctness() {
4590 use stageleft::q;
4591
4592 use crate::compile::builder::FlowBuilder;
4593 use crate::nondet::nondet;
4594
4595 let mut flow = FlowBuilder::new();
4596 let process = flow.process::<()>();
4597 let tick = process.tick();
4598
4599 let left = process.source_iter(q!(vec![1, 2]));
4600 let right = process
4601 .source_iter(q!(vec!['a', 'b']))
4602 .batch(&tick, nondet!())
4603 .all_ticks();
4604
4605 let out = left.cross_product(right).sim_output();
4606
4607 flow.sim().exhaustive(async || {
4608 out.assert_yields_only_unordered(vec![(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')])
4609 .await;
4610 });
4611 }
4612
4613 #[cfg(feature = "sim")]
4614 #[test]
4615 fn join_mixed_boundedness_correctness() {
4616 use stageleft::q;
4617
4618 use crate::compile::builder::FlowBuilder;
4619 use crate::nondet::nondet;
4620
4621 let mut flow = FlowBuilder::new();
4622 let process = flow.process::<()>();
4623 let tick = process.tick();
4624
4625 let left = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
4626 let right = process
4627 .source_iter(q!(vec![(1, 'x'), (2, 'y')]))
4628 .batch(&tick, nondet!())
4629 .all_ticks();
4630
4631 let out = left.join(right).sim_output();
4632
4633 flow.sim().exhaustive(async || {
4634 out.assert_yields_only_unordered(vec![(1, ('a', 'x')), (2, ('b', 'y'))])
4635 .await;
4636 });
4637 }
4638
4639 #[cfg(feature = "sim")]
4640 #[test]
4641 fn sim_merge_unordered_independent_atomics() {
4642 let mut flow = FlowBuilder::new();
4643 let node = flow.process::<()>();
4644
4645 let (in1_send, input1) = node.sim_input::<_, TotalOrder, _>();
4646 let (in2_send, input2) = node.sim_input::<_, TotalOrder, _>();
4647
4648 let out = input1
4649 .atomic()
4650 .merge_unordered(input2.atomic())
4651 .end_atomic()
4652 .sim_output();
4653
4654 flow.sim().exhaustive(async || {
4655 in1_send.send(1);
4656 in2_send.send(2);
4657
4658 out.assert_yields_only_unordered(vec![1, 2]).await;
4659 });
4660 }
4661
4662 #[cfg(feature = "deploy")]
4663 #[tokio::test]
4664 async fn test_stream_ref() {
4665 let mut deployment = Deployment::new();
4666
4667 let mut flow = FlowBuilder::new();
4668 let external = flow.external::<()>();
4669 let p1 = flow.process::<()>();
4670
4671 let my_stream = p1.source_iter(q!(1..=5i32));
4673
4674 let stream_ref = my_stream.by_ref();
4675
4676 let out_port = p1
4678 .source_iter(q!([()]))
4679 .map(q!(|_| stream_ref.len() as i32))
4680 .send_bincode_external(&external);
4681
4682 my_stream.for_each(q!(|_| {}));
4684
4685 let nodes = flow
4686 .with_default_optimize()
4687 .with_process(&p1, deployment.Localhost())
4688 .with_external(&external, deployment.Localhost())
4689 .deploy(&mut deployment);
4690
4691 deployment.deploy().await.unwrap();
4692
4693 let mut out_recv = nodes.connect(out_port).await;
4694
4695 deployment.start().await.unwrap();
4696
4697 let result = out_recv.next().await.unwrap();
4698 assert_eq!(result, 5);
4700 }
4701
4702 #[cfg(feature = "deploy")]
4703 #[tokio::test]
4704 async fn test_stream_ref_contents() {
4705 let mut deployment = Deployment::new();
4706
4707 let mut flow = FlowBuilder::new();
4708 let external = flow.external::<()>();
4709 let p1 = flow.process::<()>();
4710
4711 let my_stream = p1.source_iter(q!(1..=3i32));
4713
4714 let stream_ref = my_stream.by_ref();
4715
4716 let out_port = p1
4718 .source_iter(q!([()]))
4719 .map(q!(|_| stream_ref.iter().sum::<i32>()))
4720 .send_bincode_external(&external);
4721
4722 my_stream.for_each(q!(|_| {}));
4723
4724 let nodes = flow
4725 .with_default_optimize()
4726 .with_process(&p1, deployment.Localhost())
4727 .with_external(&external, deployment.Localhost())
4728 .deploy(&mut deployment);
4729
4730 deployment.deploy().await.unwrap();
4731
4732 let mut out_recv = nodes.connect(out_port).await;
4733
4734 deployment.start().await.unwrap();
4735
4736 let result = out_recv.next().await.unwrap();
4737 assert_eq!(result, 6);
4739 }
4740
4741 #[cfg(feature = "deploy")]
4742 #[tokio::test]
4743 async fn test_stream_ref_no_consumer() {
4744 let mut deployment = Deployment::new();
4745
4746 let mut flow = FlowBuilder::new();
4747 let external = flow.external::<()>();
4748 let p1 = flow.process::<()>();
4749
4750 let my_stream = p1.source_iter(q!(1..=4i32));
4752
4753 let stream_ref = my_stream.by_ref();
4754
4755 let out_port = p1
4756 .source_iter(q!([()]))
4757 .map(q!(|_| stream_ref.len() as i32))
4758 .send_bincode_external(&external);
4759
4760 let nodes = flow
4761 .with_default_optimize()
4762 .with_process(&p1, deployment.Localhost())
4763 .with_external(&external, deployment.Localhost())
4764 .deploy(&mut deployment);
4765
4766 deployment.deploy().await.unwrap();
4767
4768 let mut out_recv = nodes.connect(out_port).await;
4769
4770 deployment.start().await.unwrap();
4771
4772 let result = out_recv.next().await.unwrap();
4773 assert_eq!(result, 4);
4774 }
4775
4776 #[cfg(feature = "deploy")]
4777 #[tokio::test]
4778 async fn test_stream_mut() {
4779 let mut deployment = Deployment::new();
4780
4781 let mut flow = FlowBuilder::new();
4782 let external = flow.external::<()>();
4783 let p1 = flow.process::<()>();
4784
4785 let my_stream = p1.source_iter(q!(1..=5i32));
4787
4788 let stream_mut = my_stream.by_mut();
4789
4790 let out_port = p1
4792 .source_iter(q!([()]))
4793 .map(q!(|_| {
4794 stream_mut.retain(|x| *x > 3);
4795 stream_mut.len() as i32
4796 }))
4797 .send_bincode_external(&external);
4798
4799 my_stream.for_each(q!(|_| {}));
4800
4801 let nodes = flow
4802 .with_default_optimize()
4803 .with_process(&p1, deployment.Localhost())
4804 .with_external(&external, deployment.Localhost())
4805 .deploy(&mut deployment);
4806
4807 deployment.deploy().await.unwrap();
4808
4809 let mut out_recv = nodes.connect(out_port).await;
4810
4811 deployment.start().await.unwrap();
4812
4813 let result = out_recv.next().await.unwrap();
4814 assert_eq!(result, 2);
4816 }
4817
4818 #[cfg(feature = "sim")]
4822 #[test]
4823 fn sim_map_with_mut_on_unordered_explores_multiple_states() {
4824 use crate::live_collections::sliced::sliced;
4825 use crate::live_collections::stream::ExactlyOnce;
4826 use crate::properties::manual_proof;
4827
4828 let mut flow = FlowBuilder::new();
4829 let node = flow.process::<()>();
4830
4831 let (trigger_send, trigger) = node.sim_input::<i32, TotalOrder, ExactlyOnce>();
4832
4833 let out_recv = sliced! {
4834 let batch = use(trigger, nondet!());
4835 let counter = batch.location().source_iter(q!(vec![0i32]))
4836 .fold(q!(|| 0i32), q!(|acc, v| *acc += v));
4837 let counter_mut = counter.by_mut();
4838 let items = batch.location().source_iter(q!(vec![1i32, 2])).weaken_ordering::<NoOrder>();
4839 items.map(q!(
4840 |x| {
4841 *counter_mut += x;
4842 *counter_mut
4843 },
4844 commutative = manual_proof!()
4845 ))
4846 }
4847 .sim_output();
4848
4849 let count = flow.sim().exhaustive(async || {
4850 trigger_send.send(1);
4851 let _all: Vec<i32> = out_recv.collect_sorted().await;
4852 });
4853
4854 assert_eq!(
4855 count, 2,
4856 "Expected 2 simulation instances due to mut on unordered input, got {}",
4857 count
4858 );
4859 }
4860
4861 #[cfg(feature = "sim")]
4865 #[test]
4866 #[ignore = "observe_nondet not yet supported for top-level bounded inputs (https://github.com/hydro-project/hydro/issues/2950)"]
4867 fn sim_map_with_mut_on_unordered_top_level() {
4868 use crate::properties::manual_proof;
4869
4870 let mut flow = FlowBuilder::new();
4871 let node = flow.process::<()>();
4872
4873 let counter = node
4874 .source_iter(q!(vec![0i32]))
4875 .fold(q!(|| 0i32), q!(|acc, v| *acc += v));
4876 let counter_mut = counter.by_mut();
4877
4878 let out_recv = node
4879 .source_iter(q!(vec![1i32, 2]))
4880 .weaken_ordering::<NoOrder>()
4881 .map(q!(
4882 |x| {
4883 *counter_mut += x;
4884 *counter_mut
4885 },
4886 commutative = manual_proof!()
4887 ))
4888 .assume_ordering::<TotalOrder>(nondet!())
4889 .sim_output();
4890
4891 counter.into_stream().for_each(q!(|_| {}));
4892
4893 let count = flow.sim().exhaustive(async || {
4894 let _all: Vec<i32> = out_recv.collect().await;
4895 });
4896
4897 assert_eq!(
4898 count, 2,
4899 "Expected 2 simulation instances due to mut on unordered input, got {}",
4900 count
4901 );
4902 }
4903}