1use sealed::sealed;
22use stageleft::{QuotedWithContext, q};
23
24#[cfg(stageleft_runtime)]
25use super::dynamic::DynLocation;
26use super::{Cluster, Location, LocationId, Process};
27use crate::compile::builder::{ClockId, FlowState};
28use crate::compile::ir::{HydroNode, HydroSource};
29#[cfg(stageleft_runtime)]
30use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial};
31use crate::forward_handle::{TickCycle, TickCycleHandle};
32use crate::live_collections::boundedness::Bounded;
33use crate::live_collections::optional::Optional;
34use crate::live_collections::singleton::Singleton;
35use crate::live_collections::stream::{ExactlyOnce, Stream, TotalOrder};
36use crate::nondet::nondet;
37
38#[sealed]
44pub trait NoTick {}
45#[sealed]
46impl<T> NoTick for Process<'_, T> {}
47#[sealed]
48impl<T> NoTick for Cluster<'_, T> {}
49
50#[sealed]
56pub trait NoAtomic {}
57#[sealed]
58impl<T> NoAtomic for Process<'_, T> {}
59#[sealed]
60impl<T> NoAtomic for Cluster<'_, T> {}
61#[sealed]
62impl<'a, L> NoAtomic for Tick<L> where L: Location<'a> {}
63
64#[derive(Clone)]
75pub struct Atomic<Loc> {
76 pub(crate) tick: Tick<Loc>,
77}
78
79impl<L: DynLocation> DynLocation for Atomic<L> {
80 fn id(&self) -> LocationId {
81 LocationId::Atomic(Box::new(self.tick.id()))
82 }
83
84 fn flow_state(&self) -> &FlowState {
85 self.tick.flow_state()
86 }
87
88 fn is_top_level() -> bool {
89 L::is_top_level()
90 }
91
92 fn multiversioned(&self) -> bool {
93 self.tick.multiversioned()
94 }
95}
96
97impl<'a, L> Location<'a> for Atomic<L>
98where
99 L: Location<'a>,
100{
101 type Root = L::Root;
102
103 fn root(&self) -> Self::Root {
104 self.tick.root()
105 }
106}
107
108#[sealed]
109impl<L> NoTick for Atomic<L> {}
110
/// Implemented by collections that support `defer_tick`.
///
/// [`Tick::cycle`] requires this bound and applies `defer_tick` to the cycle source it
/// creates.
pub trait DeferTick {
    /// Consumes the collection and returns a collection of the same type.
    ///
    /// NOTE(review): presumably the result carries the input's contents one tick later;
    /// the semantics are defined by the implementors, which are outside this file.
    fn defer_tick(self) -> Self;
}
121
122#[derive(Clone)]
124pub struct Tick<L> {
125 pub(crate) id: ClockId,
126 pub(crate) l: L,
128}
129
130impl<L: DynLocation> DynLocation for Tick<L> {
131 fn id(&self) -> LocationId {
132 LocationId::Tick(self.id, Box::new(self.l.id()))
133 }
134
135 fn flow_state(&self) -> &FlowState {
136 self.l.flow_state()
137 }
138
139 fn is_top_level() -> bool {
140 false
141 }
142
143 fn multiversioned(&self) -> bool {
144 self.l.multiversioned()
145 }
146}
147
148impl<'a, L> Location<'a> for Tick<L>
149where
150 L: Location<'a>,
151{
152 type Root = L::Root;
153
154 fn root(&self) -> Self::Root {
155 self.l.root()
156 }
157}
158
159impl<'a, L> Tick<L>
160where
161 L: Location<'a>,
162{
163 pub fn outer(&self) -> &L {
168 &self.l
169 }
170
171 pub fn spin_batch(
177 &self,
178 batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
179 ) -> Stream<(), Self, Bounded, TotalOrder, ExactlyOnce>
180 where
181 L: NoTick,
182 {
183 let out = self
184 .l
185 .spin()
186 .flat_map_ordered(q!(move |_| 0..batch_size))
187 .map(q!(|_| ()));
188
189 out.batch(self, nondet!())
190 }
191
192 pub fn singleton<T>(
199 &self,
200 e: impl QuotedWithContext<'a, T, Tick<L>>,
201 ) -> Singleton<T, Self, Bounded>
202 where
203 T: Clone,
204 {
205 let e = e.splice_untyped_ctx(self);
206
207 Singleton::new(
208 self.clone(),
209 HydroNode::SingletonSource {
210 value: e.into(),
211 first_tick_only: false,
212 metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
213 },
214 )
215 }
216
217 pub fn none<T>(&self) -> Optional<T, Self, Bounded> {
236 let e = q!([]);
237 let e = QuotedWithContext::<'a, [(); 0], Self>::splice_typed_ctx(e, self);
238
239 let unit_optional: Optional<(), Self, Bounded> = Optional::new(
240 self.clone(),
241 HydroNode::Source {
242 source: HydroSource::Iter(e.into()),
243 metadata: self.new_node_metadata(Optional::<(), Self, Bounded>::collection_kind()),
244 },
245 );
246
247 unit_optional.map(q!(|_| unreachable!())) }
249
250 pub fn optional_first_tick<T: Clone>(
276 &self,
277 e: impl QuotedWithContext<'a, T, Tick<L>>,
278 ) -> Optional<T, Self, Bounded> {
279 let e = e.splice_untyped_ctx(self);
280
281 Optional::new(
282 self.clone(),
283 HydroNode::SingletonSource {
284 value: e.into(),
285 first_tick_only: true,
286 metadata: self.new_node_metadata(Optional::<T, Self, Bounded>::collection_kind()),
287 },
288 )
289 }
290
291 #[expect(
300 private_bounds,
301 reason = "only Hydro collections can implement ReceiverComplete"
302 )]
303 pub fn cycle<S>(&self) -> (TickCycleHandle<'a, S>, S)
304 where
305 S: CycleCollection<'a, TickCycle, Location = Self> + DeferTick,
306 L: NoTick,
307 {
308 let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
309 (
310 TickCycleHandle::new(cycle_id, Location::id(self)),
311 S::create_source(cycle_id, self.clone()).defer_tick(),
312 )
313 }
314
315 #[expect(
322 private_bounds,
323 reason = "only Hydro collections can implement ReceiverComplete"
324 )]
325 pub fn cycle_with_initial<S>(&self, initial: S) -> (TickCycleHandle<'a, S>, S)
326 where
327 S: CycleCollectionWithInitial<'a, TickCycle, Location = Self>,
328 {
329 let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
330 (
331 TickCycleHandle::new(cycle_id, Location::id(self)),
332 S::create_source_with_initial(cycle_id, initial, self.clone()),
334 )
335 }
336}
337
#[cfg(test)]
mod tests {
    #[cfg(feature = "sim")]
    use stageleft::q;

    #[cfg(feature = "sim")]
    use crate::live_collections::sliced::sliced;
    #[cfg(feature = "sim")]
    use crate::location::Location;
    #[cfg(feature = "sim")]
    use crate::nondet::nondet;
    #[cfg(feature = "sim")]
    use crate::prelude::FlowBuilder;

    // Writes are folded into a running sum inside an atomic region and acknowledged via
    // `end_atomic()`. A read sent after the acknowledgement must observe the write.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_atomic_stream() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (write_send, write_req) = node.sim_input();
        let (read_send, read_req) = node.sim_input::<(), _, _>();

        let atomic_write = write_req.atomic();
        let current_state = atomic_write.clone().fold(
            q!(|| 0),
            q!(|state: &mut i32, v: i32| {
                *state += v;
            }),
        );

        let write_ack_recv = atomic_write.end_atomic().sim_output();
        let read_response_recv = sliced! {
            let batch_of_req = use(read_req, nondet!(/** test */));
            let latest_singleton = use::atomic(current_state, nondet!(/** test */));
            batch_of_req.cross_singleton(latest_singleton)
        }
        .sim_output();

        let sim_compiled = flow.sim().compiled();
        let instances = sim_compiled.exhaustive(async || {
            write_send.send(1);
            write_ack_recv.assert_yields([1]).await;
            read_send.send(());
            // Exactly one write of `1` has happened, so the sum must be exactly 1.
            assert!(read_response_recv.next().await.is_some_and(|(_, v)| v == 1));
        });

        assert_eq!(instances, 1);

        let instances_read_before_write = sim_compiled.exhaustive(async || {
            write_send.send(1);
            read_send.send(());
            write_ack_recv.assert_yields([1]).await;
            let _ = read_response_recv.next().await;
        });

        // Three schedules are explored when the read is sent before the ack is awaited.
        // NOTE(review): presumably read-before-write, write-before-read, and both in the
        // same tick.
        assert_eq!(instances_read_before_write, 3);
    }

    // Same scenario as `sim_atomic_stream` but without the atomic region. The test is
    // expected to panic, i.e. at least one explored schedule fails the `assert_eq!` below.
    // NOTE(review): presumably because the ack no longer implies the fold has seen the write.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_non_atomic_stream() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (write_send, write_req) = node.sim_input();
        let (read_send, read_req) = node.sim_input::<(), _, _>();

        let current_state = write_req.clone().fold(
            q!(|| 0),
            q!(|state: &mut i32, v: i32| {
                *state += v;
            }),
        );

        let write_ack_recv = write_req.sim_output();

        let read_response_recv = sliced! {
            let batch_of_req = use(read_req, nondet!(/** test */));
            let latest_singleton = use(current_state, nondet!(/** test */));
            batch_of_req.cross_singleton(latest_singleton)
        }
        .sim_output();

        flow.sim().exhaustive(async || {
            write_send.send(1);
            write_ack_recv.assert_yields([1]).await;
            read_send.send(());

            if let Some((_, v)) = read_response_recv.next().await {
                assert_eq!(v, 1);
            }
        });
    }

    // Uses `sim_atomic_input()` / `send_atomic()` instead of an explicit
    // `.atomic()` + ack stream: once `send_atomic` has been awaited, a subsequent read must
    // observe the write.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_atomic_input_api() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (write_send, atomic_write) = node.sim_atomic_input();
        let (read_send, read_req) = node.sim_input::<(), _, _>();

        let current_state = atomic_write.clone().fold(
            q!(|| 0),
            q!(|state: &mut i32, v: i32| {
                *state += v;
            }),
        );

        let read_response_recv = sliced! {
            let batch_of_req = use(read_req, nondet!(/** test */));
            let latest_singleton = use::atomic(current_state, nondet!(/** test */));
            batch_of_req.cross_singleton(latest_singleton)
        }
        .sim_output();

        let sim_compiled = flow.sim().compiled();
        let instances = sim_compiled.exhaustive(async || {
            write_send.send_atomic(1).await;
            read_send.send(());
            // Exactly one write of `1` has happened, so the sum must be exactly 1.
            assert!(read_response_recv.next().await.is_some_and(|(_, v)| v == 1));
        });

        assert_eq!(instances, 1);
    }
}