Skip to main content

hydro_lang/location/
process.rs

//! Definition of the [`Process`] location type, representing a single-node
//! compute location in a distributed Hydro program.
//!
//! A [`Process`] is the simplest kind of location: it corresponds to exactly one
//! machine (or OS process) and all live collections placed on it are materialized
//! on that single node. Use a process when the computation does not need to be
//! replicated or partitioned across multiple nodes.
//!
//! Processes are created via [`FlowBuilder::process`](crate::compile::builder::FlowBuilder::process)
//! and are parameterized by a **tag type** (`ProcessTag`) that lets the type
//! system distinguish different processes at compile time.
12
13use std::fmt::{Debug, Formatter};
14use std::marker::PhantomData;
15
16use super::{Location, LocationId};
17use crate::compile::builder::FlowState;
18use crate::location::LocationKey;
19use crate::staging_util::Invariant;
20
21/// A single-node location in a distributed Hydro program.
22///
23/// `Process` represents exactly one machine (or OS process) and is one of the
24/// core location types that implements the [`Location`] trait. Live collections
25/// placed on a `Process` are materialized entirely on that single node.
26///
27/// The type parameter `ProcessTag` is a compile-time marker that differentiates
28/// distinct processes in the same dataflow graph (e.g. `Process<'a, Leader>` vs
29/// `Process<'a, Follower>`). It defaults to `()` when only one process is
30/// needed.
31///
32/// # Creating a Process
33/// ```rust,ignore
34/// let mut flow = FlowBuilder::new();
35/// let node = flow.process::<MyTag>();
36/// ```
37pub struct Process<'a, ProcessTag = ()> {
38    pub(crate) key: LocationKey,
39    pub(crate) flow_state: FlowState,
40    pub(crate) _phantom: Invariant<'a, ProcessTag>,
41}
42
43impl<P> Debug for Process<'_, P> {
44    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
45        write!(f, "Process({})", self.key)
46    }
47}
48
49impl<P> Eq for Process<'_, P> {}
50impl<P> PartialEq for Process<'_, P> {
51    fn eq(&self, other: &Self) -> bool {
52        self.key == other.key && FlowState::ptr_eq(&self.flow_state, &other.flow_state)
53    }
54}
55
56impl<P> Clone for Process<'_, P> {
57    fn clone(&self) -> Self {
58        Process {
59            key: self.key,
60            flow_state: self.flow_state.clone(),
61            _phantom: PhantomData,
62        }
63    }
64}
65
66impl<'a, P> super::dynamic::DynLocation for Process<'a, P> {
67    fn id(&self) -> LocationId {
68        LocationId::Process(self.key)
69    }
70
71    fn flow_state(&self) -> &FlowState {
72        &self.flow_state
73    }
74
75    fn is_top_level() -> bool {
76        true
77    }
78
79    fn multiversioned(&self) -> bool {
80        false // processes are always single-versioned
81    }
82}
83
84impl<'a, P> Location<'a> for Process<'a, P> {
85    type Root = Self;
86
87    fn root(&self) -> Self::Root {
88        self.clone()
89    }
90}
91
#[cfg(feature = "sim")]
impl<'a, Tag> Process<'a, Tag> {
    /// Creates a simulated input port on this process whose items are
    /// delivered in an atomic context, for use in tests.
    ///
    /// The result is a pair:
    /// - a sender whose `send` method returns a future that resolves once the
    ///   atomic tick containing the sent item has completed, and
    /// - the stream of sent items, located in an atomic context.
    ///
    /// This packages the manual recipe of calling `sim_input`, `.atomic()`,
    /// wrapping each item in a shared cell, and wiring up an acknowledgement
    /// through `.end_atomic().sim_output()`.
    #[expect(clippy::type_complexity, reason = "stream markers")]
    pub fn sim_atomic_input<T, O, R>(
        &self,
    ) -> (
        crate::sim::SimAtomicSender<T, O, R>,
        crate::live_collections::stream::Stream<
            T,
            super::Atomic<Self>,
            crate::live_collections::boundedness::Unbounded,
            O,
            R,
        >,
    )
    where
        T: serde::Serialize + serde::de::DeserializeOwned,
        O: crate::live_collections::stream::Ordering,
        R: crate::live_collections::stream::Retries,
    {
        use stageleft::q;

        let (input_sender, raw_items) = <Self as Location>::sim_input::<T, O, R>(self);

        // Each item goes into an `Rc<RefCell<Option<_>>>` before the stream is
        // cloned below -- presumably so the clone does not need `T: Clone`
        // (NOTE(review): confirm against `Stream::clone` bounds).
        let wrapped = raw_items.map(q!(|item| std::rc::Rc::new(std::cell::RefCell::new(Some(
            item
        )))));
        let shared = wrapped.atomic();

        // Data branch: takes the value back out of the shared cell.
        let payloads = shared
            .clone()
            .map(q!(|cell| cell.borrow_mut().take().unwrap()));

        // Ack branch: discards the cell and emits `()` through a sim output
        // once the atomic section has ended.
        let acks = shared.map(q!(|_| ())).end_atomic().sim_output();

        (crate::sim::SimAtomicSender(input_sender, acks), payloads)
    }
}
137}