Skip to main content

hydro_lang/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![cfg_attr(not(stageleft_trybuild), warn(missing_docs))]
3
4//! Hydro is a high-level distributed programming framework for Rust.
5//! Hydro can help you quickly write scalable distributed services that are correct by construction.
6//! Much like Rust helps with memory safety, Hydro helps with [distributed safety](https://hydro.run/docs/hydro/reference/correctness).
7//!
8//! The core Hydro API involves [live collections](https://hydro.run/docs/hydro/reference/live-collections/), which represent asynchronously
9//! updated sources of data such as incoming network requests and application state. The most common live collection is
10//! [`live_collections::stream::Stream`]; other live collections can be found in [`live_collections`].
11//!
12//! Hydro uses a unique compilation approach where you define deployment logic as Rust code alongside your distributed system implementation.
13//! For more details on this API, see the [Hydro docs](https://hydro.run/docs/hydro/reference/deploy/) and the [`deploy`] module.
14
15stageleft::stageleft_no_entry_crate!();
16
/// Re-exports of third-party crates (plus the `launch` submodule) under a single hidden path.
///
/// NOTE(review): presumably referenced by generated / staged code at runtime — confirm with
/// the code that emits `hydro_lang::runtime_support::…` paths.
#[cfg(feature = "runtime_support")]
#[cfg_attr(docsrs, doc(cfg(feature = "runtime_support")))]
#[doc(hidden)]
pub mod runtime_support {
    // Re-exported whenever the `runtime_support` feature is enabled.
    pub use ::bincode;
    pub use ::dfir_rs;
    pub use ::slotmap;
    pub use ::stageleft;
    pub use ::tokio;

    // Re-exported only when the matching feature is also enabled.
    #[cfg(feature = "sim")]
    pub use ::colored;
    #[cfg(feature = "deploy_integration")]
    pub use ::hydro_deploy_integration;

    // Declared when either the deploy integration or the Docker runtime feature is on.
    #[cfg(any(feature = "deploy_integration", feature = "docker_runtime"))]
    pub mod launch;
}
30
31#[doc(hidden)]
32pub mod macro_support {
33    pub use copy_span;
34}
35
36pub mod prelude {
37    // taken from `tokio`
38    //! A "prelude" for users of the `hydro_lang` crate.
39    //!
40    //! This prelude is similar to the standard library's prelude in that you'll almost always want to import its entire contents, but unlike the standard library's prelude you'll have to do so manually:
41    //! ```
42    //! # #![allow(warnings)]
43    //! use hydro_lang::prelude::*;
44    //! ```
45    //!
46    //! The prelude may grow over time as additional items see ubiquitous use.
47
48    pub use stageleft::q;
49
50    pub use crate::compile::builder::FlowBuilder;
51    pub use crate::live_collections::boundedness::{Bounded, Unbounded};
52    pub use crate::live_collections::keyed_singleton::{KeyedSingleton, MonotonicKeys};
53    pub use crate::live_collections::keyed_stream::KeyedStream;
54    pub use crate::live_collections::optional::Optional;
55    pub use crate::live_collections::singleton::Singleton;
56    pub use crate::live_collections::sliced::sliced;
57    pub use crate::live_collections::stream::Stream;
58    pub use crate::location::{Cluster, External, Location as _, Process, Tick};
59    pub use crate::networking::TCP;
60    pub use crate::nondet::{NonDet, nondet};
61    pub use crate::properties::{ConsistencyProof, ManualProof, manual_proof};
62
63    /// A macro to set up a Hydro crate.
64    #[macro_export]
65    macro_rules! setup {
66        () => {
67            stageleft::stageleft_no_entry_crate!();
68
69            #[cfg(test)]
70            mod test_init {
71                #[ctor::ctor]
72                fn init() {
73                    $crate::compile::init_test();
74                }
75            }
76        };
77    }
78}
79
80#[cfg(feature = "dfir_context")]
81#[cfg_attr(docsrs, doc(cfg(feature = "dfir_context")))]
82pub mod runtime_context;
83
84pub mod nondet;
85
86pub mod live_collections;
87
88pub mod location;
89
90pub mod networking;
91
92pub mod properties;
93
94pub mod telemetry;
95
96#[cfg(any(
97    feature = "deploy",
98    feature = "deploy_integration" // hidden internal feature enabled in the trybuild
99))]
100#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
101pub mod deploy;
102
103#[cfg(feature = "sim")]
104#[cfg_attr(docsrs, doc(cfg(feature = "sim")))]
105pub mod sim;
106
107pub mod forward_handle;
108
109pub mod compile;
110
111pub mod handoff_ref;
112
113mod manual_expr;
114
115#[cfg(stageleft_runtime)]
116#[cfg(feature = "viz")]
117#[cfg_attr(docsrs, doc(cfg(feature = "viz")))]
118#[expect(missing_docs, reason = "TODO")]
119pub mod viz;
120
121#[cfg_attr(
122    feature = "stageleft_macro_entrypoint",
123    expect(missing_docs, reason = "staging internals")
124)]
125mod staging_util;
126
127#[cfg(feature = "deploy")]
128#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
129pub mod test_util;
130
// Registers two path rewrites with `stageleft::add_private_reexport`.
//
// Only compiled with the `build` feature. The function is never called from this file;
// it is wired up through the `#[ctor::ctor]` attribute.
// NOTE(review): each pair looks like (private module path, public path to use instead) —
// confirm against the `stageleft` documentation.
#[cfg(feature = "build")]
#[ctor::ctor]
fn init_rewrites() {
    let rewrites = [
        (
            vec!["tokio_util", "codec", "lines_codec"],
            vec!["tokio_util", "codec"],
        ),
        // TODO: remove once stageleft is updated with this rewrite built-in
        (
            vec!["core", "iter", "sources", "empty"],
            vec!["std", "iter"],
        ),
    ];

    // Registration order matches the order of the table above.
    for (first_path, second_path) in rewrites {
        stageleft::add_private_reexport(first_path, second_path);
    }
}
144
// Test-only hook for this crate's own tests: present when building tests with the
// `trybuild` feature, it calls `compile::init_test()` from a `#[ctor::ctor]` function.
#[cfg(all(test, feature = "trybuild"))]
mod test_init {
    use crate::compile::init_test;

    #[ctor::ctor]
    fn init() {
        init_test();
    }
}
152
/// Creates a newtype wrapper around an integer type.
///
/// For every `struct Name(Inner);` item passed in, the macro emits:
/// - a `#[repr(transparent)]` tuple struct (keeping the given attributes and visibility)
///   that derives `Clone`, `Copy`, `Debug`, `Eq`, `Hash`, `Ord`, `PartialEq` and `PartialOrd`;
/// - an `into_inner` method returning the wrapped value;
/// - a `Display` impl that prints the wrapped value;
/// - `serde` `Serialize` / `Deserialize` impls that delegate to the wrapped value;
/// - a sealed [`Countable`] impl that builds the wrapper from a `usize` count.
///
/// The `serde` and `sealed` paths are not prefixed with `$crate`, so both crates must be
/// nameable from the invoking crate. The wrapped type must implement `TryFrom<usize>`,
/// which every primitive integer type does.
///
/// # Panics
///
/// The generated `Countable::from_count` panics when the count does not fit in the wrapped
/// type, instead of silently truncating it (which would hand out an already-used ID).
///
/// Usage:
/// ```rust,ignore
/// hydro_lang::newtype_counter! {
///     /// My counter.
///     pub struct MyCounter(u32);
///
///     /// My secret counter.
///     struct SecretCounter(u64);
/// }
/// ```
#[doc(hidden)]
#[macro_export]
macro_rules! newtype_counter {
    (
        $(
            $( #[$attr:meta] )*
            $vis:vis struct $name:ident($typ:ty);
        )*
    ) => {
        $(
            $( #[$attr] )*
            #[repr(transparent)]
            #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
            $vis struct $name($typ);

            #[allow(clippy::allow_attributes, dead_code, reason = "macro-generated methods may be unused")]
            impl $name {
                /// Reveals the inner ID.
                pub fn into_inner(self) -> $typ {
                    self.0
                }
            }

            impl std::fmt::Display for $name {
                fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                    write!(f, "{}", self.0)
                }
            }

            impl serde::ser::Serialize for $name {
                fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
                where
                    S: serde::Serializer
                {
                    serde::ser::Serialize::serialize(&self.0, serializer)
                }
            }

            impl<'de> serde::de::Deserialize<'de> for $name {
                fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
                where
                    D: serde::Deserializer<'de>
                {
                    serde::de::Deserialize::deserialize(deserializer).map(Self)
                }
            }

            #[sealed::sealed]
            impl $crate::Countable for $name {
                fn from_count(val: usize) -> Self {
                    // A bare `val as $typ` truncates once `val` exceeds the wrapped type's
                    // range, which would produce an ID that was already handed out.
                    // Convert with a range check and fail loudly instead.
                    let inner = <$typ as std::convert::TryFrom<usize>>::try_from(val)
                        .unwrap_or_else(|_| {
                            panic!(
                                "{} overflowed: count {} does not fit in {}",
                                stringify!($name),
                                val,
                                stringify!($typ),
                            )
                        });
                    Self(inner)
                }
            }
        )*
    };
}
221
222/// Sealed trait implemented by ID types produced via [`newtype_counter!`].
223///
224/// This allows [`Counter<T>`] to mint new IDs without exposing a public
225/// constructor on the ID types themselves.
226#[doc(hidden)]
227#[sealed::sealed]
228pub trait Countable {
229    #[doc(hidden)]
230    fn from_count(val: usize) -> Self;
231}
232
233/// An opaque counter that produces unique IDs of type `T` via [`Counter::get_and_increment`].
234///
235/// This is separate from the ID types themselves so that holding an ID does not
236/// give the ability to mint new IDs.
237#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
238pub struct Counter<T: Countable>(usize, std::marker::PhantomData<T>);
239
240impl<T: Countable> Default for Counter<T> {
241    fn default() -> Self {
242        Self(0, std::marker::PhantomData)
243    }
244}
245
246impl<T: Countable> Counter<T> {
247    /// Gets the current counter value and increments for the next call.
248    pub fn get_and_increment(&mut self) -> T {
249        let id = self.0;
250        self.0 += 1;
251        T::from_count(id)
252    }
253
254    /// Returns an iterator from zero up to (but excluding) the current counter value.
255    ///
256    /// This is useful for iterating already-allocated values.
257    pub fn range_up_to(&self) -> impl DoubleEndedIterator<Item = T> + std::iter::FusedIterator {
258        (0..self.0).map(T::from_count)
259    }
260}