1use core::{fmt, panic};
4use std::cell::{Cell, RefCell};
5use std::collections::{HashMap, VecDeque};
6use std::fmt::Debug;
7use std::panic::RefUnwindSafe;
8use std::path::Path;
9use std::pin::{Pin, pin};
10use std::rc::Rc;
11use std::task::ready;
12
13use bytes::Bytes;
14use colored::Colorize;
15use dfir_rs::scheduled::context::DfirErased;
16use futures::{Stream, StreamExt};
17use libloading::Library;
18use serde::Serialize;
19use serde::de::DeserializeOwned;
20use tempfile::TempPath;
21use tokio::sync::mpsc::UnboundedSender;
22use tokio::sync::{Mutex, Notify};
23use tokio_stream::wrappers::UnboundedReceiverStream;
24
25use super::runtime::{Hooks, InlineHooks};
26use super::{SimClusterReceiver, SimClusterSender, SimReceiver, SimSender};
27use crate::compile::builder::ExternalPortId;
28use crate::live_collections::stream::{ExactlyOnce, NoOrder, Ordering, Retries, TotalOrder};
29use crate::location::dynamic::LocationId;
30use crate::sim::graph::{SimExternalPort, SimExternalPortRegistry};
31use crate::sim::runtime::SimHook;
32
33struct QuiescenceState {
34 quiescent: Cell<bool>,
36 quiescence_notify: Notify,
38 resume_notify: Notify,
40}
41
42impl QuiescenceState {
43 fn resume(&self) {
45 self.quiescent.set(false);
46 self.resume_notify.notify_waiters();
47 }
48
49 fn is_quiescent(&self) -> bool {
51 self.quiescent.get()
52 }
53
54 fn notified(&self) -> tokio::sync::futures::Notified<'_> {
56 self.quiescence_notify.notified()
57 }
58
59 async fn wait_for_resume(&self) {
61 self.quiescent.set(true);
62 self.quiescence_notify.notify_waiters();
63 self.resume_notify.notified().await;
64 self.quiescent.set(false);
65 }
66}
67
68struct SimConnections {
69 input_senders: HashMap<SimExternalPort, Rc<UnboundedSender<Bytes>>>,
70 output_receivers: HashMap<SimExternalPort, Rc<Mutex<UnboundedReceiverStream<Bytes>>>>,
71 cluster_input_senders: HashMap<SimExternalPort, Vec<Rc<UnboundedSender<Bytes>>>>,
72 cluster_output_receivers:
73 HashMap<SimExternalPort, Vec<Rc<Mutex<UnboundedReceiverStream<Bytes>>>>>,
74 external_registered: HashMap<ExternalPortId, SimExternalPort>,
75 quiescence: Rc<QuiescenceState>,
76}
77
78tokio::task_local! {
79 static CURRENT_SIM_CONNECTIONS: RefCell<SimConnections>;
80}
81
82pub struct CompiledSim {
84 pub(super) _path: TempPath,
85 pub(super) lib: Library,
86 pub(super) externals_port_registry: SimExternalPortRegistry,
87 pub(super) unit_test_fuzz_iterations: usize,
88}
89
90#[sealed::sealed]
91pub trait Instantiator<'a>: RefUnwindSafe + Fn() -> CompiledSimInstance<'a> {}
95#[sealed::sealed]
96impl<'a, T: RefUnwindSafe + Fn() -> CompiledSimInstance<'a>> Instantiator<'a> for T {}
97
98fn null_handler(_args: fmt::Arguments) {}
99
100fn println_handler(args: fmt::Arguments) {
101 println!("{}", args);
102}
103
104fn eprintln_handler(args: fmt::Arguments) {
105 eprintln!("{}", args);
106}
107
108type SimLoaded<'a> = libloading::Symbol<
114 'a,
115 unsafe extern "Rust" fn(
116 should_color: bool,
117 external_out: &mut HashMap<usize, UnboundedReceiverStream<Bytes>>,
118 external_in: &mut HashMap<usize, UnboundedSender<Bytes>>,
119 cluster_external_out: &mut HashMap<usize, Vec<UnboundedReceiverStream<Bytes>>>,
120 cluster_external_in: &mut HashMap<usize, Vec<UnboundedSender<Bytes>>>,
121 println_handler: fn(fmt::Arguments<'_>),
122 eprintln_handler: fn(fmt::Arguments<'_>),
123 ) -> (
124 Vec<(&'static str, Option<u32>, DfirErased)>,
125 Vec<(&'static str, Option<u32>, DfirErased)>,
126 Hooks<&'static str>,
127 InlineHooks<&'static str>,
128 ),
129>;
130
131impl CompiledSim {
132 pub fn with_instance<T>(&self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
134 self.with_instantiator(|instantiator| thunk(instantiator()), true)
135 }
136
137 pub fn with_instantiator<T>(
145 &self,
146 thunk: impl FnOnce(&dyn Instantiator) -> T,
147 always_log: bool,
148 ) -> T {
149 let func: SimLoaded = unsafe { self.lib.get(b"__hydro_runtime").unwrap() };
150 let log = always_log || std::env::var("HYDRO_SIM_LOG").is_ok_and(|v| v == "1");
151 thunk(
152 &(|| CompiledSimInstance {
153 func: func.clone(),
154 externals_port_registry: self.externals_port_registry.clone(),
155 dylib_result: None,
156 log,
157 }),
158 )
159 }
160
161 pub fn fuzz(&self, mut thunk: impl AsyncFn() + RefUnwindSafe) {
172 let caller_fn = crate::compile::ir::backtrace::Backtrace::get_backtrace(0)
173 .elements()
174 .into_iter()
175 .find(|e| {
176 !e.fn_name.starts_with("hydro_lang::sim::compiled")
177 && !e.fn_name.starts_with("hydro_lang::sim::flow")
178 && !e.fn_name.starts_with("fuzz<")
179 && !e.fn_name.starts_with("<hydro_lang::sim")
180 })
181 .unwrap();
182
183 let caller_path = Path::new(&caller_fn.filename.unwrap()).to_path_buf();
184 let repro_folder = caller_path.parent().unwrap().join("sim-failures");
185
186 let caller_fuzz_repro_path = repro_folder
187 .join(caller_fn.fn_name.replace("::", "__"))
188 .with_extension("bin");
189
190 if std::env::var("BOLERO_FUZZER").is_ok() {
191 let corpus_dir = std::env::current_dir().unwrap().join(".fuzz-corpus");
192 std::fs::create_dir_all(&corpus_dir).unwrap();
193 let libfuzzer_args = format!(
194 "{} {} -artifact_prefix={}/ -handle_abrt=0",
195 corpus_dir.to_str().unwrap(),
196 corpus_dir.to_str().unwrap(),
197 corpus_dir.to_str().unwrap(),
198 );
199
200 std::fs::create_dir_all(&repro_folder).unwrap();
201
202 if !std::env::var("HYDRO_NO_FAILURE_OUTPUT").is_ok_and(|v| v == "1") {
203 unsafe {
204 std::env::set_var(
205 "BOLERO_FAILURE_OUTPUT",
206 caller_fuzz_repro_path.to_str().unwrap(),
207 );
208 }
209 }
210
211 unsafe {
212 std::env::set_var("BOLERO_LIBFUZZER_ARGS", libfuzzer_args);
213 }
214
215 self.with_instantiator(
216 |instantiator| {
217 bolero::test(bolero::TargetLocation {
218 package_name: "",
219 manifest_dir: "",
220 module_path: "",
221 file: "",
222 line: 0,
223 item_path: "<unknown>::__bolero_item_path__",
224 test_name: None,
225 })
226 .run_with_replay(move |is_replay| {
227 let mut instance = instantiator();
228
229 if instance.log {
230 eprintln!(
231 "{}",
232 "\n==== New Simulation Instance ===="
233 .color(colored::Color::Cyan)
234 .bold()
235 );
236 }
237
238 if is_replay {
239 instance.log = true;
240 }
241
242 tokio::runtime::Builder::new_current_thread()
243 .build()
244 .unwrap()
245 .block_on(async { instance.run(&mut thunk).await })
246 })
247 },
248 false,
249 );
250 } else if let Ok(existing_bytes) = std::fs::read(&caller_fuzz_repro_path) {
251 self.fuzz_repro(existing_bytes, async |compiled| {
252 compiled.launch();
253 thunk().await
254 });
255 } else {
256 eprintln!(
257 "Running a fuzz test without `cargo sim` and no reproducer found at {}, using {} iterations with random inputs.",
258 caller_fuzz_repro_path.display(),
259 self.unit_test_fuzz_iterations,
260 );
261 self.with_instantiator(
262 |instantiator| {
263 bolero::test(bolero::TargetLocation {
264 package_name: "",
265 manifest_dir: "",
266 module_path: "",
267 file: ".",
268 line: 0,
269 item_path: "<unknown>::__bolero_item_path__",
270 test_name: None,
271 })
272 .with_iterations(self.unit_test_fuzz_iterations)
273 .run(move || {
274 let instance = instantiator();
275 tokio::runtime::Builder::new_current_thread()
276 .build()
277 .unwrap()
278 .block_on(async { instance.run(&mut thunk).await })
279 })
280 },
281 false,
282 );
283 }
284 }
285
286 pub fn fuzz_repro<'a>(
290 &'a self,
291 bytes: Vec<u8>,
292 thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
293 ) {
294 self.with_instance(|instance| {
295 bolero::bolero_engine::any::scope::with(
296 Box::new(bolero::bolero_engine::driver::object::Object(
297 bolero::bolero_engine::driver::bytes::Driver::new(bytes, &Default::default()),
298 )),
299 || {
300 tokio::runtime::Builder::new_current_thread()
301 .build()
302 .unwrap()
303 .block_on(async { instance.run_without_launching(thunk).await })
304 },
305 )
306 });
307 }
308
309 pub fn exhaustive(&self, mut thunk: impl AsyncFnMut() + RefUnwindSafe) -> usize {
320 if std::env::var("BOLERO_FUZZER").is_ok() {
321 eprintln!(
322 "Cannot run exhaustive tests with a fuzzer. Please use `cargo test` instead of `cargo sim`."
323 );
324 std::process::abort();
325 }
326
327 let mut count = 0;
328 let count_mut = &mut count;
329
330 self.with_instantiator(
331 |instantiator| {
332 bolero::test(bolero::TargetLocation {
333 package_name: "",
334 manifest_dir: "",
335 module_path: "",
336 file: "",
337 line: 0,
338 item_path: "<unknown>::__bolero_item_path__",
339 test_name: None,
340 })
341 .exhaustive()
342 .run_with_replay(move |is_replay| {
343 *count_mut += 1;
344
345 let mut instance = instantiator();
346 if instance.log {
347 eprintln!(
348 "{}",
349 "\n==== New Simulation Instance ===="
350 .color(colored::Color::Cyan)
351 .bold()
352 );
353 }
354
355 if is_replay {
356 instance.log = true;
357 }
358
359 tokio::runtime::Builder::new_current_thread()
360 .build()
361 .unwrap()
362 .block_on(async { instance.run(&mut thunk).await })
363 })
364 },
365 false,
366 );
367
368 count
369 }
370}
371
372type DylibResult = (
374 Vec<(&'static str, Option<u32>, DfirErased)>,
375 Vec<(&'static str, Option<u32>, DfirErased)>,
376 Hooks<&'static str>,
377 InlineHooks<&'static str>,
378);
379
380pub struct CompiledSimInstance<'a> {
383 func: SimLoaded<'a>,
384 externals_port_registry: SimExternalPortRegistry,
385 dylib_result: Option<DylibResult>,
386 log: bool,
387}
388
389impl<'a> CompiledSimInstance<'a> {
390 async fn run(self, thunk: impl AsyncFnOnce() + RefUnwindSafe) {
391 self.run_without_launching(async |instance| {
392 instance.launch();
393 thunk().await;
394 })
395 .await;
396 }
397
398 async fn run_without_launching(
399 mut self,
400 thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
401 ) {
402 let mut external_out: HashMap<usize, UnboundedReceiverStream<Bytes>> = HashMap::new();
403 let mut external_in: HashMap<usize, UnboundedSender<Bytes>> = HashMap::new();
404 let mut cluster_external_out: HashMap<usize, Vec<UnboundedReceiverStream<Bytes>>> =
405 HashMap::new();
406 let mut cluster_external_in: HashMap<usize, Vec<UnboundedSender<Bytes>>> = HashMap::new();
407
408 let dylib_result = unsafe {
409 (self.func)(
410 colored::control::SHOULD_COLORIZE.should_colorize(),
411 &mut external_out,
412 &mut external_in,
413 &mut cluster_external_out,
414 &mut cluster_external_in,
415 if self.log {
416 println_handler
417 } else {
418 null_handler
419 },
420 if self.log {
421 eprintln_handler
422 } else {
423 null_handler
424 },
425 )
426 };
427
428 let registered = &self.externals_port_registry.registered;
429
430 let quiescence = Rc::new(QuiescenceState {
431 quiescent: Cell::new(false),
432 quiescence_notify: Notify::new(),
433 resume_notify: Notify::new(),
434 });
435
436 let mut input_senders = HashMap::new();
437 let mut output_receivers = HashMap::new();
438 let mut cluster_input_senders = HashMap::new();
439 let mut cluster_output_receivers = HashMap::new();
440
441 #[expect(
442 clippy::disallowed_methods,
443 reason = "inserts into maps also unordered"
444 )]
445 for sim_port in registered.values() {
446 let usize_key = sim_port.into_inner();
447 if let Some(sender) = external_in.remove(&usize_key) {
448 input_senders.insert(*sim_port, Rc::new(sender));
449 }
450 if let Some(receiver) = external_out.remove(&usize_key) {
451 output_receivers.insert(*sim_port, Rc::new(Mutex::new(receiver)));
452 }
453 if let Some(senders) = cluster_external_in.remove(&usize_key) {
454 cluster_input_senders.insert(*sim_port, senders.into_iter().map(Rc::new).collect());
455 }
456 if let Some(receivers) = cluster_external_out.remove(&usize_key) {
457 cluster_output_receivers.insert(
458 *sim_port,
459 receivers
460 .into_iter()
461 .map(|r| Rc::new(Mutex::new(r)))
462 .collect(),
463 );
464 }
465 }
466
467 self.dylib_result = Some(dylib_result);
468
469 let local_set = tokio::task::LocalSet::new();
470 local_set
471 .run_until(CURRENT_SIM_CONNECTIONS.scope(
472 RefCell::new(SimConnections {
473 input_senders,
474 output_receivers,
475 cluster_input_senders,
476 cluster_output_receivers,
477 external_registered: self.externals_port_registry.registered.clone(),
478 quiescence: quiescence.clone(),
479 }),
480 async move {
481 thunk(self).await;
482 },
483 ))
484 .await;
485 }
486
487 fn launch(self) {
490 tokio::task::spawn_local(self.schedule_with_maybe_logger::<std::io::Empty>(None));
491 }
492
493 pub fn schedule_with_logger<W: std::io::Write>(
496 self,
497 log_writer: W,
498 ) -> impl use<W> + Future<Output = ()> {
499 self.schedule_with_maybe_logger(Some(log_writer))
500 }
501
502 fn schedule_with_maybe_logger<W: std::io::Write>(
503 mut self,
504 log_override: Option<W>,
505 ) -> impl use<W> + Future<Output = ()> {
506 let (async_dfirs, tick_dfirs, hooks, inline_hooks) = self.dylib_result.take().unwrap();
507
508 let not_ready_observation = async_dfirs
509 .iter()
510 .map(|(lid, c_id, _)| (serde_json::from_str(lid).unwrap(), *c_id))
511 .collect();
512
513 let quiescence = CURRENT_SIM_CONNECTIONS.with(|connections| {
514 let connections = connections.borrow();
515 connections.quiescence.clone()
516 });
517
518 let mut launched = LaunchedSim {
519 async_dfirs: async_dfirs
520 .into_iter()
521 .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
522 .collect(),
523 possibly_ready_ticks: vec![],
524 not_ready_ticks: tick_dfirs
525 .into_iter()
526 .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
527 .collect(),
528 possibly_ready_observation: vec![],
529 not_ready_observation,
530 hooks: hooks
531 .into_iter()
532 .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
533 .collect(),
534 inline_hooks: inline_hooks
535 .into_iter()
536 .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
537 .collect(),
538 log: if self.log {
539 if let Some(w) = log_override {
540 LogKind::Custom(w)
541 } else {
542 LogKind::Stderr
543 }
544 } else {
545 LogKind::Null
546 },
547 quiescence,
548 };
549
550 async move { launched.scheduler().await }
551 }
552}
553
554impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone for SimReceiver<T, O, R> {
555 fn clone(&self) -> Self {
556 *self
557 }
558}
559
560impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy for SimReceiver<T, O, R> {}
561
562impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimReceiver<T, O, R> {
563 async fn with_stream<Out>(
564 &self,
565 thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
566 ) -> Out {
567 let (receiver, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
568 let connections = connections.borrow();
569 let port = connections.external_registered.get(&self.0).unwrap();
570 (
571 connections.output_receivers.get(port).unwrap().clone(),
572 connections.quiescence.clone(),
573 )
574 });
575
576 let mut receiver_stream = receiver.lock().await;
577 let mut notified_fut = pin!(quiescence.notified());
578 let mut quiescence_aware = futures::stream::poll_fn(|cx| {
579 use std::task::Poll;
580 match receiver_stream.poll_next_unpin(cx) {
581 Poll::Ready(Some(bytes)) => {
582 return Poll::Ready(Some(bincode::deserialize(&bytes).unwrap()));
583 }
584 Poll::Ready(None) => return Poll::Ready(None),
585 Poll::Pending => {}
586 }
587 if quiescence.is_quiescent() {
588 return Poll::Ready(None);
589 }
590 let () = ready!(notified_fut.as_mut().poll(cx));
591 notified_fut.set(quiescence.notified());
592 Poll::Ready(None)
593 });
594 thunk(&mut pin!(&mut quiescence_aware)).await
595 }
596
597 pub fn assert_no_more(self) -> impl Future<Output = ()>
599 where
600 T: Debug,
601 {
602 FutureTrackingCaller {
603 future: async move {
604 self.with_stream(async |stream| {
605 if let Some(next) = stream.next().await {
606 return Err(format!(
607 "Stream yielded unexpected message: {:?}, expected termination",
608 next
609 ));
610 }
611 Ok(())
612 })
613 .await
614 },
615 }
616 }
617}
618
619impl<T: Serialize + DeserializeOwned> SimReceiver<T, TotalOrder, ExactlyOnce> {
620 pub async fn next(&self) -> Option<T> {
623 self.with_stream(async |stream| stream.next().await).await
624 }
625
626 pub async fn collect<C: Default + Extend<T>>(self) -> C {
629 self.with_stream(async |stream| stream.collect().await)
630 .await
631 }
632
633 pub fn assert_yields<T2: Debug, I: IntoIterator<Item = T2>>(
636 &self,
637 expected: I,
638 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
639 where
640 T: Debug + PartialEq<T2>,
641 {
642 FutureTrackingCaller {
643 future: async {
644 let mut expected: VecDeque<T2> = expected.into_iter().collect();
645
646 while !expected.is_empty() {
647 if let Some(next) = self.next().await {
648 let next_expected = expected.pop_front().unwrap();
649 if next != next_expected {
650 return Err(format!(
651 "Stream yielded unexpected message: {:?}, expected: {:?}",
652 next, next_expected
653 ));
654 }
655 } else {
656 return Err(format!(
657 "Stream ended early, still expected: {:?}",
658 expected
659 ));
660 }
661 }
662
663 Ok(())
664 },
665 }
666 }
667
668 pub fn assert_yields_only<T2: Debug, I: IntoIterator<Item = T2>>(
671 &self,
672 expected: I,
673 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
674 where
675 T: Debug + PartialEq<T2>,
676 {
677 ChainedFuture {
678 first: self.assert_yields(expected),
679 second: self.assert_no_more(),
680 first_done: false,
681 }
682 }
683}
684
685pin_project_lite::pin_project! {
686 struct FutureTrackingCaller<F: Future<Output = Result<(), String>>> {
696 #[pin]
697 future: F,
698 }
699}
700
701impl<F: Future<Output = Result<(), String>>> Future for FutureTrackingCaller<F> {
702 type Output = ();
703
704 #[track_caller]
705 fn poll(
706 mut self: Pin<&mut Self>,
707 cx: &mut std::task::Context<'_>,
708 ) -> std::task::Poll<Self::Output> {
709 match ready!(self.as_mut().project().future.poll(cx)) {
710 Ok(()) => std::task::Poll::Ready(()),
711 Err(e) => panic!("{}", e),
712 }
713 }
714}
715
716pin_project_lite::pin_project! {
717 struct ChainedFuture<F1: Future<Output = ()>, F2: Future<Output = ()>> {
721 #[pin]
722 first: F1,
723 #[pin]
724 second: F2,
725 first_done: bool,
726 }
727}
728
729impl<F1: Future<Output = ()>, F2: Future<Output = ()>> Future for ChainedFuture<F1, F2> {
730 type Output = ();
731
732 #[track_caller]
733 fn poll(
734 mut self: Pin<&mut Self>,
735 cx: &mut std::task::Context<'_>,
736 ) -> std::task::Poll<Self::Output> {
737 if !self.first_done {
738 ready!(self.as_mut().project().first.poll(cx));
739 *self.as_mut().project().first_done = true;
740 }
741
742 self.as_mut().project().second.poll(cx)
743 }
744}
745
746impl<T: Serialize + DeserializeOwned> SimReceiver<T, NoOrder, ExactlyOnce> {
747 pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self) -> C
750 where
751 T: Ord,
752 {
753 self.with_stream(async |stream| {
754 let mut collected: C = stream.collect().await;
755 collected.as_mut().sort();
756 collected
757 })
758 .await
759 }
760
761 pub fn assert_yields_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
764 &self,
765 expected: I,
766 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
767 where
768 T: Debug + PartialEq<T2>,
769 {
770 FutureTrackingCaller {
771 future: async {
772 self.with_stream(async |stream| {
773 let mut expected: Vec<T2> = expected.into_iter().collect();
774
775 while !expected.is_empty() {
776 if let Some(next) = stream.next().await {
777 let idx = expected.iter().enumerate().find(|(_, e)| &next == *e);
778 if let Some((i, _)) = idx {
779 expected.swap_remove(i);
780 } else {
781 return Err(format!(
782 "Stream yielded unexpected message: {:?}",
783 next
784 ));
785 }
786 } else {
787 return Err(format!(
788 "Stream ended early, still expected: {:?}",
789 expected
790 ));
791 }
792 }
793
794 Ok(())
795 })
796 .await
797 },
798 }
799 }
800
801 pub fn assert_yields_only_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
804 &self,
805 expected: I,
806 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
807 where
808 T: Debug + PartialEq<T2>,
809 {
810 ChainedFuture {
811 first: self.assert_yields_unordered(expected),
812 second: self.assert_no_more(),
813 first_done: false,
814 }
815 }
816}
817
818impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimSender<T, O, R> {
819 fn with_sink<Out>(
820 &self,
821 thunk: impl FnOnce(&dyn Fn(T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>) -> Out,
822 ) -> Out {
823 let (sender, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
824 let connections = connections.borrow();
825 (
826 connections
827 .input_senders
828 .get(connections.external_registered.get(&self.0).unwrap())
829 .unwrap()
830 .clone(),
831 connections.quiescence.clone(),
832 )
833 });
834
835 thunk(&move |t| {
836 let res = sender.send(bincode::serialize(&t).unwrap().into());
837 quiescence.resume();
838 res
839 })
840 }
841}
842
843impl<T: Serialize + DeserializeOwned, O: Ordering> SimSender<T, O, ExactlyOnce> {
844 pub fn send_many_unordered<I: IntoIterator<Item = T>>(&self, iter: I) {
847 self.with_sink(|send| {
848 for t in iter {
849 send(t).unwrap();
850 }
851 })
852 }
853}
854
855impl<T: Serialize + DeserializeOwned> SimSender<T, TotalOrder, ExactlyOnce> {
856 pub fn send(&self, t: T) {
859 self.with_sink(|send| send(t)).unwrap();
860 }
861
862 pub fn send_many<I: IntoIterator<Item = T>>(&self, iter: I) {
865 self.with_sink(|send| {
866 for t in iter {
867 send(t).unwrap();
868 }
869 })
870 }
871}
872
873impl<T: Serialize + DeserializeOwned> super::SimAtomicSender<T, NoOrder, ExactlyOnce> {
874 pub async fn send_many_unordered_atomic<I: IntoIterator<Item = T>>(&self, iter: I) {
878 let mut count = 0usize;
879 self.0
880 .send_many_unordered(iter.into_iter().inspect(|_| count += 1));
881 self.1.assert_yields_unordered(vec![(); count]).await;
882 }
883}
884
885impl<T: Serialize + DeserializeOwned> super::SimAtomicSender<T, TotalOrder, ExactlyOnce> {
886 pub async fn send_atomic(&self, t: T) {
889 self.0.send(t);
890 self.1
891 .next()
892 .await
893 .expect("atomic ack stream ended unexpectedly");
894 }
895
896 pub async fn send_many_atomic<I: IntoIterator<Item = T>>(&self, iter: I) {
899 let mut count = 0usize;
900 self.0.send_many(iter.into_iter().inspect(|_| count += 1));
901 self.1.assert_yields(vec![(); count]).await;
902 }
903}
904
905impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone
906 for SimClusterReceiver<T, O, R>
907{
908 fn clone(&self) -> Self {
909 *self
910 }
911}
912
913impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy
914 for SimClusterReceiver<T, O, R>
915{
916}
917
918impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimClusterReceiver<T, O, R> {
919 async fn with_member_stream<Out>(
920 &self,
921 member_id: u32,
922 thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
923 ) -> Out {
924 let (receiver, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
925 let connections = connections.borrow();
926 let port = connections.external_registered.get(&self.0).unwrap();
927 let receivers = connections.cluster_output_receivers.get(port).unwrap();
928 (
929 receivers[member_id as usize].clone(),
930 connections.quiescence.clone(),
931 )
932 });
933
934 let mut lock = receiver.lock().await;
935 let mut notified_fut = pin!(quiescence.notified());
936 let mut quiescence_aware = futures::stream::poll_fn(|cx| {
937 use std::task::Poll;
938 match lock.poll_next_unpin(cx) {
939 Poll::Ready(Some(bytes)) => {
940 return Poll::Ready(Some(bincode::deserialize(&bytes).unwrap()));
941 }
942 Poll::Ready(None) => return Poll::Ready(None),
943 Poll::Pending => {}
944 }
945 if quiescence.is_quiescent() {
946 return Poll::Ready(None);
947 }
948 let () = ready!(notified_fut.as_mut().poll(cx));
949 notified_fut.set(quiescence.notified());
950 Poll::Ready(None)
951 });
952 thunk(&mut pin!(&mut quiescence_aware)).await
953 }
954}
955
956impl<T: Serialize + DeserializeOwned> SimClusterReceiver<T, TotalOrder, ExactlyOnce> {
957 pub async fn next(&self, member_id: u32) -> Option<T> {
959 self.with_member_stream(member_id, async |stream| stream.next().await)
960 .await
961 }
962
963 pub async fn collect<C: Default + Extend<T>>(self, member_id: u32) -> C {
965 self.with_member_stream(member_id, async |stream| stream.collect().await)
966 .await
967 }
968}
969
970impl<T: Serialize + DeserializeOwned> SimClusterReceiver<T, NoOrder, ExactlyOnce> {
971 pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self, member_id: u32) -> C
973 where
974 T: Ord,
975 {
976 self.with_member_stream(member_id, async |stream| {
977 let mut collected: C = stream.collect().await;
978 collected.as_mut().sort();
979 collected
980 })
981 .await
982 }
983}
984
985impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimClusterSender<T, O, R> {
986 fn with_sink<Out>(
987 &self,
988 thunk: impl FnOnce(
989 &dyn Fn(u32, T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>,
990 ) -> Out,
991 ) -> Out {
992 let (senders, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
993 let connections = connections.borrow();
994 (
995 connections
996 .cluster_input_senders
997 .get(connections.external_registered.get(&self.0).unwrap())
998 .unwrap()
999 .clone(),
1000 connections.quiescence.clone(),
1001 )
1002 });
1003
1004 thunk(&move |member_id: u32, t: T| {
1005 let payload = bincode::serialize(&t).unwrap();
1006 let res = senders[member_id as usize].send(Bytes::from(payload));
1007 quiescence.resume();
1008 res
1009 })
1010 }
1011}
1012
1013impl<T: Serialize + DeserializeOwned> SimClusterSender<T, TotalOrder, ExactlyOnce> {
1014 pub fn send(&self, member_id: u32, t: T) {
1016 self.with_sink(|send| send(member_id, t)).unwrap();
1017 }
1018
1019 pub fn send_many<I: IntoIterator<Item = (u32, T)>>(&self, iter: I) {
1021 self.with_sink(|send| {
1022 for (member_id, t) in iter {
1023 send(member_id, t).unwrap();
1024 }
1025 })
1026 }
1027}
1028
1029enum LogKind<W: std::io::Write> {
1030 Null,
1031 Stderr,
1032 Custom(W),
1033}
1034
1035impl<W: std::io::Write> std::fmt::Write for LogKind<W> {
1037 fn write_str(&mut self, s: &str) -> Result<(), std::fmt::Error> {
1038 match self {
1039 LogKind::Null => Ok(()),
1040 LogKind::Stderr => {
1041 eprint!("{}", s);
1042 Ok(())
1043 }
1044 LogKind::Custom(w) => w.write_all(s.as_bytes()).map_err(|_| std::fmt::Error),
1045 }
1046 }
1047}
1048
1049struct LaunchedSim<W: std::io::Write> {
1061 async_dfirs: Vec<(LocationId, Option<u32>, DfirErased)>,
1064 possibly_ready_ticks: Vec<(LocationId, Option<u32>, DfirErased)>,
1067 not_ready_ticks: Vec<(LocationId, Option<u32>, DfirErased)>,
1069 possibly_ready_observation: Vec<(LocationId, Option<u32>)>,
1073 not_ready_observation: Vec<(LocationId, Option<u32>)>,
1075 hooks: Hooks<LocationId>,
1078 inline_hooks: InlineHooks<LocationId>,
1082 log: LogKind<W>,
1083 quiescence: Rc<QuiescenceState>,
1085}
1086
1087impl<W: std::io::Write> LaunchedSim<W> {
1088 async fn scheduler(&mut self) {
1089 loop {
1090 tokio::task::yield_now().await;
1091 let mut any_made_progress = false;
1092 for (loc, c_id, dfir) in &mut self.async_dfirs {
1093 if dfir.run_tick().await {
1094 any_made_progress = true;
1095 let (now_ready, still_not_ready): (Vec<_>, Vec<_>) = self
1096 .not_ready_ticks
1097 .drain(..)
1098 .partition(|(tick_loc, tick_c_id, _)| {
1099 let LocationId::Tick(_, outer) = tick_loc else {
1100 unreachable!()
1101 };
1102 outer.as_ref() == loc && tick_c_id == c_id
1103 });
1104
1105 self.possibly_ready_ticks.extend(now_ready);
1106 self.not_ready_ticks.extend(still_not_ready);
1107
1108 let (now_ready_obs, still_not_ready_obs): (Vec<_>, Vec<_>) = self
1109 .not_ready_observation
1110 .drain(..)
1111 .partition(|(obs_loc, obs_c_id)| obs_loc == loc && obs_c_id == c_id);
1112
1113 self.possibly_ready_observation.extend(now_ready_obs);
1114 self.not_ready_observation.extend(still_not_ready_obs);
1115 }
1116 }
1117
1118 if any_made_progress {
1119 continue;
1120 } else {
1121 use bolero::generator::*;
1122
1123 let (ready_tick, mut not_ready_tick): (Vec<_>, Vec<_>) = self
1124 .possibly_ready_ticks
1125 .drain(..)
1126 .partition(|(name, cid, _)| {
1127 self.hooks
1128 .get(&(name.clone(), *cid))
1129 .unwrap()
1130 .iter()
1131 .any(|hook| {
1132 hook.current_decision().unwrap_or(false)
1133 || hook.can_make_nontrivial_decision()
1134 })
1135 });
1136
1137 self.possibly_ready_ticks = ready_tick;
1138 self.not_ready_ticks.append(&mut not_ready_tick);
1139
1140 let (ready_obs, mut not_ready_obs): (Vec<_>, Vec<_>) = self
1141 .possibly_ready_observation
1142 .drain(..)
1143 .partition(|(name, cid)| {
1144 self.hooks
1145 .get(&(name.clone(), *cid))
1146 .into_iter()
1147 .flatten()
1148 .any(|hook| {
1149 hook.current_decision().unwrap_or(false)
1150 || hook.can_make_nontrivial_decision()
1151 })
1152 });
1153
1154 self.possibly_ready_observation = ready_obs;
1155 self.not_ready_observation.append(&mut not_ready_obs);
1156
1157 if self.possibly_ready_ticks.is_empty()
1158 && self.possibly_ready_observation.is_empty()
1159 {
1160 self.quiescence.wait_for_resume().await;
1162 } else {
1163 let next_tick_or_obs = (0..(self.possibly_ready_ticks.len()
1164 + self.possibly_ready_observation.len()))
1165 .any();
1166
1167 if next_tick_or_obs < self.possibly_ready_ticks.len() {
1168 let next_tick = next_tick_or_obs;
1169 let mut removed = self.possibly_ready_ticks.remove(next_tick);
1170
1171 match &mut self.log {
1172 LogKind::Null => {}
1173 LogKind::Stderr => {
1174 if let Some(cid) = &removed.1 {
1175 eprintln!(
1176 "\n{}",
1177 format!("Running Tick (Cluster Member {})", cid)
1178 .color(colored::Color::Magenta)
1179 .bold()
1180 )
1181 } else {
1182 eprintln!(
1183 "\n{}",
1184 "Running Tick".color(colored::Color::Magenta).bold()
1185 )
1186 }
1187 }
1188 LogKind::Custom(writer) => {
1189 writeln!(
1190 writer,
1191 "\n{}",
1192 "Running Tick".color(colored::Color::Magenta).bold()
1193 )
1194 .unwrap();
1195 }
1196 }
1197
1198 let mut asterisk_indenter = |_line_no, write: &mut dyn std::fmt::Write| {
1199 write.write_str(&"*".color(colored::Color::Magenta).bold())?;
1200 write.write_str(" ")
1201 };
1202
1203 let mut tick_decision_writer = indenter::indented(&mut self.log)
1204 .with_format(indenter::Format::Custom {
1205 inserter: &mut asterisk_indenter,
1206 });
1207
1208 let hooks = self.hooks.get_mut(&(removed.0.clone(), removed.1)).unwrap();
1209 run_hooks(&mut tick_decision_writer, hooks);
1210
1211 let run_tick_future = removed.2.run_tick();
1212 if let Some(inline_hooks) =
1213 self.inline_hooks.get_mut(&(removed.0.clone(), removed.1))
1214 {
1215 let mut run_tick_future_pinned = pin!(run_tick_future);
1216
1217 loop {
1218 tokio::select! {
1219 biased;
1220 r = &mut run_tick_future_pinned => {
1221 assert!(r);
1222 break;
1223 }
1224 _ = async {} => {
1225 bolero_generator::any::scope::borrow_with(|driver| {
1226 for hook in inline_hooks.iter_mut() {
1227 if hook.pending_decision() {
1228 if !hook.has_decision() {
1229 hook.autonomous_decision(driver);
1230 }
1231
1232 hook.release_decision(&mut tick_decision_writer);
1233 }
1234 }
1235 });
1236 }
1237 }
1238 }
1239 } else {
1240 assert!(run_tick_future.await);
1241 }
1242
1243 self.possibly_ready_ticks.push(removed);
1244 } else {
1245 let next_obs = next_tick_or_obs - self.possibly_ready_ticks.len();
1246 let mut default_hooks = vec![];
1247 let hooks = self
1248 .hooks
1249 .get_mut(&self.possibly_ready_observation[next_obs])
1250 .unwrap_or(&mut default_hooks);
1251
1252 run_hooks(&mut self.log, hooks);
1253 }
1254 }
1255 }
1256 }
1257 }
1258}
1259
1260fn run_hooks(tick_decision_writer: &mut impl std::fmt::Write, hooks: &mut Vec<Box<dyn SimHook>>) {
1261 let mut remaining_decision_count = hooks.len();
1262 let mut made_nontrivial_decision = false;
1263
1264 bolero::generator::bolero_generator::any::scope::borrow_with(|driver| {
1265 hooks.iter_mut().for_each(|hook| {
1267 if let Some(is_nontrivial) = hook.current_decision() {
1268 made_nontrivial_decision |= is_nontrivial;
1269 remaining_decision_count -= 1;
1270 } else if !hook.can_make_nontrivial_decision() {
1271 hook.autonomous_decision(driver, false);
1275 remaining_decision_count -= 1;
1276 }
1277 });
1278
1279 hooks.iter_mut().for_each(|hook| {
1280 if hook.current_decision().is_none() {
1281 made_nontrivial_decision |= hook.autonomous_decision(
1282 driver,
1283 !made_nontrivial_decision && remaining_decision_count == 1,
1284 );
1285 remaining_decision_count -= 1;
1286 }
1287
1288 hook.release_decision(tick_decision_writer);
1289 });
1290 });
1291}