Skip to main content

hydro_lang/live_collections/keyed_stream/
mod.rs

1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::stream::{
16    ExactlyOnce, IsExactlyOnce, IsOrdered, MinOrder, MinRetries, NoOrder, Stream, TotalOrder,
17};
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::keyed_singleton::KeyedSingletonBound;
27use crate::live_collections::stream::{
28    AtLeastOnce, Ordering, Retries, WeakerOrderingThan, WeakerRetryThan,
29};
30#[cfg(stageleft_runtime)]
31use crate::location::dynamic::{DynLocation, LocationId};
32use crate::location::tick::DeferTick;
33use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
34use crate::manual_expr::ManualExpr;
35use crate::nondet::{NonDet, nondet};
36use crate::properties::{
37    AggFuncAlgebra, ApplyMonotoneKeyedStream, ValidCommutativityFor, ValidIdempotenceFor,
38    manual_proof,
39};
40
41pub mod networking;
42
43/// Streaming elements of type `V` grouped by a key of type `K`.
44///
45/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
46/// order of keys is non-deterministic but the order *within* each group may be deterministic.
47///
48/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
49/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
50/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
51///
52/// Type Parameters:
53/// - `K`: the type of the key for each group
54/// - `V`: the type of the elements inside each group
55/// - `Loc`: the [`Location`] where the keyed stream is materialized
56/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
57/// - `Order`: tracks whether the elements within each group have deterministic order
58///   ([`TotalOrder`]) or not ([`NoOrder`])
59/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
60///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
61pub struct KeyedStream<
62    K,
63    V,
64    Loc,
65    Bound: Boundedness = Unbounded,
66    Order: Ordering = TotalOrder,
67    Retry: Retries = ExactlyOnce,
68> {
69    pub(crate) location: Loc,
70    pub(crate) ir_node: RefCell<HydroNode>,
71    pub(crate) flow_state: FlowState,
72
73    _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
74}
75
76impl<K, V, L, B: Boundedness, O: Ordering, R: Retries> Drop for KeyedStream<K, V, L, B, O, R> {
77    fn drop(&mut self) {
78        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
79        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
80            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
81                input: Box::new(ir_node),
82                op_metadata: HydroIrOpMetadata::new(),
83            });
84        }
85    }
86}
87
88impl<'a, K, V, L, O: Ordering, R: Retries> From<KeyedStream<K, V, L, Bounded, O, R>>
89    for KeyedStream<K, V, L, Unbounded, O, R>
90where
91    L: Location<'a>,
92{
93    fn from(stream: KeyedStream<K, V, L, Bounded, O, R>) -> KeyedStream<K, V, L, Unbounded, O, R> {
94        let new_meta = stream
95            .location
96            .new_node_metadata(KeyedStream::<K, V, L, Unbounded, O, R>::collection_kind());
97
98        KeyedStream {
99            location: stream.location.clone(),
100            flow_state: stream.flow_state.clone(),
101            ir_node: RefCell::new(HydroNode::Cast {
102                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
103                metadata: new_meta,
104            }),
105            _phantom: PhantomData,
106        }
107    }
108}
109
110impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
111    for KeyedStream<K, V, L, B, NoOrder, R>
112where
113    L: Location<'a>,
114{
115    fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
116        stream.weaken_ordering()
117    }
118}
119
/// Exposes `defer_tick` on bounded, tick-scoped keyed streams through the [`DeferTick`] trait.
impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // Forwards through the `KeyedStream` type path. Presumably this resolves to an inherent
        // `defer_tick` method defined elsewhere in this file (if it resolved to this trait
        // method it would recurse forever). TODO confirm before restructuring this call.
        KeyedStream::defer_tick(self)
    }
}
128
129impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
130    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
131where
132    L: Location<'a>,
133{
134    type Location = Tick<L>;
135
136    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
137        KeyedStream {
138            flow_state: location.flow_state().clone(),
139            location: location.clone(),
140            ir_node: RefCell::new(HydroNode::CycleSource {
141                cycle_id,
142                metadata: location.new_node_metadata(
143                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
144                ),
145            }),
146            _phantom: PhantomData,
147        }
148    }
149}
150
151impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
152    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
153where
154    L: Location<'a>,
155{
156    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
157        assert_eq!(
158            Location::id(&self.location),
159            expected_location,
160            "locations do not match"
161        );
162
163        self.location
164            .flow_state()
165            .borrow_mut()
166            .push_root(HydroRoot::CycleSink {
167                cycle_id,
168                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
169                op_metadata: HydroIrOpMetadata::new(),
170            });
171    }
172}
173
174impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
175    for KeyedStream<K, V, L, B, O, R>
176where
177    L: Location<'a> + NoTick,
178{
179    type Location = L;
180
181    fn create_source(cycle_id: CycleId, location: L) -> Self {
182        KeyedStream {
183            flow_state: location.flow_state().clone(),
184            location: location.clone(),
185            ir_node: RefCell::new(HydroNode::CycleSource {
186                cycle_id,
187                metadata: location
188                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
189            }),
190            _phantom: PhantomData,
191        }
192    }
193}
194
195impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
196    for KeyedStream<K, V, L, B, O, R>
197where
198    L: Location<'a> + NoTick,
199{
200    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
201        assert_eq!(
202            Location::id(&self.location),
203            expected_location,
204            "locations do not match"
205        );
206        self.location
207            .flow_state()
208            .borrow_mut()
209            .push_root(HydroRoot::CycleSink {
210                cycle_id,
211                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
212                op_metadata: HydroIrOpMetadata::new(),
213            });
214    }
215}
216
217impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
218    Clone for KeyedStream<K, V, Loc, Bound, Order, R>
219{
220    fn clone(&self) -> Self {
221        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
222            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
223            *self.ir_node.borrow_mut() = HydroNode::Tee {
224                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
225                metadata: self.location.new_node_metadata(Self::collection_kind()),
226            };
227        }
228
229        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
230            KeyedStream {
231                location: self.location.clone(),
232                flow_state: self.flow_state.clone(),
233                ir_node: HydroNode::Tee {
234                    inner: SharedNode(inner.0.clone()),
235                    metadata: metadata.clone(),
236                }
237                .into(),
238                _phantom: PhantomData,
239            }
240        } else {
241            unreachable!()
242        }
243    }
244}
245
/// The result of one step of a Hydro generator built with [`KeyedStream::generator`].
///
/// Each variant decides two things: whether an element is emitted, and whether later inputs
/// are still processed.
pub enum Generate<T> {
    /// Emit `T` and remain active for subsequent inputs.
    Yield(T),
    /// Emit `T` as the _last_ element; subsequent inputs are not processed.
    Return(T),
    /// Emit nothing and remain active for subsequent inputs.
    Continue,
    /// Emit nothing; subsequent inputs are not processed.
    Break,
}
258
259impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
260    KeyedStream<K, V, L, B, O, R>
261{
262    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
263        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
264        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
265
266        let flow_state = location.flow_state().clone();
267        KeyedStream {
268            location,
269            flow_state,
270            ir_node: RefCell::new(ir_node),
271            _phantom: PhantomData,
272        }
273    }
274
275    /// Returns the [`CollectionKind`] corresponding to this type.
276    pub fn collection_kind() -> CollectionKind {
277        CollectionKind::KeyedStream {
278            bound: B::BOUND_KIND,
279            value_order: O::ORDERING_KIND,
280            value_retry: R::RETRIES_KIND,
281            key_type: stageleft::quote_type::<K>().into(),
282            value_type: stageleft::quote_type::<V>().into(),
283        }
284    }
285
286    /// Returns the [`Location`] where this keyed stream is being materialized.
287    pub fn location(&self) -> &L {
288        &self.location
289    }
290
291    /// Turns this [`KeyedStream`] into a [`Stream`] preserving ordering, under the invariant
292    /// assumption that there is at most one key. If this invariant is broken, the program
293    /// may exhibit undefined behavior, so uses must be carefully vetted.
294    pub(crate) fn cast_at_most_one_key(self) -> Stream<(K, V), L, B, O, R> {
295        Stream::new(
296            self.location.clone(),
297            HydroNode::Cast {
298                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
299                metadata: self
300                    .location
301                    .new_node_metadata(Stream::<(K, V), L, B, O, R>::collection_kind()),
302            },
303        )
304    }
305
306    /// Turns this [`KeyedStream`] into a [`KeyedSingleton`], under the invariant assumption that
307    /// there is at most one entry per key. If this invariant is broken, the program may exhibit
308    /// undefined behavior, so uses must be carefully vetted.
309    pub(crate) fn cast_at_most_one_entry_per_key(
310        self,
311    ) -> KeyedSingleton<K, V, L, B::WithBoundedValue> {
312        KeyedSingleton::new(
313            self.location.clone(),
314            HydroNode::Cast {
315                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
316                metadata: self.location.new_node_metadata(KeyedSingleton::<
317                    K,
318                    V,
319                    L,
320                    B::WithBoundedValue,
321                >::collection_kind()),
322            },
323        )
324    }
325
326    pub(crate) fn use_ordering_type<O2: Ordering>(self) -> KeyedStream<K, V, L, B, O2, R> {
327        if O::ORDERING_KIND == O2::ORDERING_KIND {
328            KeyedStream::new(
329                self.location.clone(),
330                self.ir_node.replace(HydroNode::Placeholder),
331            )
332        } else {
333            panic!(
334                "Runtime ordering {:?} did not match requested cast {:?}.",
335                O::ORDERING_KIND,
336                O2::ORDERING_KIND
337            )
338        }
339    }
340
341    /// Explicitly "casts" the keyed stream to a type with a different ordering
342    /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
343    /// by the type-system.
344    ///
345    /// # Non-Determinism
346    /// This function is used as an escape hatch, and any mistakes in the
347    /// provided ordering guarantee will propagate into the guarantees
348    /// for the rest of the program.
349    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
350        if O::ORDERING_KIND == O2::ORDERING_KIND {
351            KeyedStream::new(
352                self.location.clone(),
353                self.ir_node.replace(HydroNode::Placeholder),
354            )
355        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
356            // We can always weaken the ordering guarantee
357            KeyedStream::new(
358                self.location.clone(),
359                HydroNode::Cast {
360                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
361                    metadata: self
362                        .location
363                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
364                },
365            )
366        } else {
367            KeyedStream::new(
368                self.location.clone(),
369                HydroNode::ObserveNonDet {
370                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
371                    trusted: false,
372                    metadata: self
373                        .location
374                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
375                },
376            )
377        }
378    }
379
380    fn assume_ordering_trusted<O2: Ordering>(
381        self,
382        _nondet: NonDet,
383    ) -> KeyedStream<K, V, L, B, O2, R> {
384        if O::ORDERING_KIND == O2::ORDERING_KIND {
385            KeyedStream::new(
386                self.location.clone(),
387                self.ir_node.replace(HydroNode::Placeholder),
388            )
389        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
390            // We can always weaken the ordering guarantee
391            KeyedStream::new(
392                self.location.clone(),
393                HydroNode::Cast {
394                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
395                    metadata: self
396                        .location
397                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
398                },
399            )
400        } else {
401            KeyedStream::new(
402                self.location.clone(),
403                HydroNode::ObserveNonDet {
404                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
405                    trusted: true,
406                    metadata: self
407                        .location
408                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
409                },
410            )
411        }
412    }
413
414    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
415    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
416    /// which is always safe because that is the weakest possible guarantee.
417    pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
418        self.weaken_ordering::<NoOrder>()
419    }
420
421    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
422    /// enforcing that `O2` is weaker than the input ordering guarantee.
423    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> KeyedStream<K, V, L, B, O2, R> {
424        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
425        self.assume_ordering::<O2>(nondet)
426    }
427
428    /// Explicitly "casts" the keyed stream to a type with a different retries
429    /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
430    /// be proven by the type-system.
431    ///
432    /// # Non-Determinism
433    /// This function is used as an escape hatch, and any mistakes in the
434    /// provided retries guarantee will propagate into the guarantees
435    /// for the rest of the program.
436    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
437        if R::RETRIES_KIND == R2::RETRIES_KIND {
438            KeyedStream::new(
439                self.location.clone(),
440                self.ir_node.replace(HydroNode::Placeholder),
441            )
442        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
443            // We can always weaken the retries guarantee
444            KeyedStream::new(
445                self.location.clone(),
446                HydroNode::Cast {
447                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
448                    metadata: self
449                        .location
450                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
451                },
452            )
453        } else {
454            KeyedStream::new(
455                self.location.clone(),
456                HydroNode::ObserveNonDet {
457                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
458                    trusted: false,
459                    metadata: self
460                        .location
461                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
462                },
463            )
464        }
465    }
466
467    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
468    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
469    /// which is always safe because that is the weakest possible guarantee.
470    pub fn weakest_retries(self) -> KeyedStream<K, V, L, B, O, AtLeastOnce> {
471        self.weaken_retries::<AtLeastOnce>()
472    }
473
474    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
475    /// enforcing that `R2` is weaker than the input retries guarantee.
476    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> KeyedStream<K, V, L, B, O, R2> {
477        let nondet = nondet!(/** this is a weaker retries guarantee, so it is safe to assume */);
478        self.assume_retries::<R2>(nondet)
479    }
480
481    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
482    /// implies that `O == TotalOrder`.
483    pub fn make_totally_ordered(self) -> KeyedStream<K, V, L, B, TotalOrder, R>
484    where
485        O: IsOrdered,
486    {
487        self.assume_ordering(nondet!(/** no-op */))
488    }
489
490    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
491    /// implies that `R == ExactlyOnce`.
492    pub fn make_exactly_once(self) -> KeyedStream<K, V, L, B, O, ExactlyOnce>
493    where
494        R: IsExactlyOnce,
495    {
496        self.assume_retries(nondet!(/** no-op */))
497    }
498
499    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
500    /// implies that `B == Bounded`.
501    pub fn make_bounded(self) -> KeyedStream<K, V, L, Bounded, O, R>
502    where
503        B: IsBounded,
504    {
505        self.weaken_boundedness()
506    }
507
508    /// Weakens the boundedness guarantee to an arbitrary boundedness `B2`, given that `B: IsBounded`,
509    /// which implies that `B == Bounded`.
510    pub fn weaken_boundedness<B2: Boundedness>(self) -> KeyedStream<K, V, L, B2, O, R> {
511        if B::BOUNDED == B2::BOUNDED {
512            KeyedStream::new(
513                self.location.clone(),
514                self.ir_node.replace(HydroNode::Placeholder),
515            )
516        } else {
517            // We can always weaken the boundedness
518            KeyedStream::new(
519                self.location.clone(),
520                HydroNode::Cast {
521                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
522                    metadata: self
523                        .location
524                        .new_node_metadata(KeyedStream::<K, V, L, B2, O, R>::collection_kind()),
525                },
526            )
527        }
528    }
529
530    /// Flattens the keyed stream into an unordered stream of key-value pairs.
531    ///
532    /// # Example
533    /// ```rust
534    /// # #[cfg(feature = "deploy")] {
535    /// # use hydro_lang::prelude::*;
536    /// # use futures::StreamExt;
537    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
538    /// process
539    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
540    ///     .into_keyed()
541    ///     .entries()
542    /// # }, |mut stream| async move {
543    /// // (1, 2), (1, 3), (2, 4) in any order
544    /// # let mut results = Vec::new();
545    /// # for _ in 0..3 {
546    /// #     results.push(stream.next().await.unwrap());
547    /// # }
548    /// # results.sort();
549    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
550    /// # }));
551    /// # }
552    /// ```
553    pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
554        Stream::new(
555            self.location.clone(),
556            HydroNode::Cast {
557                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
558                metadata: self
559                    .location
560                    .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
561            },
562        )
563    }
564
565    /// Flattens the keyed stream into a totally ordered stream of key-value pairs,
566    /// preserving the order of values within each key group but non-deterministically
567    /// interleaving across keys.
568    ///
569    /// Requires the keyed stream to be totally ordered within each group (`O: IsOrdered`).
570    ///
571    /// # Non-Determinism
572    /// The interleaving of entries across different keys is non-deterministic.
573    /// Within each key, the original order is preserved.
574    pub fn entries_partially_ordered(self, _nondet: NonDet) -> Stream<(K, V), L, B, TotalOrder, R>
575    where
576        O: IsOrdered,
577    {
578        Stream::new(
579            self.location.clone(),
580            HydroNode::ObserveNonDet {
581                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
582                trusted: false,
583                metadata: self
584                    .location
585                    .new_node_metadata(Stream::<(K, V), L, B, TotalOrder, R>::collection_kind()),
586            },
587        )
588    }
589
590    /// Flattens the keyed stream into an unordered stream of only the values.
591    ///
592    /// # Example
593    /// ```rust
594    /// # #[cfg(feature = "deploy")] {
595    /// # use hydro_lang::prelude::*;
596    /// # use futures::StreamExt;
597    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
598    /// process
599    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
600    ///     .into_keyed()
601    ///     .values()
602    /// # }, |mut stream| async move {
603    /// // 2, 3, 4 in any order
604    /// # let mut results = Vec::new();
605    /// # for _ in 0..3 {
606    /// #     results.push(stream.next().await.unwrap());
607    /// # }
608    /// # results.sort();
609    /// # assert_eq!(results, vec![2, 3, 4]);
610    /// # }));
611    /// # }
612    /// ```
613    pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
614        self.entries().map(q!(|(_, v)| v))
615    }
616
617    /// Flattens the keyed stream into an unordered stream of just the keys.
618    ///
619    /// # Example
620    /// ```rust
621    /// # #[cfg(feature = "deploy")] {
622    /// # use hydro_lang::prelude::*;
623    /// # use futures::StreamExt;
624    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
625    /// # process
626    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (1, 5)]))
627    /// #     .into_keyed()
628    /// #     .keys()
629    /// # }, |mut stream| async move {
630    /// // 1, 2 in any order
631    /// # let mut results = Vec::new();
632    /// # for _ in 0..2 {
633    /// #     results.push(stream.next().await.unwrap());
634    /// # }
635    /// # results.sort();
636    /// # assert_eq!(results, vec![1, 2]);
637    /// # }));
638    /// # }
639    /// ```
640    pub fn keys(self) -> Stream<K, L, B, NoOrder, ExactlyOnce>
641    where
642        K: Eq + Hash,
643    {
644        self.entries().map(q!(|(k, _)| k)).unique()
645    }
646
647    /// Transforms each value by invoking `f` on each element, with keys staying the same
648    /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
649    ///
650    /// If you do not want to modify the stream and instead only want to view
651    /// each item use [`KeyedStream::inspect`] instead.
652    ///
653    /// # Example
654    /// ```rust
655    /// # #[cfg(feature = "deploy")] {
656    /// # use hydro_lang::prelude::*;
657    /// # use futures::StreamExt;
658    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
659    /// process
660    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
661    ///     .into_keyed()
662    ///     .map(q!(|v| v + 1))
663    /// #   .entries()
664    /// # }, |mut stream| async move {
665    /// // { 1: [3, 4], 2: [5] }
666    /// # let mut results = Vec::new();
667    /// # for _ in 0..3 {
668    /// #     results.push(stream.next().await.unwrap());
669    /// # }
670    /// # results.sort();
671    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 5)]);
672    /// # }));
673    /// # }
674    /// ```
675    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
676    where
677        F: Fn(V) -> U + 'a,
678    {
679        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
680        let map_f = q!({
681            let orig = f;
682            move |(k, v)| (k, orig(v))
683        })
684        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
685        .into();
686
687        KeyedStream::new(
688            self.location.clone(),
689            HydroNode::Map {
690                f: map_f,
691                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
692                metadata: self
693                    .location
694                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
695            },
696        )
697    }
698
699    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
700    /// re-grouped even they are tuples; instead they will be grouped under the original key.
701    ///
702    /// If you do not want to modify the stream and instead only want to view
703    /// each item use [`KeyedStream::inspect_with_key`] instead.
704    ///
705    /// # Example
706    /// ```rust
707    /// # #[cfg(feature = "deploy")] {
708    /// # use hydro_lang::prelude::*;
709    /// # use futures::StreamExt;
710    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
711    /// process
712    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
713    ///     .into_keyed()
714    ///     .map_with_key(q!(|(k, v)| k + v))
715    /// #   .entries()
716    /// # }, |mut stream| async move {
717    /// // { 1: [3, 4], 2: [6] }
718    /// # let mut results = Vec::new();
719    /// # for _ in 0..3 {
720    /// #     results.push(stream.next().await.unwrap());
721    /// # }
722    /// # results.sort();
723    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 6)]);
724    /// # }));
725    /// # }
726    /// ```
727    pub fn map_with_key<U, F>(
728        self,
729        f: impl IntoQuotedMut<'a, F, L> + Copy,
730    ) -> KeyedStream<K, U, L, B, O, R>
731    where
732        F: Fn((K, V)) -> U + 'a,
733        K: Clone,
734    {
735        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
736        let map_f = q!({
737            let orig = f;
738            move |(k, v)| {
739                let out = orig((Clone::clone(&k), v));
740                (k, out)
741            }
742        })
743        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
744        .into();
745
746        KeyedStream::new(
747            self.location.clone(),
748            HydroNode::Map {
749                f: map_f,
750                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
751                metadata: self
752                    .location
753                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
754            },
755        )
756    }
757
758    /// Prepends a new value to the key of each element in the stream, producing a new
759    /// keyed stream with compound keys. Because the original key is preserved, no re-grouping
760    /// occurs and the elements in each group preserve their original order.
761    ///
762    /// # Example
763    /// ```rust
764    /// # #[cfg(feature = "deploy")] {
765    /// # use hydro_lang::prelude::*;
766    /// # use futures::StreamExt;
767    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
768    /// process
769    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
770    ///     .into_keyed()
771    ///     .prefix_key(q!(|&(k, _)| k % 2))
772    /// #   .entries()
773    /// # }, |mut stream| async move {
774    /// // { (1, 1): [2, 3], (0, 2): [4] }
775    /// # let mut results = Vec::new();
776    /// # for _ in 0..3 {
777    /// #     results.push(stream.next().await.unwrap());
778    /// # }
779    /// # results.sort();
780    /// # assert_eq!(results, vec![((0, 2), 4), ((1, 1), 2), ((1, 1), 3)]);
781    /// # }));
782    /// # }
783    /// ```
784    pub fn prefix_key<K2, F>(
785        self,
786        f: impl IntoQuotedMut<'a, F, L> + Copy,
787    ) -> KeyedStream<(K2, K), V, L, B, O, R>
788    where
789        F: Fn(&(K, V)) -> K2 + 'a,
790    {
791        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
792        let map_f = q!({
793            let orig = f;
794            move |kv| {
795                let out = orig(&kv);
796                ((out, kv.0), kv.1)
797            }
798        })
799        .splice_fn1_ctx::<(K, V), ((K2, K), V)>(&self.location)
800        .into();
801
802        KeyedStream::new(
803            self.location.clone(),
804            HydroNode::Map {
805                f: map_f,
806                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
807                metadata: self
808                    .location
809                    .new_node_metadata(KeyedStream::<(K2, K), V, L, B, O, R>::collection_kind()),
810            },
811        )
812    }
813
814    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
815    /// `f`, preserving the order of the elements within the group.
816    ///
817    /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
818    /// not modify or take ownership of the values. If you need to modify the values while filtering
819    /// use [`KeyedStream::filter_map`] instead.
820    ///
821    /// # Example
822    /// ```rust
823    /// # #[cfg(feature = "deploy")] {
824    /// # use hydro_lang::prelude::*;
825    /// # use futures::StreamExt;
826    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
827    /// process
828    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
829    ///     .into_keyed()
830    ///     .filter(q!(|&x| x > 2))
831    /// #   .entries()
832    /// # }, |mut stream| async move {
833    /// // { 1: [3], 2: [4] }
834    /// # let mut results = Vec::new();
835    /// # for _ in 0..2 {
836    /// #     results.push(stream.next().await.unwrap());
837    /// # }
838    /// # results.sort();
839    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
840    /// # }));
841    /// # }
842    /// ```
843    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
844    where
845        F: Fn(&V) -> bool + 'a,
846    {
847        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
848        let filter_f = q!({
849            let orig = f;
850            move |t: &(_, _)| orig(&t.1)
851        })
852        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
853        .into();
854
855        KeyedStream::new(
856            self.location.clone(),
857            HydroNode::Filter {
858                f: filter_f,
859                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
860                metadata: self.location.new_node_metadata(Self::collection_kind()),
861            },
862        )
863    }
864
865    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
866    /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
867    ///
868    /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
869    /// not modify or take ownership of the values. If you need to modify the values while filtering
870    /// use [`KeyedStream::filter_map_with_key`] instead.
871    ///
872    /// # Example
873    /// ```rust
874    /// # #[cfg(feature = "deploy")] {
875    /// # use hydro_lang::prelude::*;
876    /// # use futures::StreamExt;
877    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
878    /// process
879    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
880    ///     .into_keyed()
881    ///     .filter_with_key(q!(|&(k, v)| v - k == 2))
882    /// #   .entries()
883    /// # }, |mut stream| async move {
884    /// // { 1: [3], 2: [4] }
885    /// # let mut results = Vec::new();
886    /// # for _ in 0..2 {
887    /// #     results.push(stream.next().await.unwrap());
888    /// # }
889    /// # results.sort();
890    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
891    /// # }));
892    /// # }
893    /// ```
894    pub fn filter_with_key<F>(
895        self,
896        f: impl IntoQuotedMut<'a, F, L> + Copy,
897    ) -> KeyedStream<K, V, L, B, O, R>
898    where
899        F: Fn(&(K, V)) -> bool + 'a,
900    {
901        let filter_f = f
902            .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
903            .into();
904
905        KeyedStream::new(
906            self.location.clone(),
907            HydroNode::Filter {
908                f: filter_f,
909                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
910                metadata: self.location.new_node_metadata(Self::collection_kind()),
911            },
912        )
913    }
914
915    /// An operator that both filters and maps each value, with keys staying the same.
916    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
917    /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
918    ///
919    /// # Example
920    /// ```rust
921    /// # #[cfg(feature = "deploy")] {
922    /// # use hydro_lang::prelude::*;
923    /// # use futures::StreamExt;
924    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
925    /// process
926    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
927    ///     .into_keyed()
928    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
929    /// #   .entries()
930    /// # }, |mut stream| async move {
931    /// // { 1: [2], 2: [4] }
932    /// # let mut results = Vec::new();
933    /// # for _ in 0..2 {
934    /// #     results.push(stream.next().await.unwrap());
935    /// # }
936    /// # results.sort();
937    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
938    /// # }));
939    /// # }
940    /// ```
941    pub fn filter_map<U, F>(
942        self,
943        f: impl IntoQuotedMut<'a, F, L> + Copy,
944    ) -> KeyedStream<K, U, L, B, O, R>
945    where
946        F: Fn(V) -> Option<U> + 'a,
947    {
948        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
949        let filter_map_f = q!({
950            let orig = f;
951            move |(k, v)| orig(v).map(|o| (k, o))
952        })
953        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
954        .into();
955
956        KeyedStream::new(
957            self.location.clone(),
958            HydroNode::FilterMap {
959                f: filter_map_f,
960                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
961                metadata: self
962                    .location
963                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
964            },
965        )
966    }
967
968    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
969    /// re-grouped even they are tuples; instead they will be grouped under the original key.
970    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
971    ///
972    /// # Example
973    /// ```rust
974    /// # #[cfg(feature = "deploy")] {
975    /// # use hydro_lang::prelude::*;
976    /// # use futures::StreamExt;
977    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
978    /// process
979    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
980    ///     .into_keyed()
981    ///     .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
982    /// #   .entries()
983    /// # }, |mut stream| async move {
984    /// // { 2: [2] }
985    /// # let mut results = Vec::new();
986    /// # for _ in 0..1 {
987    /// #     results.push(stream.next().await.unwrap());
988    /// # }
989    /// # results.sort();
990    /// # assert_eq!(results, vec![(2, 2)]);
991    /// # }));
992    /// # }
993    /// ```
994    pub fn filter_map_with_key<U, F>(
995        self,
996        f: impl IntoQuotedMut<'a, F, L> + Copy,
997    ) -> KeyedStream<K, U, L, B, O, R>
998    where
999        F: Fn((K, V)) -> Option<U> + 'a,
1000        K: Clone,
1001    {
1002        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1003        let filter_map_f = q!({
1004            let orig = f;
1005            move |(k, v)| {
1006                let out = orig((Clone::clone(&k), v));
1007                out.map(|o| (k, o))
1008            }
1009        })
1010        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1011        .into();
1012
1013        KeyedStream::new(
1014            self.location.clone(),
1015            HydroNode::FilterMap {
1016                f: filter_map_f,
1017                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1018                metadata: self
1019                    .location
1020                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
1021            },
1022        )
1023    }
1024
1025    /// Generates a keyed stream that maps each value `v` to a tuple `(v, x)`,
1026    /// where `v` is the value of `other`, a bounded [`super::singleton::Singleton`] or
1027    /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
1028    ///
1029    /// # Example
1030    /// ```rust
1031    /// # #[cfg(feature = "deploy")] {
1032    /// # use hydro_lang::prelude::*;
1033    /// # use futures::StreamExt;
1034    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1035    /// let tick = process.tick();
1036    /// let batch = process
1037    ///   .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
1038    ///   .into_keyed()
1039    ///   .batch(&tick, nondet!(/** test */));
1040    /// let count = batch.clone().entries().count(); // `count()` returns a singleton
1041    /// batch.cross_singleton(count).all_ticks().entries()
1042    /// # }, |mut stream| async move {
1043    /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
1044    /// # let mut results = Vec::new();
1045    /// # for _ in 0..3 {
1046    /// #     results.push(stream.next().await.unwrap());
1047    /// # }
1048    /// # results.sort();
1049    /// # assert_eq!(results, vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))]);
1050    /// # }));
1051    /// # }
1052    /// ```
1053    pub fn cross_singleton<O2>(
1054        self,
1055        other: impl Into<Optional<O2, L, Bounded>>,
1056    ) -> KeyedStream<K, (V, O2), L, B, O, R>
1057    where
1058        O2: Clone,
1059    {
1060        let other: Optional<O2, L, Bounded> = other.into();
1061        check_matching_location(&self.location, &other.location);
1062
1063        Stream::new(
1064            self.location.clone(),
1065            HydroNode::CrossSingleton {
1066                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1067                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1068                metadata: self
1069                    .location
1070                    .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
1071            },
1072        )
1073        .map(q!(|((k, v), o2)| (k, (v, o2))))
1074        .into_keyed()
1075    }
1076
1077    /// For each value `v` in each group, transform `v` using `f` and then treat the
1078    /// result as an [`Iterator`] to produce values one by one within the same group.
1079    /// The implementation for [`Iterator`] for the output type `I` must produce items
1080    /// in a **deterministic** order.
1081    ///
1082    /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
1083    /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
1084    ///
1085    /// # Example
1086    /// ```rust
1087    /// # #[cfg(feature = "deploy")] {
1088    /// # use hydro_lang::prelude::*;
1089    /// # use futures::StreamExt;
1090    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1091    /// process
1092    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1093    ///     .into_keyed()
1094    ///     .flat_map_ordered(q!(|x| x))
1095    /// #   .entries()
1096    /// # }, |mut stream| async move {
1097    /// // { 1: [2, 3, 4], 2: [5, 6] }
1098    /// # let mut results = Vec::new();
1099    /// # for _ in 0..5 {
1100    /// #     results.push(stream.next().await.unwrap());
1101    /// # }
1102    /// # results.sort();
1103    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1104    /// # }));
1105    /// # }
1106    /// ```
1107    pub fn flat_map_ordered<U, I, F>(
1108        self,
1109        f: impl IntoQuotedMut<'a, F, L> + Copy,
1110    ) -> KeyedStream<K, U, L, B, O, R>
1111    where
1112        I: IntoIterator<Item = U>,
1113        F: Fn(V) -> I + 'a,
1114        K: Clone,
1115    {
1116        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1117        let flat_map_f = q!({
1118            let orig = f;
1119            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
1120        })
1121        .splice_fn1_ctx::<(K, V), _>(&self.location)
1122        .into();
1123
1124        KeyedStream::new(
1125            self.location.clone(),
1126            HydroNode::FlatMap {
1127                f: flat_map_f,
1128                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1129                metadata: self
1130                    .location
1131                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
1132            },
1133        )
1134    }
1135
1136    /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
1137    /// for the output type `I` to produce items in any order.
1138    ///
1139    /// # Example
1140    /// ```rust
1141    /// # #[cfg(feature = "deploy")] {
1142    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1143    /// # use futures::StreamExt;
1144    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1145    /// process
1146    ///     .source_iter(q!(vec![
1147    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1148    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
1149    ///     ]))
1150    ///     .into_keyed()
1151    ///     .flat_map_unordered(q!(|x| x))
1152    /// #   .entries()
1153    /// # }, |mut stream| async move {
1154    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1155    /// # let mut results = Vec::new();
1156    /// # for _ in 0..4 {
1157    /// #     results.push(stream.next().await.unwrap());
1158    /// # }
1159    /// # results.sort();
1160    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1161    /// # }));
1162    /// # }
1163    /// ```
1164    pub fn flat_map_unordered<U, I, F>(
1165        self,
1166        f: impl IntoQuotedMut<'a, F, L> + Copy,
1167    ) -> KeyedStream<K, U, L, B, NoOrder, R>
1168    where
1169        I: IntoIterator<Item = U>,
1170        F: Fn(V) -> I + 'a,
1171        K: Clone,
1172    {
1173        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1174        let flat_map_f = q!({
1175            let orig = f;
1176            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
1177        })
1178        .splice_fn1_ctx::<(K, V), _>(&self.location)
1179        .into();
1180
1181        KeyedStream::new(
1182            self.location.clone(),
1183            HydroNode::FlatMap {
1184                f: flat_map_f,
1185                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1186                metadata: self
1187                    .location
1188                    .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
1189            },
1190        )
1191    }
1192
1193    /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
1194    /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
1195    /// items in a **deterministic** order.
1196    ///
1197    /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
1198    /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
1199    ///
1200    /// # Example
1201    /// ```rust
1202    /// # #[cfg(feature = "deploy")] {
1203    /// # use hydro_lang::prelude::*;
1204    /// # use futures::StreamExt;
1205    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1206    /// process
1207    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1208    ///     .into_keyed()
1209    ///     .flatten_ordered()
1210    /// #   .entries()
1211    /// # }, |mut stream| async move {
1212    /// // { 1: [2, 3, 4], 2: [5, 6] }
1213    /// # let mut results = Vec::new();
1214    /// # for _ in 0..5 {
1215    /// #     results.push(stream.next().await.unwrap());
1216    /// # }
1217    /// # results.sort();
1218    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1219    /// # }));
1220    /// # }
1221    /// ```
1222    pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
1223    where
1224        V: IntoIterator<Item = U>,
1225        K: Clone,
1226    {
1227        self.flat_map_ordered(q!(|d| d))
1228    }
1229
1230    /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
1231    /// for the value type `V` to produce items in any order.
1232    ///
1233    /// # Example
1234    /// ```rust
1235    /// # #[cfg(feature = "deploy")] {
1236    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1237    /// # use futures::StreamExt;
1238    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1239    /// process
1240    ///     .source_iter(q!(vec![
1241    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1242    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
1243    ///     ]))
1244    ///     .into_keyed()
1245    ///     .flatten_unordered()
1246    /// #   .entries()
1247    /// # }, |mut stream| async move {
1248    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1249    /// # let mut results = Vec::new();
1250    /// # for _ in 0..4 {
1251    /// #     results.push(stream.next().await.unwrap());
1252    /// # }
1253    /// # results.sort();
1254    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1255    /// # }));
1256    /// # }
1257    /// ```
1258    pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
1259    where
1260        V: IntoIterator<Item = U>,
1261        K: Clone,
1262    {
1263        self.flat_map_unordered(q!(|d| d))
1264    }
1265
1266    /// An operator which allows you to "inspect" each element of a stream without
1267    /// modifying it. The closure `f` is called on a reference to each value. This is
1268    /// mainly useful for debugging, and should not be used to generate side-effects.
1269    ///
1270    /// # Example
1271    /// ```rust
1272    /// # #[cfg(feature = "deploy")] {
1273    /// # use hydro_lang::prelude::*;
1274    /// # use futures::StreamExt;
1275    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1276    /// process
1277    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1278    ///     .into_keyed()
1279    ///     .inspect(q!(|v| println!("{}", v)))
1280    /// #   .entries()
1281    /// # }, |mut stream| async move {
1282    /// # let mut results = Vec::new();
1283    /// # for _ in 0..3 {
1284    /// #     results.push(stream.next().await.unwrap());
1285    /// # }
1286    /// # results.sort();
1287    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1288    /// # }));
1289    /// # }
1290    /// ```
1291    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1292    where
1293        F: Fn(&V) + 'a,
1294    {
1295        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1296        let inspect_f = q!({
1297            let orig = f;
1298            move |t: &(_, _)| orig(&t.1)
1299        })
1300        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1301        .into();
1302
1303        KeyedStream::new(
1304            self.location.clone(),
1305            HydroNode::Inspect {
1306                f: inspect_f,
1307                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1308                metadata: self.location.new_node_metadata(Self::collection_kind()),
1309            },
1310        )
1311    }
1312
1313    /// An operator which allows you to "inspect" each element of a stream without
1314    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1315    /// mainly useful for debugging, and should not be used to generate side-effects.
1316    ///
1317    /// # Example
1318    /// ```rust
1319    /// # #[cfg(feature = "deploy")] {
1320    /// # use hydro_lang::prelude::*;
1321    /// # use futures::StreamExt;
1322    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1323    /// process
1324    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1325    ///     .into_keyed()
1326    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1327    /// #   .entries()
1328    /// # }, |mut stream| async move {
1329    /// # let mut results = Vec::new();
1330    /// # for _ in 0..3 {
1331    /// #     results.push(stream.next().await.unwrap());
1332    /// # }
1333    /// # results.sort();
1334    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1335    /// # }));
1336    /// # }
1337    /// ```
1338    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1339    where
1340        F: Fn(&(K, V)) + 'a,
1341    {
1342        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1343
1344        KeyedStream::new(
1345            self.location.clone(),
1346            HydroNode::Inspect {
1347                f: inspect_f,
1348                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1349                metadata: self.location.new_node_metadata(Self::collection_kind()),
1350            },
1351        )
1352    }
1353
1354    /// An operator which allows you to "name" a `HydroNode`.
1355    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1356    pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
1357        {
1358            let mut node = self.ir_node.borrow_mut();
1359            let metadata = node.metadata_mut();
1360            metadata.tag = Some(name.to_owned());
1361        }
1362        self
1363    }
1364
1365    /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
1366    ///
1367    /// Unlike [`KeyedStream::fold`] which only returns the final accumulated value, `scan` produces a new stream
1368    /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
1369    /// early by returning `None`.
1370    ///
1371    /// The function takes a mutable reference to the accumulator and the current element, and returns
1372    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1373    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1374    ///
1375    /// # Example
1376    /// ```rust
1377    /// # #[cfg(feature = "deploy")] {
1378    /// # use hydro_lang::prelude::*;
1379    /// # use futures::StreamExt;
1380    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1381    /// process
1382    ///     .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
1383    ///     .into_keyed()
1384    ///     .scan(
1385    ///         q!(|| 0),
1386    ///         q!(|acc, x| {
1387    ///             *acc += x;
1388    ///             if *acc % 2 == 0 { None } else { Some(*acc) }
1389    ///         }),
1390    ///     )
1391    /// #   .entries()
1392    /// # }, |mut stream| async move {
1393    /// // Output: { 0: [1], 1: [3, 7] }
1394    /// # let mut results = Vec::new();
1395    /// # for _ in 0..3 {
1396    /// #     results.push(stream.next().await.unwrap());
1397    /// # }
1398    /// # results.sort();
1399    /// # assert_eq!(results, vec![(0, 1), (1, 3), (1, 7)]);
1400    /// # }));
1401    /// # }
1402    /// ```
1403    pub fn scan<A, U, I, F>(
1404        self,
1405        init: impl IntoQuotedMut<'a, I, L> + Copy,
1406        f: impl IntoQuotedMut<'a, F, L> + Copy,
1407    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1408    where
1409        O: IsOrdered,
1410        R: IsExactlyOnce,
1411        K: Clone + Eq + Hash,
1412        I: Fn() -> A + 'a,
1413        F: Fn(&mut A, V) -> Option<U> + 'a,
1414    {
1415        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1416        self.make_totally_ordered().make_exactly_once().generator(
1417            init,
1418            q!({
1419                let orig = f;
1420                move |state, v| {
1421                    if let Some(out) = orig(state, v) {
1422                        Generate::Yield(out)
1423                    } else {
1424                        Generate::Break
1425                    }
1426                }
1427            }),
1428        )
1429    }
1430
    /// Iteratively processes the elements in each group using a state machine that can yield
    /// elements as it processes its inputs. This is designed to mirror the unstable generator
    /// syntax in Rust, without requiring special syntax.
    ///
    /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
    /// state for each group. The second argument defines the processing logic, taking in a
    /// mutable reference to the group's state and the value to be processed. It emits a
    /// [`Generate`] value, whose variants define what is emitted and whether further inputs
    /// should be processed:
    ///
    /// - `Generate::Yield(out)`: emit `out` under the current key and keep processing the group.
    /// - `Generate::Return(out)`: emit `out` under the current key, then stop the group.
    /// - `Generate::Break`: emit nothing and stop the group.
    /// - `Generate::Continue`: emit nothing and keep processing the group.
    ///
    /// Once a group has stopped, any later values with the same key are ignored; other keys are
    /// unaffected. The initializer runs lazily, the first time a value for a given key is
    /// processed.
    ///
    /// The input is converted to a totally ordered, exactly-once stream before processing. The
    /// output is totally ordered and exactly-once.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
    ///     .into_keyed()
    ///     .generator(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             if *acc > 100 {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Return(
    ///                     "done!".to_owned()
    ///                 )
    ///             } else if *acc % 2 == 0 {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Yield(
    ///                     "even".to_owned()
    ///                 )
    ///             } else {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Continue
    ///             }
    ///         }),
    ///     )
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // Output: { 0: ["even", "done!"], 1: ["even"] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, "done!".to_owned()), (0, "even".to_owned()), (1, "even".to_owned())]);
    /// # }));
    /// # }
    /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> Generate<U> + 'a,
    {
        // Splice the user's initializer and step function so they can be referenced from the
        // quoted scan closure below.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        let this = self.make_totally_ordered().make_exactly_once();

        // Scan state: one entry per key seen so far. The entry is `Some(state)` while that key's
        // group is active and `None` once it has stopped (after `Return` or `Break`).
        let scan_init = q!(|| HashMap::new())
            .splice_fn0_ctx::<HashMap<K, Option<A>>>(&this.location)
            .into();
        // Each step returns `Some(Some((k, out)))` to emit a value or `Some(None)` to emit nothing.
        // The outer `Some` is always produced. NOTE(review): presumably this keeps the underlying
        // `Scan` running, so a single key stopping never ends the whole stream. Confirm against
        // the `HydroNode::Scan` semantics.
        let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
            // Lazily create the state for a key the first time it is seen. The key is cloned on
            // every element because the entry API needs an owned key and `k` is still needed for
            // the output.
            let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
            if let Some(existing_state_value) = existing_state {
                match f(existing_state_value, v) {
                    Generate::Yield(out) => Some(Some((k, out))),
                    Generate::Return(out) => {
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(Some((k, out)))
                    }
                    Generate::Break => {
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(None)
                    }
                    Generate::Continue => Some(None),
                }
            } else {
                // This key's group has already stopped, so the value is dropped.
                Some(None)
            }
        })
        .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&this.location)
        .into();

        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
            metadata: this.location.new_node_metadata(Stream::<
                Option<(K, U)>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // Identity flat-map over `Option<(K, U)>`: `None` steps vanish and `Some` entries are
        // unwrapped, because `Option` iterates over zero or one items.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<(K, U)>, _>(&this.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: this.location.new_node_metadata(KeyedStream::<
                K,
                U,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        KeyedStream::new(this.location.clone(), flatten_node)
    }
1552
1553    /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
1554    /// in-order across the values in each group. But the aggregation function returns a boolean,
1555    /// which when true indicates that the aggregated result is complete and can be released to
1556    /// downstream computation. Unlike [`KeyedStream::fold`], this means that even if the input
1557    /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
1558    /// normal stream elements.
1559    ///
1560    /// # Example
1561    /// ```rust
1562    /// # #[cfg(feature = "deploy")] {
1563    /// # use hydro_lang::prelude::*;
1564    /// # use futures::StreamExt;
1565    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1566    /// process
1567    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1568    ///     .into_keyed()
1569    ///     .fold_early_stop(
1570    ///         q!(|| 0),
1571    ///         q!(|acc, x| {
1572    ///             *acc += x;
1573    ///             x % 2 == 0
1574    ///         }),
1575    ///     )
1576    /// #   .entries()
1577    /// # }, |mut stream| async move {
1578    /// // Output: { 0: 2, 1: 9 }
1579    /// # let mut results = Vec::new();
1580    /// # for _ in 0..2 {
1581    /// #     results.push(stream.next().await.unwrap());
1582    /// # }
1583    /// # results.sort();
1584    /// # assert_eq!(results, vec![(0, 2), (1, 9)]);
1585    /// # }));
1586    /// # }
1587    /// ```
1588    pub fn fold_early_stop<A, I, F>(
1589        self,
1590        init: impl IntoQuotedMut<'a, I, L> + Copy,
1591        f: impl IntoQuotedMut<'a, F, L> + Copy,
1592    ) -> KeyedSingleton<K, A, L, B::WithBoundedValue>
1593    where
1594        O: IsOrdered,
1595        R: IsExactlyOnce,
1596        K: Clone + Eq + Hash,
1597        I: Fn() -> A + 'a,
1598        F: Fn(&mut A, V) -> bool + 'a,
1599    {
1600        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1601        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1602        let out_without_bound_cast = self.generator(
1603            q!(move || Some(init())),
1604            q!(move |key_state, v| {
1605                if let Some(key_state_value) = key_state.as_mut() {
1606                    if f(key_state_value, v) {
1607                        Generate::Return(key_state.take().unwrap())
1608                    } else {
1609                        Generate::Continue
1610                    }
1611                } else {
1612                    unreachable!()
1613                }
1614            }),
1615        );
1616
1617        // SAFETY: The generator will only ever return at most one value per key, since once it
1618        // returns a value for a key it will never process any more values for that key.
1619        out_without_bound_cast.cast_at_most_one_entry_per_key()
1620    }
1621
1622    /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1623    /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1624    /// otherwise the first element would be non-deterministic.
1625    ///
1626    /// # Example
1627    /// ```rust
1628    /// # #[cfg(feature = "deploy")] {
1629    /// # use hydro_lang::prelude::*;
1630    /// # use futures::StreamExt;
1631    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1632    /// process
1633    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1634    ///     .into_keyed()
1635    ///     .first()
1636    /// #   .entries()
1637    /// # }, |mut stream| async move {
1638    /// // Output: { 0: 2, 1: 3 }
1639    /// # let mut results = Vec::new();
1640    /// # for _ in 0..2 {
1641    /// #     results.push(stream.next().await.unwrap());
1642    /// # }
1643    /// # results.sort();
1644    /// # assert_eq!(results, vec![(0, 2), (1, 3)]);
1645    /// # }));
1646    /// # }
1647    /// ```
1648    pub fn first(self) -> KeyedSingleton<K, V, L, B::WithBoundedValue>
1649    where
1650        O: IsOrdered,
1651        R: IsExactlyOnce,
1652        K: Clone + Eq + Hash,
1653    {
1654        self.fold_early_stop(
1655            q!(|| None),
1656            q!(|acc, v| {
1657                *acc = Some(v);
1658                true
1659            }),
1660        )
1661        .map(q!(|v| v.unwrap()))
1662    }
1663
1664    /// Returns a keyed stream containing at most the first `n` values per key,
1665    /// preserving the original order within each group. Similar to SQL `LIMIT`
1666    /// applied per group.
1667    ///
1668    /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1669    /// retries, since the result depends on the order and cardinality of elements
1670    /// within each group.
1671    ///
1672    /// # Example
1673    /// ```rust
1674    /// # #[cfg(feature = "deploy")] {
1675    /// # use hydro_lang::prelude::*;
1676    /// # use futures::StreamExt;
1677    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1678    /// process
1679    ///     .source_iter(q!(vec![(1, 10), (1, 20), (1, 30), (2, 40), (2, 50)]))
1680    ///     .into_keyed()
1681    ///     .limit(q!(2))
1682    /// #   .entries()
1683    /// # }, |mut stream| async move {
1684    /// // { 1: [10, 20], 2: [40, 50] }
1685    /// # let mut results = Vec::new();
1686    /// # for _ in 0..4 {
1687    /// #     results.push(stream.next().await.unwrap());
1688    /// # }
1689    /// # results.sort();
1690    /// # assert_eq!(results, vec![(1, 10), (1, 20), (2, 40), (2, 50)]);
1691    /// # }));
1692    /// # }
1693    /// ```
1694    pub fn limit(
1695        self,
1696        n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1697    ) -> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
1698    where
1699        O: IsOrdered,
1700        R: IsExactlyOnce,
1701        K: Clone + Eq + Hash,
1702    {
1703        self.generator(
1704            q!(|| 0usize),
1705            q!(move |count, item| {
1706                if *count == n {
1707                    Generate::Break
1708                } else {
1709                    *count += 1;
1710                    if *count == n {
1711                        Generate::Return(item)
1712                    } else {
1713                        Generate::Yield(item)
1714                    }
1715                }
1716            }),
1717        )
1718    }
1719
1720    /// Assigns a zero-based index to each value within each key group, emitting
1721    /// `(K, (index, V))` tuples with per-key sequential indices.
1722    ///
1723    /// The output keyed stream has [`TotalOrder`] and [`ExactlyOnce`] guarantees.
1724    /// This is a streaming operator that processes elements as they arrive.
1725    ///
1726    /// # Example
1727    /// ```rust
1728    /// # #[cfg(feature = "deploy")] {
1729    /// # use hydro_lang::prelude::*;
1730    /// # use futures::StreamExt;
1731    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1732    /// process
1733    ///     .source_iter(q!(vec![(1, 10), (2, 20), (1, 30)]))
1734    ///     .into_keyed()
1735    ///     .enumerate()
1736    /// # .entries()
1737    /// # }, |mut stream| async move {
1738    /// // per-key indices: { 1: [(0, 10), (1, 30)], 2: [(0, 20)] }
1739    /// # let mut results = Vec::new();
1740    /// # for _ in 0..3 {
1741    /// #     results.push(stream.next().await.unwrap());
1742    /// # }
1743    /// # let key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
1744    /// # let key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
1745    /// # assert_eq!(key1, vec![(0, 10), (1, 30)]);
1746    /// # assert_eq!(key2, vec![(0, 20)]);
1747    /// # }));
1748    /// # }
1749    /// ```
1750    pub fn enumerate(self) -> KeyedStream<K, (usize, V), L, B, TotalOrder, ExactlyOnce>
1751    where
1752        O: IsOrdered,
1753        R: IsExactlyOnce,
1754        K: Eq + Hash + Clone,
1755    {
1756        self.scan(
1757            q!(|| 0),
1758            q!(|acc, next| {
1759                let curr = *acc;
1760                *acc += 1;
1761                Some((curr, next))
1762            }),
1763        )
1764    }
1765
1766    /// Counts the number of elements in each group, producing a [`KeyedSingleton`] with the counts.
1767    ///
1768    /// # Example
1769    /// ```rust
1770    /// # #[cfg(feature = "deploy")] {
1771    /// # use hydro_lang::prelude::*;
1772    /// # use futures::StreamExt;
1773    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1774    /// let tick = process.tick();
1775    /// let numbers = process
1776    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4), (1, 5)]))
1777    ///     .into_keyed();
1778    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1779    /// batch
1780    ///     .value_counts()
1781    ///     .entries()
1782    ///     .all_ticks()
1783    /// # }, |mut stream| async move {
1784    /// // (1, 3), (2, 2)
1785    /// # let mut results = Vec::new();
1786    /// # for _ in 0..2 {
1787    /// #     results.push(stream.next().await.unwrap());
1788    /// # }
1789    /// # results.sort();
1790    /// # assert_eq!(results, vec![(1, 3), (2, 2)]);
1791    /// # }));
1792    /// # }
1793    /// ```
1794    pub fn value_counts(
1795        self,
1796    ) -> KeyedSingleton<K, usize, L, <B as KeyedSingletonBound>::KeyedStreamToMonotone>
1797    where
1798        R: IsExactlyOnce,
1799        K: Eq + Hash,
1800    {
1801        self.make_exactly_once()
1802            .assume_ordering_trusted(
1803                nondet!(/** ordering within each group affects neither result nor intermediates */),
1804            )
1805            .fold(
1806                q!(|| 0),
1807                q!(
1808                    |acc, _| *acc += 1,
1809                    monotone = manual_proof!(/** += 1 is monotonic */)
1810                ),
1811            )
1812    }
1813
1814    /// Like [`Stream::fold`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1815    /// group via the `comb` closure.
1816    ///
1817    /// Depending on the input stream guarantees, the closure may need to be commutative
1818    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1819    ///
1820    /// If the input and output value types are the same and do not require initialization then use
1821    /// [`KeyedStream::reduce`].
1822    ///
1823    /// # Example
1824    /// ```rust
1825    /// # #[cfg(feature = "deploy")] {
1826    /// # use hydro_lang::prelude::*;
1827    /// # use futures::StreamExt;
1828    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1829    /// let tick = process.tick();
1830    /// let numbers = process
1831    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1832    ///     .into_keyed();
1833    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1834    /// batch
1835    ///     .fold(q!(|| false), q!(|acc, x| *acc |= x))
1836    ///     .entries()
1837    ///     .all_ticks()
1838    /// # }, |mut stream| async move {
1839    /// // (1, false), (2, true)
1840    /// # let mut results = Vec::new();
1841    /// # for _ in 0..2 {
1842    /// #     results.push(stream.next().await.unwrap());
1843    /// # }
1844    /// # results.sort();
1845    /// # assert_eq!(results, vec![(1, false), (2, true)]);
1846    /// # }));
1847    /// # }
1848    /// ```
1849    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V), C, Idemp, M, B2: KeyedSingletonBound>(
1850        self,
1851        init: impl IntoQuotedMut<'a, I, L>,
1852        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1853    ) -> KeyedSingleton<K, A, L, B2>
1854    where
1855        K: Eq + Hash,
1856        C: ValidCommutativityFor<O>,
1857        Idemp: ValidIdempotenceFor<R>,
1858        B: ApplyMonotoneKeyedStream<M, B2>,
1859    {
1860        let init = init.splice_fn0_ctx(&self.location).into();
1861        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1862        proof.register_proof(&comb);
1863
1864        let ordered = self
1865            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1866            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1867
1868        KeyedSingleton::new(
1869            ordered.location.clone(),
1870            HydroNode::FoldKeyed {
1871                init,
1872                acc: comb.into(),
1873                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
1874                metadata: ordered
1875                    .location
1876                    .new_node_metadata(KeyedSingleton::<K, A, L, B2>::collection_kind()),
1877            },
1878        )
1879    }
1880
1881    /// Like [`Stream::reduce`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1882    /// group via the `comb` closure.
1883    ///
1884    /// Depending on the input stream guarantees, the closure may need to be commutative
1885    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1886    ///
1887    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1888    ///
1889    /// # Example
1890    /// ```rust
1891    /// # #[cfg(feature = "deploy")] {
1892    /// # use hydro_lang::prelude::*;
1893    /// # use futures::StreamExt;
1894    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1895    /// let tick = process.tick();
1896    /// let numbers = process
1897    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1898    ///     .into_keyed();
1899    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1900    /// batch
1901    ///     .reduce(q!(|acc, x| *acc |= x))
1902    ///     .entries()
1903    ///     .all_ticks()
1904    /// # }, |mut stream| async move {
1905    /// // (1, false), (2, true)
1906    /// # let mut results = Vec::new();
1907    /// # for _ in 0..2 {
1908    /// #     results.push(stream.next().await.unwrap());
1909    /// # }
1910    /// # results.sort();
1911    /// # assert_eq!(results, vec![(1, false), (2, true)]);
1912    /// # }));
1913    /// # }
1914    /// ```
1915    pub fn reduce<F: Fn(&mut V, V) + 'a, C, Idemp>(
1916        self,
1917        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1918    ) -> KeyedSingleton<K, V, L, B>
1919    where
1920        K: Eq + Hash,
1921        C: ValidCommutativityFor<O>,
1922        Idemp: ValidIdempotenceFor<R>,
1923    {
1924        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1925        proof.register_proof(&f);
1926
1927        let ordered = self
1928            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1929            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1930
1931        KeyedSingleton::new(
1932            ordered.location.clone(),
1933            HydroNode::ReduceKeyed {
1934                f: f.into(),
1935                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
1936                metadata: ordered
1937                    .location
1938                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1939            },
1940        )
1941    }
1942
1943    /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark
1944    /// are automatically deleted.
1945    ///
1946    /// Depending on the input stream guarantees, the closure may need to be commutative
1947    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1948    ///
1949    /// # Example
1950    /// ```rust
1951    /// # #[cfg(feature = "deploy")] {
1952    /// # use hydro_lang::prelude::*;
1953    /// # use futures::StreamExt;
1954    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1955    /// let tick = process.tick();
1956    /// let watermark = tick.singleton(q!(2));
1957    /// let numbers = process
1958    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1959    ///     .into_keyed();
1960    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1961    /// batch
1962    ///     .reduce_watermark(watermark, q!(|acc, x| *acc |= x))
1963    ///     .entries()
1964    ///     .all_ticks()
1965    /// # }, |mut stream| async move {
1966    /// // (2, true)
1967    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1968    /// # }));
1969    /// # }
1970    /// ```
1971    pub fn reduce_watermark<O2, F, C, Idemp>(
1972        self,
1973        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1974        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1975    ) -> KeyedSingleton<K, V, L, B>
1976    where
1977        K: Eq + Hash,
1978        O2: Clone,
1979        F: Fn(&mut V, V) + 'a,
1980        C: ValidCommutativityFor<O>,
1981        Idemp: ValidIdempotenceFor<R>,
1982    {
1983        let other: Optional<O2, Tick<L::Root>, Bounded> = other.into();
1984        check_matching_location(&self.location.root(), other.location.outer());
1985        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1986        proof.register_proof(&f);
1987
1988        let ordered = self
1989            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1990            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1991
1992        KeyedSingleton::new(
1993            ordered.location.clone(),
1994            HydroNode::ReduceKeyedWatermark {
1995                f: f.into(),
1996                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
1997                watermark: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1998                metadata: ordered
1999                    .location
2000                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
2001            },
2002        )
2003    }
2004
2005    /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
2006    /// whose keys are not in the bounded stream.
2007    ///
2008    /// # Example
2009    /// ```rust
2010    /// # #[cfg(feature = "deploy")] {
2011    /// # use hydro_lang::prelude::*;
2012    /// # use futures::StreamExt;
2013    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2014    /// let tick = process.tick();
2015    /// let keyed_stream = process
2016    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2017    ///     .batch(&tick, nondet!(/** test */))
2018    ///     .into_keyed();
2019    /// let keys_to_remove = process
2020    ///     .source_iter(q!(vec![1, 2]))
2021    ///     .batch(&tick, nondet!(/** test */));
2022    /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
2023    /// #   .entries()
2024    /// # }, |mut stream| async move {
2025    /// // { 3: ['c'], 4: ['d'] }
2026    /// # let mut results = Vec::new();
2027    /// # for _ in 0..2 {
2028    /// #     results.push(stream.next().await.unwrap());
2029    /// # }
2030    /// # results.sort();
2031    /// # assert_eq!(results, vec![(3, 'c'), (4, 'd')]);
2032    /// # }));
2033    /// # }
2034    /// ```
2035    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
2036        self,
2037        other: Stream<K, L, Bounded, O2, R2>,
2038    ) -> Self
2039    where
2040        K: Eq + Hash,
2041    {
2042        check_matching_location(&self.location, &other.location);
2043
2044        KeyedStream::new(
2045            self.location.clone(),
2046            HydroNode::AntiJoin {
2047                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2048                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2049                metadata: self.location.new_node_metadata(Self::collection_kind()),
2050            },
2051        )
2052    }
2053
2054    /// Emit a keyed stream containing keys shared between two keyed streams,
2055    /// where each value in the output keyed stream is a tuple of
2056    /// (self's value, other's value).
2057    /// If there are multiple values for the same key, this performs a cross product
2058    /// for each matching key.
2059    ///
2060    /// # Example
2061    /// ```rust
2062    /// # #[cfg(feature = "deploy")] {
2063    /// # use hydro_lang::prelude::*;
2064    /// # use futures::StreamExt;
2065    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2066    /// let tick = process.tick();
2067    /// let keyed_data = process
2068    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2069    ///     .into_keyed()
2070    ///     .batch(&tick, nondet!(/** test */));
2071    /// let other_data = process
2072    ///     .source_iter(q!(vec![(1, 100), (2, 200), (2, 201)]))
2073    ///     .into_keyed()
2074    ///     .batch(&tick, nondet!(/** test */));
2075    /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
2076    /// # }, |mut stream| async move {
2077    /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200), (20, 201)] } in any order
2078    /// # let mut results = vec![];
2079    /// # for _ in 0..4 {
2080    /// #     results.push(stream.next().await.unwrap());
2081    /// # }
2082    /// # results.sort();
2083    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200)), (2, (20, 201))]);
2084    /// # }));
2085    /// # }
2086    /// ```
2087    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2088    pub fn join_keyed_stream<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2089        self,
2090        other: KeyedStream<K, V2, L, B2, O2, R2>,
2091    ) -> KeyedStream<
2092        K,
2093        (V, V2),
2094        L,
2095        B,
2096        B2::PreserveOrderIfBounded<NoOrder>,
2097        <R as MinRetries<R2>>::Min,
2098    >
2099    where
2100        K: Eq + Hash + Clone,
2101        R: MinRetries<R2>,
2102        V: Clone,
2103        V2: Clone,
2104    {
2105        self.entries().join(other.entries()).into_keyed()
2106    }
2107
2108    /// Deduplicates values within each key group, emitting each unique value per key
2109    /// exactly once.
2110    ///
2111    /// # Example
2112    /// ```rust
2113    /// # #[cfg(feature = "deploy")] {
2114    /// # use hydro_lang::prelude::*;
2115    /// # use futures::StreamExt;
2116    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2117    /// process
2118    ///     .source_iter(q!(vec![(1, 10), (2, 20), (1, 10), (2, 30), (1, 20)]))
2119    ///     .into_keyed()
2120    ///     .unique()
2121    /// # .entries()
2122    /// # }, |mut stream| async move {
2123    /// // unique values per key: { 1: [10, 20], 2: [20, 30] }
2124    /// # let mut results = Vec::new();
2125    /// # for _ in 0..4 {
2126    /// #     results.push(stream.next().await.unwrap());
2127    /// # }
2128    /// # let mut key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
2129    /// # let mut key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
2130    /// # key1.sort();
2131    /// # key2.sort();
2132    /// # assert_eq!(key1, vec![10, 20]);
2133    /// # assert_eq!(key2, vec![20, 30]);
2134    /// # }));
2135    /// # }
2136    /// ```
2137    pub fn unique(self) -> KeyedStream<K, V, L, B, NoOrder, ExactlyOnce>
2138    where
2139        K: Eq + Hash + Clone,
2140        V: Eq + Hash + Clone,
2141    {
2142        self.entries().unique().into_keyed()
2143    }
2144
2145    /// Sorts the values within each key group in ascending order.
2146    ///
2147    /// The output keyed stream has a [`TotalOrder`] guarantee on the values within
2148    /// each group. This operator will block until all elements in the input stream
2149    /// are available, so it requires the input stream to be [`Bounded`].
2150    ///
2151    /// # Example
2152    /// ```rust
2153    /// # #[cfg(feature = "deploy")] {
2154    /// # use hydro_lang::prelude::*;
2155    /// # use futures::StreamExt;
2156    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2157    /// let tick = process.tick();
2158    /// let numbers = process
2159    ///     .source_iter(q!(vec![(1, 3), (2, 1), (1, 1), (2, 2)]))
2160    ///     .into_keyed();
2161    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2162    /// batch.sort().all_ticks()
2163    /// # .entries()
2164    /// # }, |mut stream| async move {
2165    /// // values sorted within each key: { 1: [1, 3], 2: [1, 2] }
2166    /// # let mut results = Vec::new();
2167    /// # for _ in 0..4 {
2168    /// #     results.push(stream.next().await.unwrap());
2169    /// # }
2170    /// # let key1_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
2171    /// # let key2_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
2172    /// # assert_eq!(key1_vals, vec![1, 3]);
2173    /// # assert_eq!(key2_vals, vec![1, 2]);
2174    /// # }));
2175    /// # }
2176    /// ```
2177    pub fn sort(self) -> KeyedStream<K, V, L, Bounded, TotalOrder, R>
2178    where
2179        B: IsBounded,
2180        K: Ord,
2181        V: Ord,
2182    {
2183        self.entries().sort().into_keyed()
2184    }
2185
2186    /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
2187    /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
2188    /// is only present in one of the inputs, its values are passed through as-is). The output has
2189    /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
2190    ///
2191    /// Currently, both input streams must be [`Bounded`]. This operator will block
2192    /// on the first stream until all its elements are available. In a future version,
2193    /// we will relax the requirement on the `other` stream.
2194    ///
2195    /// # Example
2196    /// ```rust
2197    /// # #[cfg(feature = "deploy")] {
2198    /// # use hydro_lang::prelude::*;
2199    /// # use futures::StreamExt;
2200    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2201    /// let tick = process.tick();
2202    /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
2203    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2204    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2205    /// # .entries()
2206    /// # }, |mut stream| async move {
2207    /// // { 0: [2, 1], 1: [4, 3] }
2208    /// # let mut results = Vec::new();
2209    /// # for _ in 0..4 {
2210    /// #     results.push(stream.next().await.unwrap());
2211    /// # }
2212    /// # results.sort();
2213    /// # assert_eq!(results, vec![(0, 1), (0, 2), (1, 3), (1, 4)]);
2214    /// # }));
2215    /// # }
2216    /// ```
2217    pub fn chain<O2: Ordering, R2: Retries>(
2218        self,
2219        other: KeyedStream<K, V, L, Bounded, O2, R2>,
2220    ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2221    where
2222        B: IsBounded,
2223        O: MinOrder<O2>,
2224        R: MinRetries<R2>,
2225    {
2226        let this = self.make_bounded();
2227        check_matching_location(&this.location, &other.location);
2228
2229        KeyedStream::new(
2230            this.location.clone(),
2231            HydroNode::Chain {
2232                first: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2233                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2234                metadata: this.location.new_node_metadata(KeyedStream::<
2235                    K,
2236                    V,
2237                    L,
2238                    Bounded,
2239                    <O as MinOrder<O2>>::Min,
2240                    <R as MinRetries<R2>>::Min,
2241                >::collection_kind()),
2242            },
2243        )
2244    }
2245
2246    /// Emit a keyed stream containing keys shared between the keyed stream and the
2247    /// keyed singleton, where each value in the output keyed stream is a tuple of
2248    /// (the keyed stream's value, the keyed singleton's value).
2249    ///
2250    /// # Example
2251    /// ```rust
2252    /// # #[cfg(feature = "deploy")] {
2253    /// # use hydro_lang::prelude::*;
2254    /// # use futures::StreamExt;
2255    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2256    /// let tick = process.tick();
2257    /// let keyed_data = process
2258    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2259    ///     .into_keyed()
2260    ///     .batch(&tick, nondet!(/** test */));
2261    /// let singleton_data = process
2262    ///     .source_iter(q!(vec![(1, 100), (2, 200)]))
2263    ///     .into_keyed()
2264    ///     .batch(&tick, nondet!(/** test */))
2265    ///     .first();
2266    /// keyed_data.join_keyed_singleton(singleton_data).entries().all_ticks()
2267    /// # }, |mut stream| async move {
2268    /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200)] } in any order
2269    /// # let mut results = vec![];
2270    /// # for _ in 0..3 {
2271    /// #     results.push(stream.next().await.unwrap());
2272    /// # }
2273    /// # results.sort();
2274    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200))]);
2275    /// # }));
2276    /// # }
2277    /// ```
2278    pub fn join_keyed_singleton<V2: Clone, B2: IsBounded>(
2279        self,
2280        other: KeyedSingleton<K, V2, L, B2>,
2281    ) -> KeyedStream<K, (V, V2), L, B, O, R>
2282    where
2283        K: Eq + Hash + Clone,
2284        V: Clone,
2285    {
2286        let ir_node = if B2::BOUNDED {
2287            HydroNode::JoinHalf {
2288                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2289                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2290                metadata: self
2291                    .location
2292                    .new_node_metadata(KeyedStream::<K, (V, V2), L, B, O, R>::collection_kind()),
2293            }
2294        } else {
2295            HydroNode::Join {
2296                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2297                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2298                metadata: self
2299                    .location
2300                    .new_node_metadata(KeyedStream::<K, (V, V2), L, B, O, R>::collection_kind()),
2301            }
2302        };
2303
2304        KeyedStream::new(self.location.clone(), ir_node)
2305    }
2306
2307    /// Gets the values associated with a specific key from the keyed stream.
2308    /// Returns an empty stream if the key is `None` or there are no associated values.
2309    ///
2310    /// # Example
2311    /// ```rust
2312    /// # #[cfg(feature = "deploy")] {
2313    /// # use hydro_lang::prelude::*;
2314    /// # use futures::StreamExt;
2315    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2316    /// let tick = process.tick();
2317    /// let keyed_data = process
2318    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2319    ///     .into_keyed()
2320    ///     .batch(&tick, nondet!(/** test */));
2321    /// let key = tick.singleton(q!(1));
2322    /// keyed_data.get(key).all_ticks()
2323    /// # }, |mut stream| async move {
2324    /// // 10, 11
2325    /// # let mut results = vec![];
2326    /// # for _ in 0..2 {
2327    /// #     results.push(stream.next().await.unwrap());
2328    /// # }
2329    /// # results.sort();
2330    /// # assert_eq!(results, vec![10, 11]);
2331    /// # }));
2332    /// # }
2333    /// ```
2334    pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Stream<V, L, B, O, R>
2335    where
2336        K: Eq + Hash + Clone,
2337        V: Clone,
2338    {
2339        let joined =
2340            self.join_keyed_singleton(key.into().map(q!(|k| (k, ()))).into_keyed_singleton());
2341
2342        if O::ORDERING_KIND == StreamOrder::TotalOrder {
2343            joined
2344                .use_ordering_type::<TotalOrder>()
2345                .cast_at_most_one_key()
2346                .map(q!(|(_, (v, _))| v))
2347                .weaken_ordering()
2348        } else {
2349            joined.values().map(q!(|(v, _)| v)).use_ordering_type()
2350        }
2351    }
2352
2353    /// For each value in `self`, find the matching key in `lookup`.
2354    /// The output is a keyed stream with the key from `self`, and a value
2355    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2356    /// If the key is not present in `lookup`, the option will be [`None`].
2357    ///
2358    /// # Example
2359    /// ```rust
2360    /// # #[cfg(feature = "deploy")] {
2361    /// # use hydro_lang::prelude::*;
2362    /// # use futures::StreamExt;
2363    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2364    /// # let tick = process.tick();
2365    /// let requests = // { 1: [10, 11], 2: 20 }
2366    /// # process
2367    /// #     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2368    /// #     .into_keyed()
2369    /// #     .batch(&tick, nondet!(/** test */));
2370    /// let other_data = // { 10: 100, 11: 110 }
2371    /// # process
2372    /// #     .source_iter(q!(vec![(10, 100), (11, 110)]))
2373    /// #     .into_keyed()
2374    /// #     .batch(&tick, nondet!(/** test */))
2375    /// #     .first();
2376    /// requests.lookup_keyed_singleton(other_data)
2377    /// # .entries().all_ticks()
2378    /// # }, |mut stream| async move {
2379    /// // { 1: [(10, Some(100)), (11, Some(110))], 2: (20, None) }
2380    /// # let mut results = vec![];
2381    /// # for _ in 0..3 {
2382    /// #     results.push(stream.next().await.unwrap());
2383    /// # }
2384    /// # results.sort();
2385    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (11, Some(110))), (2, (20, None))]);
2386    /// # }));
2387    /// # }
2388    /// ```
2389    pub fn lookup_keyed_singleton<V2>(
2390        self,
2391        lookup: KeyedSingleton<V, V2, L, Bounded>,
2392    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
2393    where
2394        B: IsBounded,
2395        K: Eq + Hash + Clone,
2396        V: Eq + Hash + Clone,
2397        V2: Clone,
2398    {
2399        self.lookup_keyed_stream(
2400            lookup
2401                .into_keyed_stream()
2402                .assume_retries::<R>(nondet!(/** Retries are irrelevant for keyed singletons */)),
2403        )
2404    }
2405
2406    /// For each value in `self`, find the matching key in `lookup`.
2407    /// The output is a keyed stream with the key from `self`, and a value
2408    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2409    /// If the key is not present in `lookup`, the option will be [`None`].
2410    ///
2411    /// # Example
2412    /// ```rust
2413    /// # #[cfg(feature = "deploy")] {
2414    /// # use hydro_lang::prelude::*;
2415    /// # use futures::StreamExt;
2416    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2417    /// # let tick = process.tick();
2418    /// let requests = // { 1: [10, 11], 2: 20 }
2419    /// # process
2420    /// #     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2421    /// #     .into_keyed()
2422    /// #     .batch(&tick, nondet!(/** test */));
2423    /// let other_data = // { 10: [100, 101], 11: 110 }
2424    /// # process
2425    /// #     .source_iter(q!(vec![(10, 100), (10, 101), (11, 110)]))
2426    /// #     .into_keyed()
2427    /// #     .batch(&tick, nondet!(/** test */));
2428    /// requests.lookup_keyed_stream(other_data)
2429    /// # .entries().all_ticks()
2430    /// # }, |mut stream| async move {
2431    /// // { 1: [(10, Some(100)), (10, Some(101)), (11, Some(110))], 2: (20, None) }
2432    /// # let mut results = vec![];
2433    /// # for _ in 0..4 {
2434    /// #     results.push(stream.next().await.unwrap());
2435    /// # }
2436    /// # results.sort();
2437    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(101))), (1, (11, Some(110))), (2, (20, None))]);
2438    /// # }));
2439    /// # }
2440    /// ```
2441    #[expect(clippy::type_complexity, reason = "retries propagation")]
2442    pub fn lookup_keyed_stream<V2, O2: Ordering, R2: Retries>(
2443        self,
2444        lookup: KeyedStream<V, V2, L, Bounded, O2, R2>,
2445    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, <R as MinRetries<R2>>::Min>
2446    where
2447        B: IsBounded,
2448        K: Eq + Hash + Clone,
2449        V: Eq + Hash + Clone,
2450        V2: Clone,
2451        R: MinRetries<R2>,
2452    {
2453        let inverted = self
2454            .make_bounded()
2455            .entries()
2456            .map(q!(|(key, lookup_value)| (lookup_value, key)))
2457            .into_keyed();
2458        let found = inverted
2459            .clone()
2460            .join_keyed_stream(lookup.clone())
2461            .entries()
2462            .map(q!(|(lookup_value, (key, value))| (
2463                key,
2464                (lookup_value, Some(value))
2465            )))
2466            .into_keyed();
2467        let not_found = inverted
2468            .filter_key_not_in(lookup.keys())
2469            .entries()
2470            .map(q!(|(lookup_value, key)| (key, (lookup_value, None))))
2471            .into_keyed();
2472
2473        found.chain(not_found.weaken_retries::<<R as MinRetries<R2>>::Min>())
2474    }
2475
2476    /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
2477    /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
2478    ///
2479    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2480    /// processed before an acknowledgement is emitted.
2481    pub fn atomic(self) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
2482        let id = self.location.flow_state().borrow_mut().next_clock_id();
2483        let out_location = Atomic {
2484            tick: Tick {
2485                id,
2486                l: self.location.clone(),
2487            },
2488        };
2489        KeyedStream::new(
2490            out_location.clone(),
2491            HydroNode::BeginAtomic {
2492                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2493                metadata: out_location
2494                    .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
2495            },
2496        )
2497    }
2498
2499    /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
2500    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2501    /// the order of the input.
2502    ///
2503    /// # Non-Determinism
2504    /// The batch boundaries are non-deterministic and may change across executions.
2505    pub fn batch(
2506        self,
2507        tick: &Tick<L>,
2508        nondet: NonDet,
2509    ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2510        let _ = nondet;
2511        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2512        KeyedStream::new(
2513            tick.clone(),
2514            HydroNode::Batch {
2515                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2516                metadata: tick.new_node_metadata(
2517                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
2518                ),
2519            },
2520        )
2521    }
2522}
2523
2524impl<'a, K1, K2, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2525    KeyedStream<(K1, K2), V, L, B, O, R>
2526{
2527    /// Produces a new keyed stream by dropping the first element of the compound key.
2528    ///
2529    /// Because multiple keys may share the same suffix, this operation results in re-grouping
2530    /// of the values under the new keys. The values across groups with the same new key
2531    /// will be interleaved, so the resulting stream has [`NoOrder`] within each group.
2532    ///
2533    /// # Example
2534    /// ```rust
2535    /// # #[cfg(feature = "deploy")] {
2536    /// # use hydro_lang::prelude::*;
2537    /// # use futures::StreamExt;
2538    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2539    /// process
2540    ///     .source_iter(q!(vec![((1, 10), 2), ((1, 10), 3), ((2, 20), 4)]))
2541    ///     .into_keyed()
2542    ///     .drop_key_prefix()
2543    /// #   .entries()
2544    /// # }, |mut stream| async move {
2545    /// // { 10: [2, 3], 20: [4] }
2546    /// # let mut results = Vec::new();
2547    /// # for _ in 0..3 {
2548    /// #     results.push(stream.next().await.unwrap());
2549    /// # }
2550    /// # results.sort();
2551    /// # assert_eq!(results, vec![(10, 2), (10, 3), (20, 4)]);
2552    /// # }));
2553    /// # }
2554    /// ```
2555    pub fn drop_key_prefix(self) -> KeyedStream<K2, V, L, B, NoOrder, R> {
2556        self.entries()
2557            .map(q!(|((_k1, k2), v)| (k2, v)))
2558            .into_keyed()
2559    }
2560}
2561
2562impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
2563    KeyedStream<K, V, L, Unbounded, O, R>
2564{
2565    /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
2566    /// of any overlapping groups. The result has [`NoOrder`] on each group because the
2567    /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
2568    /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
2569    ///
2570    /// Currently, both input streams must be [`Unbounded`].
2571    ///
2572    /// # Example
2573    /// ```rust
2574    /// # #[cfg(feature = "deploy")] {
2575    /// # use hydro_lang::prelude::*;
2576    /// # use futures::StreamExt;
2577    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2578    /// let numbers1: KeyedStream<i32, i32, _> = // { 1: [2], 3: [4] }
2579    /// # process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed().into();
2580    /// let numbers2: KeyedStream<i32, i32, _> = // { 1: [3], 3: [5] }
2581    /// # process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed().into();
2582    /// numbers1.merge_unordered(numbers2)
2583    /// #   .entries()
2584    /// # }, |mut stream| async move {
2585    /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
2586    /// # let mut results = Vec::new();
2587    /// # for _ in 0..4 {
2588    /// #     results.push(stream.next().await.unwrap());
2589    /// # }
2590    /// # results.sort();
2591    /// # assert_eq!(results, vec![(1, 2), (1, 3), (3, 4), (3, 5)]);
2592    /// # }));
2593    /// # }
2594    /// ```
2595    pub fn merge_unordered<O2: Ordering, R2: Retries>(
2596        self,
2597        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2598    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2599    where
2600        R: MinRetries<R2>,
2601    {
2602        KeyedStream::new(
2603            self.location.clone(),
2604            HydroNode::Chain {
2605                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2606                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2607                metadata: self.location.new_node_metadata(KeyedStream::<
2608                    K,
2609                    V,
2610                    L,
2611                    Unbounded,
2612                    NoOrder,
2613                    <R as MinRetries<R2>>::Min,
2614                >::collection_kind()),
2615            },
2616        )
2617    }
2618
2619    /// Deprecated: use [`KeyedStream::merge_unordered`] instead.
2620    #[deprecated(note = "use `merge_unordered` instead")]
2621    pub fn interleave<O2: Ordering, R2: Retries>(
2622        self,
2623        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2624    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2625    where
2626        R: MinRetries<R2>,
2627    {
2628        self.merge_unordered(other)
2629    }
2630}
2631
2632impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
2633where
2634    L: Location<'a> + NoTick,
2635{
2636    /// Returns a keyed stream corresponding to the latest batch of elements being atomically
2637    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2638    /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
2639    /// used to create the atomic section.
2640    ///
2641    /// # Non-Determinism
2642    /// The batch boundaries are non-deterministic and may change across executions.
2643    pub fn batch_atomic(
2644        self,
2645        tick: &Tick<L>,
2646        nondet: NonDet,
2647    ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2648        let _ = nondet;
2649        KeyedStream::new(
2650            tick.clone(),
2651            HydroNode::Batch {
2652                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2653                metadata: tick.new_node_metadata(
2654                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
2655                ),
2656            },
2657        )
2658    }
2659
2660    /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
2661    /// See [`KeyedStream::atomic`] for more details.
2662    pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
2663        KeyedStream::new(
2664            self.location.tick.l.clone(),
2665            HydroNode::EndAtomic {
2666                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2667                metadata: self
2668                    .location
2669                    .tick
2670                    .l
2671                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2672            },
2673        )
2674    }
2675}
2676
2677impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
2678where
2679    L: Location<'a>,
2680{
2681    /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2682    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2683    /// each key.
2684    pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
2685        KeyedStream::new(
2686            self.location.outer().clone(),
2687            HydroNode::YieldConcat {
2688                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2689                metadata: self.location.outer().new_node_metadata(KeyedStream::<
2690                    K,
2691                    V,
2692                    L,
2693                    Unbounded,
2694                    O,
2695                    R,
2696                >::collection_kind(
2697                )),
2698            },
2699        )
2700    }
2701
2702    /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2703    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2704    /// each key.
2705    ///
2706    /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
2707    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2708    /// stream's [`Tick`] context.
2709    pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
2710        let out_location = Atomic {
2711            tick: self.location.clone(),
2712        };
2713
2714        KeyedStream::new(
2715            out_location.clone(),
2716            HydroNode::YieldConcat {
2717                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2718                metadata: out_location.new_node_metadata(KeyedStream::<
2719                    K,
2720                    V,
2721                    Atomic<L>,
2722                    Unbounded,
2723                    O,
2724                    R,
2725                >::collection_kind()),
2726            },
2727        )
2728    }
2729
2730    /// Transforms the keyed stream using the given closure in "stateful" mode, where stateful operators
2731    /// such as `fold` retrain their memory for each key across ticks rather than resetting across batches of each key.
2732    ///
2733    /// This API is particularly useful for stateful computation on batches of data, such as
2734    /// maintaining an accumulated state that is up to date with the current batch.
2735    ///
2736    /// # Example
2737    /// ```rust
2738    /// # #[cfg(feature = "deploy")] {
2739    /// # use hydro_lang::prelude::*;
2740    /// # use futures::StreamExt;
2741    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2742    /// let tick = process.tick();
2743    /// # // ticks are lazy by default, forces the second tick to run
2744    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2745    /// # let batch_first_tick = process
2746    /// #   .source_iter(q!(vec![(0, 1), (1, 2), (2, 3), (3, 4)]))
2747    /// #   .into_keyed()
2748    /// #   .batch(&tick, nondet!(/** test */));
2749    /// # let batch_second_tick = process
2750    /// #   .source_iter(q!(vec![(0, 5), (1, 6), (2, 7)]))
2751    /// #   .into_keyed()
2752    /// #   .batch(&tick, nondet!(/** test */))
2753    /// #   .defer_tick(); // appears on the second tick
2754    /// let input = batch_first_tick.chain(batch_second_tick).all_ticks();
2755    ///
2756    /// input.batch(&tick, nondet!(/** test */))
2757    ///     .across_ticks(|s| s.reduce(q!(|sum, new| {
2758    ///         *sum += new;
2759    ///     }))).entries().all_ticks()
2760    /// # }, |mut stream| async move {
2761    /// // First tick: [(0, 1), (1, 2), (2, 3), (3, 4)]
2762    /// # let mut results = Vec::new();
2763    /// # for _ in 0..4 {
2764    /// #     results.push(stream.next().await.unwrap());
2765    /// # }
2766    /// # results.sort();
2767    /// # assert_eq!(results, vec![(0, 1), (1, 2), (2, 3), (3, 4)]);
2768    /// // Second tick: [(0, 6), (1, 8), (2, 10), (3, 4)]
2769    /// # results.clear();
2770    /// # for _ in 0..4 {
2771    /// #     results.push(stream.next().await.unwrap());
2772    /// # }
2773    /// # results.sort();
2774    /// # assert_eq!(results, vec![(0, 6), (1, 8), (2, 10), (3, 4)]);
2775    /// # }));
2776    /// # }
2777    /// ```
2778    pub fn across_ticks<Out: BatchAtomic>(
2779        self,
2780        thunk: impl FnOnce(KeyedStream<K, V, Atomic<L>, Unbounded, O, R>) -> Out,
2781    ) -> Out::Batched {
2782        thunk(self.all_ticks_atomic()).batched_atomic()
2783    }
2784
2785    /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2786    /// tick `T` always has the entries of `self` at tick `T - 1`.
2787    ///
2788    /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2789    ///
2790    /// This operator enables stateful iterative processing with ticks, by sending data from one
2791    /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2792    ///
2793    /// # Example
2794    /// ```rust
2795    /// # #[cfg(feature = "deploy")] {
2796    /// # use hydro_lang::prelude::*;
2797    /// # use futures::StreamExt;
2798    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2799    /// let tick = process.tick();
2800    /// # // ticks are lazy by default, forces the second tick to run
2801    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2802    /// # let batch_first_tick = process
2803    /// #   .source_iter(q!(vec![(1, 2), (1, 3)]))
2804    /// #   .batch(&tick, nondet!(/** test */))
2805    /// #   .into_keyed();
2806    /// # let batch_second_tick = process
2807    /// #   .source_iter(q!(vec![(1, 4), (2, 5)]))
2808    /// #   .batch(&tick, nondet!(/** test */))
2809    /// #   .defer_tick()
2810    /// #   .into_keyed(); // appears on the second tick
2811    /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2812    /// # batch_first_tick.chain(batch_second_tick);
2813    /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2814    ///     changes_across_ticks // from the current tick
2815    /// )
2816    /// # .entries().all_ticks()
2817    /// # }, |mut stream| async move {
2818    /// // First tick: { 1: [2, 3] }
2819    /// # let mut results = Vec::new();
2820    /// # for _ in 0..2 {
2821    /// #     results.push(stream.next().await.unwrap());
2822    /// # }
2823    /// # results.sort();
2824    /// # assert_eq!(results, vec![(1, 2), (1, 3)]);
2825    /// // Second tick: { 1: [2, 3, 4], 2: [5] }
2826    /// # results.clear();
2827    /// # for _ in 0..4 {
2828    /// #     results.push(stream.next().await.unwrap());
2829    /// # }
2830    /// # results.sort();
2831    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5)]);
2832    /// // Third tick: { 1: [4], 2: [5] }
2833    /// # results.clear();
2834    /// # for _ in 0..2 {
2835    /// #     results.push(stream.next().await.unwrap());
2836    /// # }
2837    /// # results.sort();
2838    /// # assert_eq!(results, vec![(1, 4), (2, 5)]);
2839    /// # }));
2840    /// # }
2841    /// ```
2842    pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2843        KeyedStream::new(
2844            self.location.clone(),
2845            HydroNode::DeferTick {
2846                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2847                metadata: self.location.new_node_metadata(KeyedStream::<
2848                    K,
2849                    V,
2850                    Tick<L>,
2851                    Bounded,
2852                    O,
2853                    R,
2854                >::collection_kind()),
2855            },
2856        )
2857    }
2858}
2859
2860#[cfg(test)]
2861mod tests {
2862    #[cfg(feature = "deploy")]
2863    use futures::{SinkExt, StreamExt};
2864    #[cfg(feature = "deploy")]
2865    use hydro_deploy::Deployment;
2866    #[cfg(any(feature = "deploy", feature = "sim"))]
2867    use stageleft::q;
2868
2869    #[cfg(any(feature = "deploy", feature = "sim"))]
2870    use crate::compile::builder::FlowBuilder;
2871    #[cfg(feature = "deploy")]
2872    use crate::live_collections::stream::ExactlyOnce;
2873    #[cfg(feature = "sim")]
2874    use crate::live_collections::stream::{NoOrder, TotalOrder};
2875    #[cfg(any(feature = "deploy", feature = "sim"))]
2876    use crate::location::Location;
2877    #[cfg(feature = "sim")]
2878    use crate::networking::TCP;
2879    #[cfg(any(feature = "deploy", feature = "sim"))]
2880    use crate::nondet::nondet;
2881    #[cfg(feature = "deploy")]
2882    use crate::properties::manual_proof;
2883
2884    #[cfg(feature = "deploy")]
2885    #[tokio::test]
2886    async fn get_unbounded_keyed_stream_bounded_singleton() {
2887        let mut deployment = Deployment::new();
2888
2889        let mut flow = FlowBuilder::new();
2890        let node = flow.process::<()>();
2891        let external = flow.external::<()>();
2892
2893        let (input_send, input_stream) =
2894            node.source_external_bincode::<_, (i32, i32), _, ExactlyOnce>(&external);
2895
2896        let key = node.singleton(q!(1));
2897
2898        let out = input_stream
2899            .into_keyed()
2900            .get(key)
2901            .send_bincode_external(&external);
2902
2903        let nodes = flow
2904            .with_process(&node, deployment.Localhost())
2905            .with_external(&external, deployment.Localhost())
2906            .deploy(&mut deployment);
2907
2908        deployment.deploy().await.unwrap();
2909
2910        let mut input_send = nodes.connect(input_send).await;
2911        let mut out = nodes.connect(out).await;
2912
2913        deployment.start().await.unwrap();
2914
2915        // First batch
2916        input_send.send((1, 10)).await.unwrap();
2917        input_send.send((2, 20)).await.unwrap();
2918        assert_eq!(out.next().await.unwrap(), 10);
2919
2920        // Second batch
2921        input_send.send((1, 11)).await.unwrap();
2922        input_send.send((2, 21)).await.unwrap();
2923        assert_eq!(out.next().await.unwrap(), 11);
2924    }
2925
2926    #[cfg(feature = "deploy")]
2927    #[tokio::test]
2928    async fn reduce_watermark_filter() {
2929        let mut deployment = Deployment::new();
2930
2931        let mut flow = FlowBuilder::new();
2932        let node = flow.process::<()>();
2933        let external = flow.external::<()>();
2934
2935        let node_tick = node.tick();
2936        let watermark = node_tick.singleton(q!(2));
2937
2938        let sum = node
2939            .source_stream(q!(tokio_stream::iter([
2940                (0, 100),
2941                (1, 101),
2942                (2, 102),
2943                (2, 102)
2944            ])))
2945            .into_keyed()
2946            .reduce_watermark(
2947                watermark,
2948                q!(|acc, v| {
2949                    *acc += v;
2950                }),
2951            )
2952            .snapshot(&node_tick, nondet!(/** test */))
2953            .entries()
2954            .all_ticks()
2955            .send_bincode_external(&external);
2956
2957        let nodes = flow
2958            .with_process(&node, deployment.Localhost())
2959            .with_external(&external, deployment.Localhost())
2960            .deploy(&mut deployment);
2961
2962        deployment.deploy().await.unwrap();
2963
2964        let mut out = nodes.connect(sum).await;
2965
2966        deployment.start().await.unwrap();
2967
2968        assert_eq!(out.next().await.unwrap(), (2, 204));
2969    }
2970
2971    #[cfg(feature = "deploy")]
2972    #[tokio::test]
2973    async fn reduce_watermark_bounded() {
2974        let mut deployment = Deployment::new();
2975
2976        let mut flow = FlowBuilder::new();
2977        let node = flow.process::<()>();
2978        let external = flow.external::<()>();
2979
2980        let node_tick = node.tick();
2981        let watermark = node_tick.singleton(q!(2));
2982
2983        let sum = node
2984            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
2985            .into_keyed()
2986            .reduce_watermark(
2987                watermark,
2988                q!(|acc, v| {
2989                    *acc += v;
2990                }),
2991            )
2992            .entries()
2993            .send_bincode_external(&external);
2994
2995        let nodes = flow
2996            .with_process(&node, deployment.Localhost())
2997            .with_external(&external, deployment.Localhost())
2998            .deploy(&mut deployment);
2999
3000        deployment.deploy().await.unwrap();
3001
3002        let mut out = nodes.connect(sum).await;
3003
3004        deployment.start().await.unwrap();
3005
3006        assert_eq!(out.next().await.unwrap(), (2, 204));
3007    }
3008
3009    #[cfg(feature = "deploy")]
3010    #[tokio::test]
3011    async fn reduce_watermark_garbage_collect() {
3012        let mut deployment = Deployment::new();
3013
3014        let mut flow = FlowBuilder::new();
3015        let node = flow.process::<()>();
3016        let external = flow.external::<()>();
3017        let (tick_send, tick_trigger) =
3018            node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
3019
3020        let node_tick = node.tick();
3021        let (watermark_complete_cycle, watermark) =
3022            node_tick.cycle_with_initial(node_tick.singleton(q!(2)));
3023        let next_watermark = watermark.clone().map(q!(|v| v + 1));
3024        watermark_complete_cycle.complete_next_tick(next_watermark);
3025
3026        let tick_triggered_input = node_tick
3027            .singleton(q!((3, 103)))
3028            .into_stream()
3029            .filter_if(
3030                tick_trigger
3031                    .clone()
3032                    .batch(&node_tick, nondet!(/** test */))
3033                    .first()
3034                    .is_some(),
3035            )
3036            .all_ticks();
3037
3038        let sum = node
3039            .source_stream(q!(tokio_stream::iter([
3040                (0, 100),
3041                (1, 101),
3042                (2, 102),
3043                (2, 102)
3044            ])))
3045            .merge_unordered(tick_triggered_input)
3046            .into_keyed()
3047            .reduce_watermark(
3048                watermark,
3049                q!(
3050                    |acc, v| {
3051                        *acc += v;
3052                    },
3053                    commutative = manual_proof!(/** integer addition is commutative */)
3054                ),
3055            )
3056            .snapshot(&node_tick, nondet!(/** test */))
3057            .entries()
3058            .all_ticks()
3059            .send_bincode_external(&external);
3060
3061        let nodes = flow
3062            .with_default_optimize()
3063            .with_process(&node, deployment.Localhost())
3064            .with_external(&external, deployment.Localhost())
3065            .deploy(&mut deployment);
3066
3067        deployment.deploy().await.unwrap();
3068
3069        let mut tick_send = nodes.connect(tick_send).await;
3070        let mut out_recv = nodes.connect(sum).await;
3071
3072        deployment.start().await.unwrap();
3073
3074        assert_eq!(out_recv.next().await.unwrap(), (2, 204));
3075
3076        tick_send.send(()).await.unwrap();
3077
3078        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
3079    }
3080
3081    #[cfg(feature = "sim")]
3082    #[test]
3083    #[should_panic]
3084    fn sim_batch_nondet_size() {
3085        let mut flow = FlowBuilder::new();
3086        let node = flow.process::<()>();
3087
3088        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
3089
3090        let tick = node.tick();
3091        let out_recv = input
3092            .batch(&tick, nondet!(/** test */))
3093            .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)))
3094            .entries()
3095            .all_ticks()
3096            .sim_output();
3097
3098        flow.sim().exhaustive(async || {
3099            out_recv
3100                .assert_yields_only_unordered([(1, vec![1, 2])])
3101                .await;
3102        });
3103    }
3104
3105    #[cfg(feature = "sim")]
3106    #[test]
3107    fn sim_batch_preserves_group_order() {
3108        let mut flow = FlowBuilder::new();
3109        let node = flow.process::<()>();
3110
3111        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
3112
3113        let tick = node.tick();
3114        let out_recv = input
3115            .batch(&tick, nondet!(/** test */))
3116            .all_ticks()
3117            .fold_early_stop(
3118                q!(|| 0),
3119                q!(|acc, v| {
3120                    *acc = std::cmp::max(v, *acc);
3121                    *acc >= 2
3122                }),
3123            )
3124            .entries()
3125            .sim_output();
3126
3127        let instances = flow.sim().exhaustive(async || {
3128            out_recv
3129                .assert_yields_only_unordered([(1, 2), (2, 3)])
3130                .await;
3131        });
3132
3133        assert_eq!(instances, 8);
3134        // - three cases: all three in a separate tick (pick where (2, 3) is)
3135        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
3136        // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
3137        // - one case: all three together
3138    }
3139
3140    #[cfg(feature = "sim")]
3141    #[test]
3142    fn sim_batch_unordered_shuffles() {
3143        let mut flow = FlowBuilder::new();
3144        let node = flow.process::<()>();
3145
3146        let input = node
3147            .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
3148            .into_keyed()
3149            .weaken_ordering::<NoOrder>();
3150
3151        let tick = node.tick();
3152        let out_recv = input
3153            .batch(&tick, nondet!(/** test */))
3154            .all_ticks()
3155            .entries()
3156            .sim_output();
3157
3158        let instances = flow.sim().exhaustive(async || {
3159            out_recv
3160                .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
3161                .await;
3162        });
3163
3164        assert_eq!(instances, 13);
3165        // - 6 (3 * 2) cases: all three in a separate tick (pick where (2, 3) is), and order of (1, 1), (1, 2)
3166        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
3167        // - 4 (2 * 2) cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them, and order of (1, 1), (1, 2)
3168        // - one case: all three together (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
3169    }
3170
3171    #[cfg(feature = "sim")]
3172    #[test]
3173    #[should_panic]
3174    fn sim_observe_order_batched() {
3175        let mut flow = FlowBuilder::new();
3176        let node = flow.process::<()>();
3177
3178        let (port, input) = node.sim_input::<_, NoOrder, _>();
3179
3180        let tick = node.tick();
3181        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
3182        let out_recv = batch
3183            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3184            .all_ticks()
3185            .first()
3186            .entries()
3187            .sim_output();
3188
3189        flow.sim().exhaustive(async || {
3190            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
3191            out_recv
3192                .assert_yields_only_unordered([(1, 1), (2, 1)])
3193                .await; // fails with assume_ordering
3194        });
3195    }
3196
3197    #[cfg(feature = "sim")]
3198    #[test]
3199    fn sim_observe_order_batched_count() {
3200        let mut flow = FlowBuilder::new();
3201        let node = flow.process::<()>();
3202
3203        let (port, input) = node.sim_input::<_, NoOrder, _>();
3204
3205        let tick = node.tick();
3206        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
3207        let out_recv = batch
3208            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3209            .all_ticks()
3210            .entries()
3211            .sim_output();
3212
3213        let instance_count = flow.sim().exhaustive(async || {
3214            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
3215            let _ = out_recv.collect_sorted::<Vec<_>>().await;
3216        });
3217
3218        assert_eq!(instance_count, 104); // too complicated to enumerate here, but less than stream equivalent
3219    }
3220
3221    #[cfg(feature = "sim")]
3222    #[test]
3223    fn sim_top_level_assume_ordering() {
3224        use std::collections::HashMap;
3225
3226        let mut flow = FlowBuilder::new();
3227        let node = flow.process::<()>();
3228
3229        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3230
3231        let out_recv = input
3232            .into_keyed()
3233            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3234            .fold_early_stop(
3235                q!(|| Vec::new()),
3236                q!(|acc, v| {
3237                    acc.push(v);
3238                    acc.len() >= 2
3239                }),
3240            )
3241            .entries()
3242            .sim_output();
3243
3244        let instance_count = flow.sim().exhaustive(async || {
3245            in_send.send_many_unordered([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd')]);
3246            let out: HashMap<_, _> = out_recv
3247                .collect_sorted::<Vec<_>>()
3248                .await
3249                .into_iter()
3250                .collect();
3251            // Each key accumulates its values; we get one entry per key
3252            assert_eq!(out.len(), 2);
3253        });
3254
3255        assert_eq!(instance_count, 24)
3256    }
3257
3258    #[cfg(feature = "sim")]
3259    #[test]
3260    fn sim_top_level_assume_ordering_cycle_back() {
3261        use std::collections::HashMap;
3262
3263        let mut flow = FlowBuilder::new();
3264        let node = flow.process::<()>();
3265        let node2 = flow.process::<()>();
3266
3267        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3268
3269        let (complete_cycle_back, cycle_back) =
3270            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
3271        let ordered = input
3272            .into_keyed()
3273            .merge_unordered(cycle_back)
3274            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3275        complete_cycle_back.complete(
3276            ordered
3277                .clone()
3278                .map(q!(|v| v + 1))
3279                .filter(q!(|v| v % 2 == 1))
3280                .entries()
3281                .send(&node2, TCP.fail_stop().bincode())
3282                .send(&node, TCP.fail_stop().bincode())
3283                .into_keyed(),
3284        );
3285
3286        let out_recv = ordered
3287            .fold_early_stop(
3288                q!(|| Vec::new()),
3289                q!(|acc, v| {
3290                    acc.push(v);
3291                    acc.len() >= 2
3292                }),
3293            )
3294            .entries()
3295            .sim_output();
3296
3297        let mut saw = false;
3298        let instance_count = flow.sim().exhaustive(async || {
3299            // Send (1, 0) and (1, 2). 0+1=1 is odd so cycles back.
3300            // We want to see [0, 1] - the cycled back value interleaved
3301            in_send.send_many_unordered([(1, 0), (1, 2)]);
3302            let out: HashMap<_, _> = out_recv
3303                .collect_sorted::<Vec<_>>()
3304                .await
3305                .into_iter()
3306                .collect();
3307
3308            // We want to see an instance where key 1 gets: 0, then 1 (cycled back from 0+1)
3309            if let Some(values) = out.get(&1)
3310                && *values == vec![0, 1]
3311            {
3312                saw = true;
3313            }
3314        });
3315
3316        assert!(
3317            saw,
3318            "did not see an instance with key 1 having [0, 1] in order"
3319        );
3320        assert_eq!(instance_count, 6);
3321    }
3322
3323    #[cfg(feature = "sim")]
3324    #[test]
3325    fn sim_top_level_assume_ordering_cross_key_cycle() {
3326        use std::collections::HashMap;
3327
3328        // This test demonstrates why releasing one entry at a time is important:
3329        // When one key's observed order cycles back into a different key, we need
3330        // to be able to interleave the cycled-back entry with pending items for
3331        // that other key.
3332        let mut flow = FlowBuilder::new();
3333        let node = flow.process::<()>();
3334        let node2 = flow.process::<()>();
3335
3336        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3337
3338        let (complete_cycle_back, cycle_back) =
3339            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
3340        let ordered = input
3341            .into_keyed()
3342            .merge_unordered(cycle_back)
3343            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3344
3345        // Cycle back: when we see (1, 10), emit (2, 100) to key 2
3346        complete_cycle_back.complete(
3347            ordered
3348                .clone()
3349                .filter(q!(|v| *v == 10))
3350                .map(q!(|_| 100))
3351                .entries()
3352                .map(q!(|(_, v)| (2, v))) // Change key from 1 to 2
3353                .send(&node2, TCP.fail_stop().bincode())
3354                .send(&node, TCP.fail_stop().bincode())
3355                .into_keyed(),
3356        );
3357
3358        let out_recv = ordered
3359            .fold_early_stop(
3360                q!(|| Vec::new()),
3361                q!(|acc, v| {
3362                    acc.push(v);
3363                    acc.len() >= 2
3364                }),
3365            )
3366            .entries()
3367            .sim_output();
3368
3369        // We want to see an instance where:
3370        // - (1, 10) is released first
3371        // - This causes (2, 100) to be cycled back
3372        // - (2, 100) is released BEFORE (2, 20) which was already pending
3373        let mut saw_cross_key_interleave = false;
3374        let instance_count = flow.sim().exhaustive(async || {
3375            // Send (1, 10), (1, 11) for key 1, and (2, 20), (2, 21) for key 2
3376            in_send.send_many_unordered([(1, 10), (1, 11), (2, 20), (2, 21)]);
3377            let out: HashMap<_, _> = out_recv
3378                .collect_sorted::<Vec<_>>()
3379                .await
3380                .into_iter()
3381                .collect();
3382
3383            // Check if we see the cross-key interleaving:
3384            // key 2 should have [100, 20] or [100, 21] - cycled back 100 before a pending item
3385            if let Some(values) = out.get(&2)
3386                && values.len() >= 2
3387                && values[0] == 100
3388            {
3389                saw_cross_key_interleave = true;
3390            }
3391        });
3392
3393        assert!(
3394            saw_cross_key_interleave,
3395            "did not see an instance where cycled-back 100 was released before pending items for key 2"
3396        );
3397        assert_eq!(instance_count, 60);
3398    }
3399
3400    #[cfg(feature = "sim")]
3401    #[test]
3402    fn sim_top_level_assume_ordering_cycle_back_tick() {
3403        use std::collections::HashMap;
3404
3405        let mut flow = FlowBuilder::new();
3406        let node = flow.process::<()>();
3407        let node2 = flow.process::<()>();
3408
3409        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3410
3411        let (complete_cycle_back, cycle_back) =
3412            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
3413        let ordered = input
3414            .into_keyed()
3415            .merge_unordered(cycle_back)
3416            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3417        complete_cycle_back.complete(
3418            ordered
3419                .clone()
3420                .batch(&node.tick(), nondet!(/** test */))
3421                .all_ticks()
3422                .map(q!(|v| v + 1))
3423                .filter(q!(|v| v % 2 == 1))
3424                .entries()
3425                .send(&node2, TCP.fail_stop().bincode())
3426                .send(&node, TCP.fail_stop().bincode())
3427                .into_keyed(),
3428        );
3429
3430        let out_recv = ordered
3431            .fold_early_stop(
3432                q!(|| Vec::new()),
3433                q!(|acc, v| {
3434                    acc.push(v);
3435                    acc.len() >= 2
3436                }),
3437            )
3438            .entries()
3439            .sim_output();
3440
3441        let mut saw = false;
3442        let instance_count = flow.sim().exhaustive(async || {
3443            in_send.send_many_unordered([(1, 0), (1, 2)]);
3444            let out: HashMap<_, _> = out_recv
3445                .collect_sorted::<Vec<_>>()
3446                .await
3447                .into_iter()
3448                .collect();
3449
3450            if let Some(values) = out.get(&1)
3451                && *values == vec![0, 1]
3452            {
3453                saw = true;
3454            }
3455        });
3456
3457        assert!(
3458            saw,
3459            "did not see an instance with key 1 having [0, 1] in order"
3460        );
3461        assert_eq!(instance_count, 58);
3462    }
3463
3464    #[cfg(feature = "sim")]
3465    #[test]
3466    fn sim_entries_partially_ordered_bounded() {
3467        let mut flow = FlowBuilder::new();
3468        let node = flow.process::<()>();
3469
3470        let (port, input) = node.sim_input::<_, TotalOrder, _>();
3471
3472        let tick = node.tick();
3473        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
3474        let out_recv = batch
3475            .entries_partially_ordered(nondet!(/** test */))
3476            .all_ticks()
3477            .sim_output();
3478
3479        let instance_count = flow.sim().exhaustive(async || {
3480            port.send((1, 'a'));
3481            port.send((1, 'b'));
3482            port.send((2, 'c'));
3483            let _: Vec<(i32, char)> = out_recv.collect().await;
3484        });
3485
3486        assert_eq!(instance_count, 12);
3487    }
3488
3489    #[cfg(feature = "sim")]
3490    #[test]
3491    fn sim_entries_partially_ordered_top_level() {
3492        let mut flow = FlowBuilder::new();
3493        let node = flow.process::<()>();
3494
3495        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3496
3497        let out_recv = input
3498            .into_keyed()
3499            .entries_partially_ordered(nondet!(/** test */))
3500            .sim_output();
3501
3502        let instance_count = flow.sim().exhaustive(async || {
3503            in_send.send((1, 'a'));
3504            in_send.send((1, 'b'));
3505            in_send.send((2, 'c'));
3506            let _: Vec<(i32, char)> = out_recv.collect().await;
3507        });
3508
3509        assert_eq!(instance_count, 3);
3510    }
3511
3512    #[cfg(feature = "sim")]
3513    #[test]
3514    fn sim_entries_partially_ordered_cycle_back() {
3515        let mut flow = FlowBuilder::new();
3516        let node = flow.process::<()>();
3517        let node2 = flow.process::<()>();
3518
3519        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3520
3521        let (complete_cycle_back, cycle_back) =
3522            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
3523        let ordered = input
3524            .into_keyed()
3525            .merge_unordered(cycle_back)
3526            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3527
3528        let flat = ordered
3529            .clone()
3530            .entries_partially_ordered(nondet!(/** test */));
3531
3532        complete_cycle_back.complete(
3533            flat.clone()
3534                .map(q!(|(k, v): (i32, i32)| (k, v + 1)))
3535                .filter(q!(|(_, v)| *v % 2 == 1))
3536                .send(&node2, TCP.fail_stop().bincode())
3537                .send(&node, TCP.fail_stop().bincode())
3538                .into_keyed(),
3539        );
3540
3541        let out_recv = flat.sim_output();
3542
3543        let mut saw = false;
3544        let instance_count = flow.sim().exhaustive(async || {
3545            // Send (1, 0) and (1, 2). 0+1=1 is odd so cycles back as (1, 1).
3546            // We want to see (1, 1) before (1, 2) - the cycled back value beats the pending one
3547            in_send.send_many_unordered([(1, 0), (1, 2)]);
3548            let results: Vec<(i32, i32)> = out_recv.collect().await;
3549
3550            let pos_1 = results.iter().position(|v| *v == (1, 1));
3551            let pos_2 = results.iter().position(|v| *v == (1, 2));
3552            if let (Some(p1), Some(p2)) = (pos_1, pos_2)
3553                && p1 < p2
3554            {
3555                saw = true;
3556            }
3557        });
3558
3559        assert!(saw, "did not see an instance with (1, 1) before (1, 2)");
3560        assert_eq!(instance_count, 78);
3561    }
3562}