hydro_lang/location/
process.rs1use std::fmt::{Debug, Formatter};
14use std::marker::PhantomData;
15
16use super::{Location, LocationId};
17use crate::compile::builder::FlowState;
18use crate::location::LocationKey;
19use crate::staging_util::Invariant;
20
21pub struct Process<'a, ProcessTag = ()> {
38 pub(crate) key: LocationKey,
39 pub(crate) flow_state: FlowState,
40 pub(crate) _phantom: Invariant<'a, ProcessTag>,
41}
42
43impl<P> Debug for Process<'_, P> {
44 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
45 write!(f, "Process({})", self.key)
46 }
47}
48
49impl<P> Eq for Process<'_, P> {}
50impl<P> PartialEq for Process<'_, P> {
51 fn eq(&self, other: &Self) -> bool {
52 self.key == other.key && FlowState::ptr_eq(&self.flow_state, &other.flow_state)
53 }
54}
55
56impl<P> Clone for Process<'_, P> {
57 fn clone(&self) -> Self {
58 Process {
59 key: self.key,
60 flow_state: self.flow_state.clone(),
61 _phantom: PhantomData,
62 }
63 }
64}
65
66impl<'a, P> super::dynamic::DynLocation for Process<'a, P> {
67 fn id(&self) -> LocationId {
68 LocationId::Process(self.key)
69 }
70
71 fn flow_state(&self) -> &FlowState {
72 &self.flow_state
73 }
74
75 fn is_top_level() -> bool {
76 true
77 }
78
79 fn multiversioned(&self) -> bool {
80 false }
82}
83
84impl<'a, P> Location<'a> for Process<'a, P> {
85 type Root = Self;
86
87 fn root(&self) -> Self::Root {
88 self.clone()
89 }
90}
91
#[cfg(feature = "sim")]
impl<'a, P> Process<'a, P> {
    /// Creates a simulator input on this process and returns a sender paired
    /// with an atomic stream of the sent items.
    ///
    /// The input is obtained from [`Location::sim_input`]. Each item is wrapped
    /// in an `Rc<RefCell<Option<T>>>` before the stream enters the atomic
    /// section, and the wrapped stream is then used twice:
    ///
    /// * one copy takes the value back out of the cell and becomes the
    ///   returned data stream;
    /// * the other maps every item to `()`, leaves the atomic section and is
    ///   exposed through `sim_output`; that receiver is bundled with the input
    ///   sender inside the returned `SimAtomicSender`.
    ///
    /// NOTE(review): the shared cell presumably exists so the stream can be
    /// cloned without requiring `T: Clone` — confirm against the stream API.
    ///
    /// # Panics
    ///
    /// The staged closure that extracts the value panics (via `unwrap`) if the
    /// cell has already been emptied.
    #[expect(clippy::type_complexity, reason = "stream markers")]
    pub fn sim_atomic_input<T, O, R>(
        &self,
    ) -> (
        crate::sim::SimAtomicSender<T, O, R>,
        crate::live_collections::stream::Stream<
            T,
            super::Atomic<Self>,
            crate::live_collections::boundedness::Unbounded,
            O,
            R,
        >,
    )
    where
        T: serde::Serialize + serde::de::DeserializeOwned,
        O: crate::live_collections::stream::Ordering,
        R: crate::live_collections::stream::Retries,
    {
        use stageleft::q;

        let (input_sender, input_stream) = <Self as Location>::sim_input::<T, O, R>(self);

        // Wrap every item in a shared, takeable cell, then enter the atomic section.
        let wrapped = input_stream
            .map(q!(|item| std::rc::Rc::new(std::cell::RefCell::new(Some(
                item
            )))))
            .atomic();

        // Data path: pull the value back out of the cell.
        let payloads = wrapped
            .clone()
            .map(q!(|cell| cell.borrow_mut().take().unwrap()));

        // Signal path: one unit value per item, surfaced outside the atomic section.
        let acks = wrapped.map(q!(|_| ())).end_atomic().sim_output();

        (crate::sim::SimAtomicSender(input_sender, acks), payloads)
    }
}