1#![warn(missing_docs)]
2
3extern crate proc_macro;
4
5use std::collections::{BTreeMap, BTreeSet};
6use std::fmt::Debug;
7use std::iter::FusedIterator;
8
9use itertools::Itertools;
10use proc_macro2::{Ident, Literal, Span, TokenStream};
11use quote::{ToTokens, format_ident, quote, quote_spanned};
12use serde::{Deserialize, Serialize};
13use slotmap::{Key, SecondaryMap, SlotMap, SparseSecondaryMap};
14use syn::spanned::Spanned;
15
16use super::graph_write::{Dot, GraphWrite, Mermaid};
17use super::ops::{
18 DelayType, OPERATORS, OperatorWriteOutput, WriteContextArgs, find_op_op_constraints,
19 null_write_iterator_fn,
20};
21use super::{
22 CONTEXT, Color, DiMulGraph, GRAPH, GraphEdgeId, GraphLoopId, GraphNode, GraphNodeId,
23 GraphSubgraphId, HANDOFF_NODE_STR, MODULE_BOUNDARY_NODE_STR, OperatorInstance, PortIndexValue,
24 Varname, change_spans, get_operator_generics,
25};
26use crate::diagnostic::{Diagnostic, Diagnostics, Level};
27use crate::pretty_span::{PrettyRowCol, PrettySpan};
28use crate::process_singletons;
29
30#[derive(Default, Debug, Serialize, Deserialize)]
40pub struct DfirGraph {
41 nodes: SlotMap<GraphNodeId, GraphNode>,
43
44 #[serde(skip)]
47 operator_instances: SecondaryMap<GraphNodeId, OperatorInstance>,
48 operator_tag: SecondaryMap<GraphNodeId, String>,
50 graph: DiMulGraph<GraphNodeId, GraphEdgeId>,
52 ports: SecondaryMap<GraphEdgeId, (PortIndexValue, PortIndexValue)>,
54
55 node_loops: SecondaryMap<GraphNodeId, GraphLoopId>,
57 loop_nodes: SlotMap<GraphLoopId, Vec<GraphNodeId>>,
59 loop_parent: SparseSecondaryMap<GraphLoopId, GraphLoopId>,
61 root_loops: Vec<GraphLoopId>,
63 loop_children: SecondaryMap<GraphLoopId, Vec<GraphLoopId>>,
65
66 node_subgraph: SecondaryMap<GraphNodeId, GraphSubgraphId>,
68
69 subgraph_nodes: SlotMap<GraphSubgraphId, Vec<GraphNodeId>>,
71
72 node_singleton_references: SparseSecondaryMap<GraphNodeId, Vec<Option<GraphNodeId>>>,
74 node_varnames: SparseSecondaryMap<GraphNodeId, Varname>,
76
77 handoff_delay_type: SparseSecondaryMap<GraphNodeId, DelayType>,
81}
82
83impl DfirGraph {
85 pub fn new() -> Self {
87 Default::default()
88 }
89}
90
91impl DfirGraph {
93 pub fn node(&self, node_id: GraphNodeId) -> &GraphNode {
95 self.nodes.get(node_id).expect("Node not found.")
96 }
97
98 pub fn node_op_inst(&self, node_id: GraphNodeId) -> Option<&OperatorInstance> {
103 self.operator_instances.get(node_id)
104 }
105
106 pub fn node_varname(&self, node_id: GraphNodeId) -> Option<&Varname> {
108 self.node_varnames.get(node_id)
109 }
110
111 pub fn node_subgraph(&self, node_id: GraphNodeId) -> Option<GraphSubgraphId> {
113 self.node_subgraph.get(node_id).copied()
114 }
115
116 pub fn node_degree_in(&self, node_id: GraphNodeId) -> usize {
118 self.graph.degree_in(node_id)
119 }
120
121 pub fn node_degree_out(&self, node_id: GraphNodeId) -> usize {
123 self.graph.degree_out(node_id)
124 }
125
126 pub fn node_successors(
128 &self,
129 src: GraphNodeId,
130 ) -> impl '_
131 + DoubleEndedIterator<Item = (GraphEdgeId, GraphNodeId)>
132 + ExactSizeIterator
133 + FusedIterator
134 + Clone
135 + Debug {
136 self.graph.successors(src)
137 }
138
139 pub fn node_predecessors(
141 &self,
142 dst: GraphNodeId,
143 ) -> impl '_
144 + DoubleEndedIterator<Item = (GraphEdgeId, GraphNodeId)>
145 + ExactSizeIterator
146 + FusedIterator
147 + Clone
148 + Debug {
149 self.graph.predecessors(dst)
150 }
151
152 pub fn node_successor_edges(
154 &self,
155 src: GraphNodeId,
156 ) -> impl '_
157 + DoubleEndedIterator<Item = GraphEdgeId>
158 + ExactSizeIterator
159 + FusedIterator
160 + Clone
161 + Debug {
162 self.graph.successor_edges(src)
163 }
164
165 pub fn node_predecessor_edges(
167 &self,
168 dst: GraphNodeId,
169 ) -> impl '_
170 + DoubleEndedIterator<Item = GraphEdgeId>
171 + ExactSizeIterator
172 + FusedIterator
173 + Clone
174 + Debug {
175 self.graph.predecessor_edges(dst)
176 }
177
178 pub fn node_successor_nodes(
180 &self,
181 src: GraphNodeId,
182 ) -> impl '_
183 + DoubleEndedIterator<Item = GraphNodeId>
184 + ExactSizeIterator
185 + FusedIterator
186 + Clone
187 + Debug {
188 self.graph.successor_vertices(src)
189 }
190
191 pub fn node_predecessor_nodes(
193 &self,
194 dst: GraphNodeId,
195 ) -> impl '_
196 + DoubleEndedIterator<Item = GraphNodeId>
197 + ExactSizeIterator
198 + FusedIterator
199 + Clone
200 + Debug {
201 self.graph.predecessor_vertices(dst)
202 }
203
204 pub fn node_ids(&self) -> slotmap::basic::Keys<'_, GraphNodeId, GraphNode> {
206 self.nodes.keys()
207 }
208
209 pub fn nodes(&self) -> slotmap::basic::Iter<'_, GraphNodeId, GraphNode> {
211 self.nodes.iter()
212 }
213
214 pub fn insert_node(
216 &mut self,
217 node: GraphNode,
218 varname_opt: Option<Ident>,
219 loop_opt: Option<GraphLoopId>,
220 ) -> GraphNodeId {
221 let node_id = self.nodes.insert(node);
222 if let Some(varname) = varname_opt {
223 self.node_varnames.insert(node_id, Varname(varname));
224 }
225 if let Some(loop_id) = loop_opt {
226 self.node_loops.insert(node_id, loop_id);
227 self.loop_nodes[loop_id].push(node_id);
228 }
229 node_id
230 }
231
232 pub fn insert_node_op_inst(&mut self, node_id: GraphNodeId, op_inst: OperatorInstance) {
234 assert!(matches!(
235 self.nodes.get(node_id),
236 Some(GraphNode::Operator(_))
237 ));
238 let old_inst = self.operator_instances.insert(node_id, op_inst);
239 assert!(old_inst.is_none());
240 }
241
242 pub fn insert_node_op_insts_all(&mut self, diagnostics: &mut Diagnostics) {
244 let mut op_insts = Vec::new();
245 for (node_id, node) in self.nodes() {
246 let GraphNode::Operator(operator) = node else {
247 continue;
248 };
249 if self.node_op_inst(node_id).is_some() {
250 continue;
251 };
252
253 let Some(op_constraints) = find_op_op_constraints(operator) else {
255 diagnostics.push(Diagnostic::spanned(
256 operator.path.span(),
257 Level::Error,
258 format!("Unknown operator `{}`", operator.name_string()),
259 ));
260 continue;
261 };
262
263 let (input_ports, output_ports) = {
265 let mut input_edges: Vec<(&PortIndexValue, GraphNodeId)> = self
266 .node_predecessors(node_id)
267 .map(|(edge_id, pred_id)| (self.edge_ports(edge_id).1, pred_id))
268 .collect();
269 input_edges.sort();
271 let input_ports: Vec<PortIndexValue> = input_edges
272 .into_iter()
273 .map(|(port, _pred)| port)
274 .cloned()
275 .collect();
276
277 let mut output_edges: Vec<(&PortIndexValue, GraphNodeId)> = self
279 .node_successors(node_id)
280 .map(|(edge_id, succ)| (self.edge_ports(edge_id).0, succ))
281 .collect();
282 output_edges.sort();
284 let output_ports: Vec<PortIndexValue> = output_edges
285 .into_iter()
286 .map(|(port, _succ)| port)
287 .cloned()
288 .collect();
289
290 (input_ports, output_ports)
291 };
292
293 let generics = get_operator_generics(diagnostics, operator);
295 {
297 let generics_span = generics
299 .generic_args
300 .as_ref()
301 .map(Spanned::span)
302 .unwrap_or_else(|| operator.path.span());
303
304 if !op_constraints
305 .persistence_args
306 .contains(&generics.persistence_args.len())
307 {
308 diagnostics.push(Diagnostic::spanned(
309 generics.persistence_args_span().unwrap_or(generics_span),
310 Level::Error,
311 format!(
312 "`{}` should have {} persistence lifetime arguments, actually has {}.",
313 op_constraints.name,
314 op_constraints.persistence_args.human_string(),
315 generics.persistence_args.len()
316 ),
317 ));
318 }
319 if !op_constraints.type_args.contains(&generics.type_args.len()) {
320 diagnostics.push(Diagnostic::spanned(
321 generics.type_args_span().unwrap_or(generics_span),
322 Level::Error,
323 format!(
324 "`{}` should have {} generic type arguments, actually has {}.",
325 op_constraints.name,
326 op_constraints.type_args.human_string(),
327 generics.type_args.len()
328 ),
329 ));
330 }
331 }
332
333 op_insts.push((
334 node_id,
335 OperatorInstance {
336 op_constraints,
337 input_ports,
338 output_ports,
339 singletons_referenced: operator.singletons_referenced.clone(),
340 generics,
341 arguments_pre: operator.args.clone(),
342 arguments_raw: operator.args_raw.clone(),
343 },
344 ));
345 }
346
347 for (node_id, op_inst) in op_insts {
348 self.insert_node_op_inst(node_id, op_inst);
349 }
350 }
351
352 pub fn insert_intermediate_node(
364 &mut self,
365 edge_id: GraphEdgeId,
366 new_node: GraphNode,
367 ) -> (GraphNodeId, GraphEdgeId) {
368 let span = Some(new_node.span());
369
370 let op_inst_opt = 'oc: {
372 let GraphNode::Operator(operator) = &new_node else {
373 break 'oc None;
374 };
375 let Some(op_constraints) = find_op_op_constraints(operator) else {
376 break 'oc None;
377 };
378 let (input_port, output_port) = self.ports.get(edge_id).cloned().unwrap();
379
380 let mut dummy_diagnostics = Diagnostics::new();
381 let generics = get_operator_generics(&mut dummy_diagnostics, operator);
382 assert!(dummy_diagnostics.is_empty());
383
384 Some(OperatorInstance {
385 op_constraints,
386 input_ports: vec![input_port],
387 output_ports: vec![output_port],
388 singletons_referenced: operator.singletons_referenced.clone(),
389 generics,
390 arguments_pre: operator.args.clone(),
391 arguments_raw: operator.args_raw.clone(),
392 })
393 };
394
395 let node_id = self.nodes.insert(new_node);
397 if let Some(op_inst) = op_inst_opt {
399 self.operator_instances.insert(node_id, op_inst);
400 }
401 let (e0, e1) = self
403 .graph
404 .insert_intermediate_vertex(node_id, edge_id)
405 .unwrap();
406
407 let (src_idx, dst_idx) = self.ports.remove(edge_id).unwrap();
409 self.ports
410 .insert(e0, (src_idx, PortIndexValue::Elided(span)));
411 self.ports
412 .insert(e1, (PortIndexValue::Elided(span), dst_idx));
413
414 (node_id, e1)
415 }
416
417 pub fn remove_intermediate_node(&mut self, node_id: GraphNodeId) {
420 assert_eq!(
421 1,
422 self.node_degree_in(node_id),
423 "Removed intermediate node must have one predecessor"
424 );
425 assert_eq!(
426 1,
427 self.node_degree_out(node_id),
428 "Removed intermediate node must have one successor"
429 );
430 assert!(
431 self.node_subgraph.is_empty() && self.subgraph_nodes.is_empty(),
432 "Should not remove intermediate node after subgraph partitioning"
433 );
434
435 assert!(self.nodes.remove(node_id).is_some());
436 let (new_edge_id, (pred_edge_id, succ_edge_id)) =
437 self.graph.remove_intermediate_vertex(node_id).unwrap();
438 self.operator_instances.remove(node_id);
439 self.node_varnames.remove(node_id);
440
441 let (src_port, _) = self.ports.remove(pred_edge_id).unwrap();
442 let (_, dst_port) = self.ports.remove(succ_edge_id).unwrap();
443 self.ports.insert(new_edge_id, (src_port, dst_port));
444 }
445
446 pub(crate) fn node_color(&self, node_id: GraphNodeId) -> Option<Color> {
452 if matches!(self.node(node_id), GraphNode::Handoff { .. }) {
453 return Some(Color::Hoff);
454 }
455
456 if let GraphNode::Operator(op) = self.node(node_id)
458 && (op.name_string() == "resolve_futures_blocking"
459 || op.name_string() == "resolve_futures_blocking_ordered")
460 {
461 return Some(Color::Push);
462 }
463
464 let inn_degree = self.node_predecessor_nodes(node_id).len();
466 let out_degree = self.node_successor_nodes(node_id).len();
468
469 match (inn_degree, out_degree) {
470 (0, 0) => None, (0, 1) => Some(Color::Pull),
472 (1, 0) => Some(Color::Push),
473 (1, 1) => None, (_many, 0 | 1) => Some(Color::Pull),
475 (0 | 1, _many) => Some(Color::Push),
476 (_many, _to_many) => Some(Color::Comp),
477 }
478 }
479
480 pub fn set_operator_tag(&mut self, node_id: GraphNodeId, tag: String) {
482 self.operator_tag.insert(node_id, tag);
483 }
484}
485
486impl DfirGraph {
488 pub fn set_node_singleton_references(
491 &mut self,
492 node_id: GraphNodeId,
493 singletons_referenced: Vec<Option<GraphNodeId>>,
494 ) -> Option<Vec<Option<GraphNodeId>>> {
495 self.node_singleton_references
496 .insert(node_id, singletons_referenced)
497 }
498
499 pub fn node_singleton_references(&self, node_id: GraphNodeId) -> &[Option<GraphNodeId>] {
502 self.node_singleton_references
503 .get(node_id)
504 .map(std::ops::Deref::deref)
505 .unwrap_or_default()
506 }
507}
508
509impl DfirGraph {
511 pub fn merge_modules(&mut self) -> Result<(), Diagnostic> {
519 let mod_bound_nodes = self
520 .nodes()
521 .filter(|(_nid, node)| matches!(node, GraphNode::ModuleBoundary { .. }))
522 .map(|(nid, _node)| nid)
523 .collect::<Vec<_>>();
524
525 for mod_bound_node in mod_bound_nodes {
526 self.remove_module_boundary(mod_bound_node)?;
527 }
528
529 Ok(())
530 }
531
532 fn remove_module_boundary(&mut self, mod_bound_node: GraphNodeId) -> Result<(), Diagnostic> {
536 assert!(
537 self.node_subgraph.is_empty() && self.subgraph_nodes.is_empty(),
538 "Should not remove intermediate node after subgraph partitioning"
539 );
540
541 let mut mod_pred_ports = BTreeMap::new();
542 let mut mod_succ_ports = BTreeMap::new();
543
544 for mod_out_edge in self.node_predecessor_edges(mod_bound_node) {
545 let (pred_port, succ_port) = self.edge_ports(mod_out_edge);
546 mod_pred_ports.insert(succ_port.clone(), (mod_out_edge, pred_port.clone()));
547 }
548
549 for mod_inn_edge in self.node_successor_edges(mod_bound_node) {
550 let (pred_port, succ_port) = self.edge_ports(mod_inn_edge);
551 mod_succ_ports.insert(pred_port.clone(), (mod_inn_edge, succ_port.clone()));
552 }
553
554 if mod_pred_ports.keys().collect::<BTreeSet<_>>()
555 != mod_succ_ports.keys().collect::<BTreeSet<_>>()
556 {
557 let GraphNode::ModuleBoundary { input, import_expr } = self.node(mod_bound_node) else {
559 panic!();
560 };
561
562 if *input {
563 return Err(Diagnostic {
564 span: *import_expr,
565 level: Level::Error,
566 message: format!(
567 "The ports into the module did not match. input: {:?}, expected: {:?}",
568 mod_pred_ports.keys().map(|x| x.to_string()).join(", "),
569 mod_succ_ports.keys().map(|x| x.to_string()).join(", ")
570 ),
571 });
572 } else {
573 return Err(Diagnostic {
574 span: *import_expr,
575 level: Level::Error,
576 message: format!(
577 "The ports out of the module did not match. output: {:?}, expected: {:?}",
578 mod_succ_ports.keys().map(|x| x.to_string()).join(", "),
579 mod_pred_ports.keys().map(|x| x.to_string()).join(", "),
580 ),
581 });
582 }
583 }
584
585 for (port, (pred_edge, pred_port)) in mod_pred_ports {
586 let (succ_edge, succ_port) = mod_succ_ports.remove(&port).unwrap();
587
588 let (src, _) = self.edge(pred_edge);
589 let (_, dst) = self.edge(succ_edge);
590 self.remove_edge(pred_edge);
591 self.remove_edge(succ_edge);
592
593 let new_edge_id = self.graph.insert_edge(src, dst);
594 self.ports.insert(new_edge_id, (pred_port, succ_port));
595 }
596
597 self.graph.remove_vertex(mod_bound_node);
598 self.nodes.remove(mod_bound_node);
599
600 Ok(())
601 }
602}
603
604impl DfirGraph {
606 pub fn edge(&self, edge_id: GraphEdgeId) -> (GraphNodeId, GraphNodeId) {
608 let (src, dst) = self.graph.edge(edge_id).expect("Edge not found.");
609 (src, dst)
610 }
611
612 pub fn edge_ports(&self, edge_id: GraphEdgeId) -> (&PortIndexValue, &PortIndexValue) {
614 let (src_port, dst_port) = self.ports.get(edge_id).expect("Edge not found.");
615 (src_port, dst_port)
616 }
617
618 pub fn edge_ids(&self) -> slotmap::basic::Keys<'_, GraphEdgeId, (GraphNodeId, GraphNodeId)> {
620 self.graph.edge_ids()
621 }
622
623 pub fn edges(
625 &self,
626 ) -> impl '_
627 + ExactSizeIterator<Item = (GraphEdgeId, (GraphNodeId, GraphNodeId))>
628 + FusedIterator
629 + Clone
630 + Debug {
631 self.graph.edges()
632 }
633
634 pub fn insert_edge(
636 &mut self,
637 src: GraphNodeId,
638 src_port: PortIndexValue,
639 dst: GraphNodeId,
640 dst_port: PortIndexValue,
641 ) -> GraphEdgeId {
642 let edge_id = self.graph.insert_edge(src, dst);
643 self.ports.insert(edge_id, (src_port, dst_port));
644 edge_id
645 }
646
647 pub fn remove_edge(&mut self, edge: GraphEdgeId) {
649 let (_src, _dst) = self.graph.remove_edge(edge).unwrap();
650 let (_src_port, _dst_port) = self.ports.remove(edge).unwrap();
651 }
652}
653
654impl DfirGraph {
656 pub fn subgraph(&self, subgraph_id: GraphSubgraphId) -> &Vec<GraphNodeId> {
658 self.subgraph_nodes
659 .get(subgraph_id)
660 .expect("Subgraph not found.")
661 }
662
663 pub fn subgraph_ids(&self) -> slotmap::basic::Keys<'_, GraphSubgraphId, Vec<GraphNodeId>> {
665 self.subgraph_nodes.keys()
666 }
667
668 pub fn subgraphs(&self) -> slotmap::basic::Iter<'_, GraphSubgraphId, Vec<GraphNodeId>> {
670 self.subgraph_nodes.iter()
671 }
672
673 pub fn insert_subgraph(
675 &mut self,
676 node_ids: Vec<GraphNodeId>,
677 ) -> Result<GraphSubgraphId, (GraphNodeId, GraphSubgraphId)> {
678 for &node_id in node_ids.iter() {
680 if let Some(&old_sg_id) = self.node_subgraph.get(node_id) {
681 return Err((node_id, old_sg_id));
682 }
683 }
684 let subgraph_id = self.subgraph_nodes.insert_with_key(|sg_id| {
685 for &node_id in node_ids.iter() {
686 self.node_subgraph.insert(node_id, sg_id);
687 }
688 node_ids
689 });
690
691 Ok(subgraph_id)
692 }
693
694 pub fn remove_from_subgraph(&mut self, node_id: GraphNodeId) -> bool {
696 if let Some(old_sg_id) = self.node_subgraph.remove(node_id) {
697 self.subgraph_nodes[old_sg_id].retain(|&other_node_id| other_node_id != node_id);
698 true
699 } else {
700 false
701 }
702 }
703
704 pub fn handoff_delay_type(&self, node_id: GraphNodeId) -> Option<DelayType> {
706 self.handoff_delay_type.get(node_id).copied()
707 }
708
709 pub fn set_handoff_delay_type(&mut self, node_id: GraphNodeId, delay_type: DelayType) {
711 self.handoff_delay_type.insert(node_id, delay_type);
712 }
713
714 fn find_pull_to_push_idx(&self, subgraph_nodes: &[GraphNodeId]) -> usize {
716 subgraph_nodes
717 .iter()
718 .position(|&node_id| {
719 self.node_color(node_id)
720 .is_some_and(|color| Color::Pull != color)
721 })
722 .unwrap_or(subgraph_nodes.len())
723 }
724}
725
726impl DfirGraph {
728 fn node_as_ident(&self, node_id: GraphNodeId, is_pred: bool) -> Ident {
730 let name = match &self.nodes[node_id] {
731 GraphNode::Operator(_) => format!("op_{:?}", node_id.data()),
732 GraphNode::Handoff { .. } => format!(
733 "hoff_{:?}_{}",
734 node_id.data(),
735 if is_pred { "recv" } else { "send" }
736 ),
737 GraphNode::ModuleBoundary { .. } => panic!(),
738 };
739 let span = match (is_pred, &self.nodes[node_id]) {
740 (_, GraphNode::Operator(operator)) => operator.span(),
741 (true, &GraphNode::Handoff { src_span, .. }) => src_span,
742 (false, &GraphNode::Handoff { dst_span, .. }) => dst_span,
743 (_, GraphNode::ModuleBoundary { .. }) => panic!(),
744 };
745 Ident::new(&name, span)
746 }
747
748 fn hoff_buf_ident(&self, hoff_id: GraphNodeId, span: Span) -> Ident {
750 Ident::new(&format!("hoff_{:?}_buf", hoff_id.data()), span)
751 }
752
753 fn hoff_back_ident(&self, hoff_id: GraphNodeId, span: Span) -> Ident {
755 Ident::new(&format!("hoff_{:?}_back", hoff_id.data()), span)
756 }
757
758 fn node_as_singleton_ident(&self, node_id: GraphNodeId, span: Span) -> Ident {
760 Ident::new(&format!("singleton_op_{:?}", node_id.data()), span)
761 }
762
763 fn helper_resolve_singletons(&self, node_id: GraphNodeId, span: Span) -> Vec<Ident> {
765 self.node_singleton_references(node_id)
766 .iter()
767 .map(|singleton_node_id| {
768 self.node_as_singleton_ident(
770 singleton_node_id
771 .expect("Expected singleton to be resolved but was not, this is a bug."),
772 span,
773 )
774 })
775 .collect::<Vec<_>>()
776 }
777
778 fn helper_collect_subgraph_handoffs(
781 &self,
782 ) -> SecondaryMap<GraphSubgraphId, (Vec<GraphNodeId>, Vec<GraphNodeId>)> {
783 let mut subgraph_handoffs: SecondaryMap<
785 GraphSubgraphId,
786 (Vec<GraphNodeId>, Vec<GraphNodeId>),
787 > = self
788 .subgraph_nodes
789 .keys()
790 .map(|k| (k, Default::default()))
791 .collect();
792
793 for (hoff_id, node) in self.nodes() {
795 if !matches!(node, GraphNode::Handoff { .. }) {
796 continue;
797 }
798 for (_edge, succ_id) in self.node_successors(hoff_id) {
800 let succ_sg = self.node_subgraph(succ_id).unwrap();
801 subgraph_handoffs[succ_sg].0.push(hoff_id);
802 }
803 for (_edge, pred_id) in self.node_predecessors(hoff_id) {
805 let pred_sg = self.node_subgraph(pred_id).unwrap();
806 subgraph_handoffs[pred_sg].1.push(hoff_id);
807 }
808 }
809
810 subgraph_handoffs
811 }
812
813 pub fn as_code(
829 &self,
830 root: &TokenStream,
831 include_type_guards: bool,
832 prefix: TokenStream,
833 diagnostics: &mut Diagnostics,
834 ) -> Result<TokenStream, Diagnostics> {
835 self.as_code_with_options(root, include_type_guards, true, prefix, diagnostics)
836 }
837
838 pub fn as_code_with_options(
847 &self,
848 root: &TokenStream,
849 include_type_guards: bool,
850 include_meta: bool,
851 prefix: TokenStream,
852 diagnostics: &mut Diagnostics,
853 ) -> Result<TokenStream, Diagnostics> {
854 fn slotmap_raw_idx(key: impl Key) -> usize {
859 (key.data().as_ffi() & 0xFFFF_FFFF) as usize
860 }
861
862 let df = Ident::new(GRAPH, Span::call_site());
863 let context = Ident::new(CONTEXT, Span::call_site());
864
865 let handoff_nodes: Vec<_> = self
867 .nodes
868 .iter()
869 .filter_map(|(node_id, node)| match node {
870 GraphNode::Operator(_) => None,
871 &GraphNode::Handoff { src_span, dst_span } => Some((node_id, (src_span, dst_span))),
872 GraphNode::ModuleBoundary { .. } => panic!(),
873 })
874 .collect();
875
876 let buffer_code: Vec<TokenStream> = handoff_nodes
877 .iter()
878 .map(|&(node_id, (src_span, dst_span))| {
879 let span = src_span.join(dst_span).unwrap_or(src_span);
880 let buf_ident = self.hoff_buf_ident(node_id, span);
881 quote_spanned! {span=>
882 let mut #buf_ident: Vec<_> = Vec::new();
883 }
884 })
885 .collect();
886
887 let back_buffer_code: Vec<TokenStream> = handoff_nodes
892 .iter()
893 .filter(|(node_id, _)| self.handoff_delay_type(*node_id).is_some())
894 .map(|&(node_id, (src_span, dst_span))| {
895 let span = src_span.join(dst_span).unwrap_or(src_span);
896 let back_ident = self.hoff_back_ident(node_id, span);
897 quote_spanned! {span=>
898 let mut #back_ident: Vec<_> = Vec::new();
899 }
900 })
901 .collect();
902
903 let subgraph_handoffs = self.helper_collect_subgraph_handoffs();
905
906 let mut defer_tick_buf_idents: Vec<Ident> = Vec::new();
917 let mut back_edge_hoff_ids: BTreeSet<GraphNodeId> = BTreeSet::new();
918 let all_subgraphs = {
919 let mut sg_preds = SecondaryMap::<_, Vec<_>>::with_capacity(self.subgraph_nodes.len());
921 for (hoff_id, node) in self.nodes() {
922 if !matches!(node, GraphNode::Handoff { .. }) {
923 continue;
925 }
926 assert_eq!(1, self.node_successors(hoff_id).len());
927 assert_eq!(1, self.node_predecessors(hoff_id).len());
928 let (_edge_id, pred) = self.node_predecessors(hoff_id).next().unwrap();
929 let (_edge_id, succ) = self.node_successors(hoff_id).next().unwrap();
930 let pred_sg = self.node_subgraph(pred).unwrap();
931 let succ_sg = self.node_subgraph(succ).unwrap();
932 if pred_sg == succ_sg {
933 panic!("bug: unexpected subgraph self-handoff cycle");
934 }
935 if let Some(delay_type) = self.handoff_delay_type(hoff_id) {
936 debug_assert!(matches!(delay_type, DelayType::Tick | DelayType::TickLazy));
937 back_edge_hoff_ids.insert(hoff_id);
940
941 if !matches!(delay_type, DelayType::TickLazy) {
943 defer_tick_buf_idents.push(self.hoff_buf_ident(hoff_id, node.span()));
944 }
945 } else {
946 sg_preds.entry(succ_sg).unwrap().or_default().push(pred_sg);
947 }
948 }
949
950 for dst_id in self.node_ids() {
953 for src_ref_id in self
954 .node_singleton_references(dst_id)
955 .iter()
956 .copied()
957 .flatten()
958 {
959 let src_sg = self
960 .node_subgraph(src_ref_id)
961 .expect("bug: singleton ref node must belong to a subgraph");
962 let dst_sg = self
963 .node_subgraph(dst_id)
964 .expect("bug: singleton ref consumer must belong to a subgraph");
965 if src_sg != dst_sg {
966 sg_preds.entry(dst_sg).unwrap().or_default().push(src_sg);
967 }
968 }
969 }
970
971 let topo_sort = super::graph_algorithms::topo_sort(self.subgraph_ids(), |sg_id| {
972 sg_preds.get(sg_id).into_iter().flatten().copied()
973 })
974 .expect("bug: unexpected cycle between subgraphs within the tick");
975
976 topo_sort
977 .into_iter()
978 .map(|sg_id| (sg_id, self.subgraph(sg_id)))
979 .collect::<Vec<_>>()
980 };
981
982 let back_edge_swap_code: Vec<TokenStream> = back_edge_hoff_ids
986 .iter()
987 .map(|&hoff_id| {
988 let span = self.nodes[hoff_id].span();
989 let buf_ident = self.hoff_buf_ident(hoff_id, span);
990 let back_ident = self.hoff_back_ident(hoff_id, span);
991 quote_spanned! {span=>
992 ::std::mem::swap(&mut #buf_ident, &mut #back_ident);
993 }
994 })
995 .collect();
996
997 let mut op_prologue_code = Vec::new();
998 let mut op_prologue_after_code = Vec::new();
999 let mut op_tick_end_code = Vec::new();
1000 let mut subgraph_blocks = Vec::new();
1001 {
1002 for &(subgraph_id, subgraph_nodes) in all_subgraphs.iter() {
1003 let sg_metrics_idx = slotmap_raw_idx(subgraph_id);
1004 let (recv_hoffs, send_hoffs) = &subgraph_handoffs[subgraph_id];
1005
1006 let recv_port_idents: Vec<Ident> = recv_hoffs
1008 .iter()
1009 .map(|&hoff_id| self.node_as_ident(hoff_id, true))
1010 .collect();
1011 let send_port_idents: Vec<Ident> = send_hoffs
1012 .iter()
1013 .map(|&hoff_id| self.node_as_ident(hoff_id, false))
1014 .collect();
1015
1016 let recv_buf_idents: Vec<Ident> = recv_hoffs
1018 .iter()
1019 .map(|&hoff_id| self.hoff_buf_ident(hoff_id, self.nodes[hoff_id].span()))
1020 .collect();
1021 let send_buf_idents: Vec<Ident> = send_hoffs
1022 .iter()
1023 .map(|&hoff_id| self.hoff_buf_ident(hoff_id, self.nodes[hoff_id].span()))
1024 .collect();
1025
1026 let recv_port_code: Vec<TokenStream> = recv_port_idents
1030 .iter()
1031 .zip(recv_buf_idents.iter())
1032 .zip(recv_hoffs.iter())
1033 .map(|((port_ident, buf_ident), &hoff_id)| {
1034 let hoff_idx = slotmap_raw_idx(hoff_id);
1035 let work_done = Ident::new("__dfir_work_done", Span::call_site());
1039 let metrics = Ident::new("__dfir_metrics", Span::call_site());
1040 let drain_ident = if back_edge_hoff_ids.contains(&hoff_id) {
1043 self.hoff_back_ident(hoff_id, buf_ident.span())
1044 } else {
1045 buf_ident.clone()
1046 };
1047 quote_spanned! {port_ident.span()=>
1048 {
1049 let hoff_len = #drain_ident.len();
1050 if hoff_len > 0 {
1051 #work_done = true;
1052 }
1053 let hoff_metrics = &#metrics.handoffs[
1054 #root::util::slot_vec::Key::<#root::scheduled::HandoffTag>::from_raw(#hoff_idx)
1055 ];
1056 hoff_metrics.total_items_count.update(|x| x + hoff_len);
1057 hoff_metrics.curr_items_count.set(hoff_len);
1058 }
1059 let #port_ident = #root::dfir_pipes::pull::iter(#drain_ident.drain(..));
1060 }
1061 })
1062 .collect();
1063
1064 let send_port_code: Vec<TokenStream> = send_port_idents
1066 .iter()
1067 .zip(send_buf_idents.iter())
1068 .map(|(port_ident, buf_ident)| {
1069 quote_spanned! {port_ident.span()=>
1070 let #port_ident = #root::dfir_pipes::push::vec_push(&mut #buf_ident);
1071 }
1072 })
1073 .collect();
1074
1075 let loop_id = self.node_loop(subgraph_nodes[0]);
1077
1078 let mut subgraph_op_iter_code = Vec::new();
1079 let mut subgraph_op_iter_after_code = Vec::new();
1080 {
1081 let pull_to_push_idx = self.find_pull_to_push_idx(subgraph_nodes);
1082
1083 let (pull_half, push_half) = subgraph_nodes.split_at(pull_to_push_idx);
1084 let nodes_iter = pull_half.iter().chain(push_half.iter().rev());
1085
1086 for (idx, &node_id) in nodes_iter.enumerate() {
1087 let node = &self.nodes[node_id];
1088 assert!(
1089 matches!(node, GraphNode::Operator(_)),
1090 "Handoffs are not part of subgraphs."
1091 );
1092 let op_inst = &self.operator_instances[node_id];
1093
1094 let op_span = node.span();
1095 let op_name = op_inst.op_constraints.name;
1096 let root = change_spans(root.clone(), op_span);
1098 let op_constraints = OPERATORS
1099 .iter()
1100 .find(|op| op_name == op.name)
1101 .unwrap_or_else(|| panic!("Failed to find op: {}", op_name));
1102
1103 let ident = self.node_as_ident(node_id, false);
1104
1105 {
1106 let mut input_edges = self
1109 .graph
1110 .predecessor_edges(node_id)
1111 .map(|edge_id| (self.edge_ports(edge_id).1, edge_id))
1112 .collect::<Vec<_>>();
1113 input_edges.sort();
1115
1116 let inputs = input_edges
1117 .iter()
1118 .map(|&(_port, edge_id)| {
1119 let (pred, _) = self.edge(edge_id);
1120 self.node_as_ident(pred, true)
1121 })
1122 .collect::<Vec<_>>();
1123
1124 let mut output_edges = self
1126 .graph
1127 .successor_edges(node_id)
1128 .map(|edge_id| (&self.ports[edge_id].0, edge_id))
1129 .collect::<Vec<_>>();
1130 output_edges.sort();
1132
1133 let outputs = output_edges
1134 .iter()
1135 .map(|&(_port, edge_id)| {
1136 let (_, succ) = self.edge(edge_id);
1137 self.node_as_ident(succ, false)
1138 })
1139 .collect::<Vec<_>>();
1140
1141 let is_pull = idx < pull_to_push_idx;
1142
1143 let singleton_output_ident = &if op_constraints.has_singleton_output {
1144 self.node_as_singleton_ident(node_id, op_span)
1145 } else {
1146 Ident::new(&format!("{}_has_no_singleton_output", op_name), op_span)
1148 };
1149
1150 let df_local = &Ident::new(GRAPH, op_span.resolved_at(df.span()));
1159 let context = &Ident::new(CONTEXT, op_span.resolved_at(context.span()));
1160
1161 let singletons_resolved =
1162 self.helper_resolve_singletons(node_id, op_span);
1163 let arguments = &process_singletons::postprocess_singletons(
1164 op_inst.arguments_raw.clone(),
1165 singletons_resolved.clone(),
1166 context,
1167 );
1168 let arguments_handles =
1169 &process_singletons::postprocess_singletons_handles(
1170 op_inst.arguments_raw.clone(),
1171 singletons_resolved.clone(),
1172 );
1173
1174 let source_tag = 'a: {
1175 if let Some(tag) = self.operator_tag.get(node_id).cloned() {
1176 break 'a tag;
1177 }
1178
1179 #[cfg(nightly)]
1180 if proc_macro::is_available() {
1181 let op_span = op_span.unwrap();
1182 break 'a format!(
1183 "loc_{}_{}_{}_{}_{}",
1184 crate::pretty_span::make_source_path_relative(
1185 &op_span.file()
1186 )
1187 .display()
1188 .to_string()
1189 .replace(|x: char| !x.is_ascii_alphanumeric(), "_"),
1190 op_span.start().line(),
1191 op_span.start().column(),
1192 op_span.end().line(),
1193 op_span.end().column(),
1194 );
1195 }
1196
1197 format!(
1198 "loc_nopath_{}_{}_{}_{}",
1199 op_span.start().line,
1200 op_span.start().column,
1201 op_span.end().line,
1202 op_span.end().column
1203 )
1204 };
1205
1206 let work_fn = format_ident!(
1207 "{}__{}__{}",
1208 ident,
1209 op_name,
1210 source_tag,
1211 span = op_span
1212 );
1213 let work_fn_async = format_ident!("{}__async", work_fn, span = op_span);
1214
1215 let context_args = WriteContextArgs {
1216 root: &root,
1217 df_ident: df_local,
1218 context,
1219 subgraph_id,
1220 node_id,
1221 loop_id,
1222 op_span,
1223 op_tag: self.operator_tag.get(node_id).cloned(),
1224 work_fn: &work_fn,
1225 work_fn_async: &work_fn_async,
1226 ident: &ident,
1227 is_pull,
1228 inputs: &inputs,
1229 outputs: &outputs,
1230 singleton_output_ident,
1231 op_name,
1232 op_inst,
1233 arguments,
1234 arguments_handles,
1235 };
1236
1237 let write_result =
1238 (op_constraints.write_fn)(&context_args, diagnostics);
1239 let OperatorWriteOutput {
1240 write_prologue,
1241 write_prologue_after,
1242 write_iterator,
1243 write_iterator_after,
1244 write_tick_end,
1245 } = write_result.unwrap_or_else(|()| {
1246 assert!(
1247 diagnostics.has_error(),
1248 "Operator `{}` returned `Err` but emitted no diagnostics, this is a bug.",
1249 op_name,
1250 );
1251 OperatorWriteOutput {
1252 write_iterator: null_write_iterator_fn(&context_args),
1253 ..Default::default()
1254 }
1255 });
1256
1257 op_prologue_code.push(syn::parse_quote! {
1258 #[allow(non_snake_case)]
1259 #[inline(always)]
1260 fn #work_fn<T>(thunk: impl ::std::ops::FnOnce() -> T) -> T {
1261 thunk()
1262 }
1263
1264 #[allow(non_snake_case)]
1265 #[inline(always)]
1266 async fn #work_fn_async<T>(
1267 thunk: impl ::std::future::Future<Output = T>,
1268 ) -> T {
1269 thunk.await
1270 }
1271 });
1272 op_prologue_code.push(write_prologue);
1273 op_prologue_after_code.push(write_prologue_after);
1274 op_tick_end_code.push(write_tick_end);
1275 subgraph_op_iter_code.push(write_iterator);
1276
1277 if include_type_guards {
1278 let type_guard = if is_pull {
1279 quote_spanned! {op_span=>
1280 let #ident = {
1281 #[allow(non_snake_case)]
1282 #[inline(always)]
1283 pub fn #work_fn<Item, Input>(input: Input)
1284 -> impl #root::dfir_pipes::pull::Pull<Item = Item, Meta = (), CanPend = Input::CanPend, CanEnd = Input::CanEnd>
1285 where
1286 Input: #root::dfir_pipes::pull::Pull<Item = Item, Meta = ()>,
1287 {
1288 #root::pin_project_lite::pin_project! {
1289 #[repr(transparent)]
1290 struct Pull<Item, Input: #root::dfir_pipes::pull::Pull<Item = Item>> {
1291 #[pin]
1292 inner: Input
1293 }
1294 }
1295
1296 impl<Item, Input> #root::dfir_pipes::pull::Pull for Pull<Item, Input>
1297 where
1298 Input: #root::dfir_pipes::pull::Pull<Item = Item>,
1299 {
1300 type Ctx<'ctx> = Input::Ctx<'ctx>;
1301
1302 type Item = Item;
1303 type Meta = Input::Meta;
1304 type CanPend = Input::CanPend;
1305 type CanEnd = Input::CanEnd;
1306
1307 #[inline(always)]
1308 fn pull(
1309 self: ::std::pin::Pin<&mut Self>,
1310 ctx: &mut Self::Ctx<'_>,
1311 ) -> #root::dfir_pipes::pull::PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd> {
1312 #root::dfir_pipes::pull::Pull::pull(self.project().inner, ctx)
1313 }
1314
1315 #[inline(always)]
1316 fn size_hint(&self) -> (usize, Option<usize>) {
1317 #root::dfir_pipes::pull::Pull::size_hint(&self.inner)
1318 }
1319 }
1320
1321 Pull {
1322 inner: input
1323 }
1324 }
1325 #work_fn::<_, _>( #ident )
1326 };
1327 }
1328 } else {
1329 quote_spanned! {op_span=>
1330 let #ident = {
1331 #[allow(non_snake_case)]
1332 #[inline(always)]
1333 pub fn #work_fn<Item, Psh>(psh: Psh) -> impl #root::dfir_pipes::push::Push<Item, (), CanPend = Psh::CanPend>
1334 where
1335 Psh: #root::dfir_pipes::push::Push<Item, ()>
1336 {
1337 #root::pin_project_lite::pin_project! {
1338 #[repr(transparent)]
1339 struct PushGuard<Psh> {
1340 #[pin]
1341 inner: Psh,
1342 }
1343 }
1344
1345 impl<Item, Psh> #root::dfir_pipes::push::Push<Item, ()> for PushGuard<Psh>
1346 where
1347 Psh: #root::dfir_pipes::push::Push<Item, ()>,
1348 {
1349 type Ctx<'ctx> = Psh::Ctx<'ctx>;
1350
1351 type CanPend = Psh::CanPend;
1352
1353 #[inline(always)]
1354 fn poll_ready(
1355 self: ::std::pin::Pin<&mut Self>,
1356 ctx: &mut Self::Ctx<'_>,
1357 ) -> #root::dfir_pipes::push::PushStep<Self::CanPend> {
1358 #root::dfir_pipes::push::Push::poll_ready(self.project().inner, ctx)
1359 }
1360
1361 #[inline(always)]
1362 fn start_send(
1363 self: ::std::pin::Pin<&mut Self>,
1364 item: Item,
1365 meta: (),
1366 ) {
1367 #root::dfir_pipes::push::Push::start_send(self.project().inner, item, meta)
1368 }
1369
1370 #[inline(always)]
1371 fn poll_flush(
1372 self: ::std::pin::Pin<&mut Self>,
1373 ctx: &mut Self::Ctx<'_>,
1374 ) -> #root::dfir_pipes::push::PushStep<Self::CanPend> {
1375 #root::dfir_pipes::push::Push::poll_flush(self.project().inner, ctx)
1376 }
1377
1378 #[inline(always)]
1379 fn size_hint(
1380 self: ::std::pin::Pin<&mut Self>,
1381 hint: (usize, Option<usize>),
1382 ) {
1383 #root::dfir_pipes::push::Push::size_hint(self.project().inner, hint)
1384 }
1385 }
1386
1387 PushGuard {
1388 inner: psh
1389 }
1390 }
1391 #work_fn( #ident )
1392 };
1393 }
1394 };
1395 subgraph_op_iter_code.push(type_guard);
1396 }
1397 subgraph_op_iter_after_code.push(write_iterator_after);
1398 }
1399 }
1400
1401 {
1402 let pull_ident = if 0 < pull_to_push_idx {
1404 self.node_as_ident(subgraph_nodes[pull_to_push_idx - 1], false)
1405 } else {
1406 recv_port_idents[0].clone()
1408 };
1409
1410 #[rustfmt::skip]
1411 let push_ident = if let Some(&node_id) =
1412 subgraph_nodes.get(pull_to_push_idx)
1413 {
1414 self.node_as_ident(node_id, false)
1415 } else if 1 == send_port_idents.len() {
1416 send_port_idents[0].clone()
1418 } else {
1419 diagnostics.push(Diagnostic::spanned(
1420 pull_ident.span(),
1421 Level::Error,
1422 "Degenerate subgraph detected, is there a disconnected `null()` or other degenerate pipeline somewhere?",
1423 ));
1424 continue;
1425 };
1426
1427 let pivot_span = pull_ident
1429 .span()
1430 .join(push_ident.span())
1431 .unwrap_or_else(|| push_ident.span());
1432 let pivot_fn_ident =
1433 Ident::new(&format!("pivot_run_sg_{:?}", subgraph_id.0), pivot_span);
1434 let root = change_spans(root.clone(), pivot_span);
1435 subgraph_op_iter_code.push(quote_spanned! {pivot_span=>
1436 #[inline(always)]
1437 fn #pivot_fn_ident<Pul, Psh, Item>(pull: Pul, push: Psh)
1438 -> impl ::std::future::Future<Output = ()>
1439 where
1440 Pul: #root::dfir_pipes::pull::Pull<Item = Item>,
1441 Psh: #root::dfir_pipes::push::Push<Item, Pul::Meta>,
1442 {
1443 #root::dfir_pipes::pull::Pull::send_push(pull, push)
1444 }
1445 (#pivot_fn_ident)(#pull_ident, #push_ident).await;
1446 });
1447 }
1448 };
1449
1450 let sg_fut_ident = subgraph_id.as_ident(Span::call_site());
1454
1455 let send_metrics_code: Vec<TokenStream> = send_hoffs
1457 .iter()
1458 .zip(send_buf_idents.iter())
1459 .map(|(&hoff_id, buf_ident)| {
1460 let hoff_idx = slotmap_raw_idx(hoff_id);
1461 quote! {
1462 __dfir_metrics.handoffs[
1463 #root::util::slot_vec::Key::<#root::scheduled::HandoffTag>::from_raw(#hoff_idx)
1464 ].curr_items_count.set(#buf_ident.len());
1465 }
1466 })
1467 .collect();
1468
1469 subgraph_blocks.push(quote! {
1470 let #sg_fut_ident = async {
1471 let #context = &#df;
1472 #( #recv_port_code )*
1473 #( #send_port_code )*
1474 #( #subgraph_op_iter_code )*
1475 #( #subgraph_op_iter_after_code )*
1476 };
1477 {
1478 let sg_metrics = &__dfir_metrics.subgraphs[
1479 #root::util::slot_vec::Key::<#root::scheduled::SubgraphTag>::from_raw(#sg_metrics_idx)
1480 ];
1481 #root::scheduled::metrics::InstrumentSubgraph::new(
1482 #sg_fut_ident, sg_metrics
1483 ).await;
1484 sg_metrics.total_run_count.update(|x| x + 1);
1485 }
1486 #( #send_metrics_code )*
1487 });
1488
1489 }
1492 }
1493
1494 if diagnostics.has_error() {
1495 return Err(std::mem::take(diagnostics));
1496 }
1497 let _ = diagnostics; let (meta_graph_arg, diagnostics_arg) = if include_meta {
1500 let meta_graph_json = serde_json::to_string(&self).unwrap();
1501 let meta_graph_json = Literal::string(&meta_graph_json);
1502
1503 let serde_diagnostics: Vec<_> = diagnostics.iter().map(Diagnostic::to_serde).collect();
1504 let diagnostics_json = serde_json::to_string(&*serde_diagnostics).unwrap();
1505 let diagnostics_json = Literal::string(&diagnostics_json);
1506
1507 (
1508 quote! { Some(#meta_graph_json) },
1509 quote! { Some(#diagnostics_json) },
1510 )
1511 } else {
1512 (quote! { None }, quote! { None })
1513 };
1514
1515 let metrics_init_code = {
1517 let handoff_inits = handoff_nodes.iter().map(|&(node_id, _)| {
1518 let idx = slotmap_raw_idx(node_id);
1519 quote! {
1520 dfir_metrics.handoffs.insert(
1521 #root::util::slot_vec::Key::from_raw(#idx),
1522 ::std::default::Default::default(),
1523 );
1524 }
1525 });
1526 let subgraph_inits = all_subgraphs.iter().map(|&(sg_id, _)| {
1527 let idx = slotmap_raw_idx(sg_id);
1528 quote! {
1529 dfir_metrics.subgraphs.insert(
1530 #root::util::slot_vec::Key::from_raw(#idx),
1531 ::std::default::Default::default(),
1532 );
1533 }
1534 });
1535 handoff_inits.chain(subgraph_inits).collect::<Vec<_>>()
1536 };
1537
1538 Ok(quote! {
1541 {
1542 #prefix
1543
1544 use #root::{var_expr, var_args};
1545
1546 let __dfir_wake_state = ::std::sync::Arc::new(
1547 #root::scheduled::context::WakeState::default()
1548 );
1549
1550 let __dfir_metrics = {
1551 let mut dfir_metrics = #root::scheduled::metrics::DfirMetrics::default();
1552 #( #metrics_init_code )*
1553 ::std::rc::Rc::new(dfir_metrics)
1554 };
1555
1556 #[allow(unused_mut)]
1557 let mut #df = #root::scheduled::context::Context::new(
1558 ::std::clone::Clone::clone(&__dfir_wake_state),
1559 __dfir_metrics,
1560 );
1561
1562 #( #buffer_code )*
1563 #( #back_buffer_code )*
1564 #( #op_prologue_code )*
1565 #( #op_prologue_after_code )*
1566
1567 let mut __dfir_work_done = true;
1572 #[allow(unused_qualifications, unused_mut, unused_variables, clippy::await_holding_refcell_ref)]
1573 let __dfir_inline_tick = async move |#df: &mut #root::scheduled::context::Context| {
1574 let __dfir_metrics = #df.metrics();
1575 #( #back_edge_swap_code )*
1578 #( #subgraph_blocks )*
1579
1580 if false #( || !#defer_tick_buf_idents.is_empty() )* {
1586 #df.schedule_subgraph(
1587 #root::scheduled::SubgraphId::from_raw(0),
1588 true,
1589 );
1590 }
1591
1592 #( #op_tick_end_code )*
1594
1595 #df.__end_tick();
1596 ::std::mem::take(&mut __dfir_work_done)
1597 };
1598 #root::scheduled::context::Dfir::new(
1599 __dfir_inline_tick,
1600 #df,
1601 #meta_graph_arg,
1602 #diagnostics_arg,
1603 )
1604 }
1605 })
1606 }
1607
1608 pub fn node_color_map(&self) -> SparseSecondaryMap<GraphNodeId, Color> {
1611 let mut node_color_map: SparseSecondaryMap<GraphNodeId, Color> = self
1612 .node_ids()
1613 .filter_map(|node_id| {
1614 let op_color = self.node_color(node_id)?;
1615 Some((node_id, op_color))
1616 })
1617 .collect();
1618
1619 for sg_nodes in self.subgraph_nodes.values() {
1621 let pull_to_push_idx = self.find_pull_to_push_idx(sg_nodes);
1622
1623 for (idx, node_id) in sg_nodes.iter().copied().enumerate() {
1624 let is_pull = idx < pull_to_push_idx;
1625 node_color_map.insert(node_id, if is_pull { Color::Pull } else { Color::Push });
1626 }
1627 }
1628
1629 node_color_map
1630 }
1631
1632 pub fn to_mermaid(&self, write_config: &WriteConfig) -> String {
1634 let mut output = String::new();
1635 self.write_mermaid(&mut output, write_config).unwrap();
1636 output
1637 }
1638
1639 pub fn write_mermaid(
1641 &self,
1642 output: impl std::fmt::Write,
1643 write_config: &WriteConfig,
1644 ) -> std::fmt::Result {
1645 let mut graph_write = Mermaid::new(output);
1646 self.write_graph(&mut graph_write, write_config)
1647 }
1648
1649 pub fn to_dot(&self, write_config: &WriteConfig) -> String {
1651 let mut output = String::new();
1652 let mut graph_write = Dot::new(&mut output);
1653 self.write_graph(&mut graph_write, write_config).unwrap();
1654 output
1655 }
1656
1657 pub fn write_dot(
1659 &self,
1660 output: impl std::fmt::Write,
1661 write_config: &WriteConfig,
1662 ) -> std::fmt::Result {
1663 let mut graph_write = Dot::new(output);
1664 self.write_graph(&mut graph_write, write_config)
1665 }
1666
1667 pub(crate) fn write_graph<W>(
1669 &self,
1670 mut graph_write: W,
1671 write_config: &WriteConfig,
1672 ) -> Result<(), W::Err>
1673 where
1674 W: GraphWrite,
1675 {
1676 fn helper_edge_label(
1677 src_port: &PortIndexValue,
1678 dst_port: &PortIndexValue,
1679 ) -> Option<String> {
1680 let src_label = match src_port {
1681 PortIndexValue::Path(path) => Some(path.to_token_stream().to_string()),
1682 PortIndexValue::Int(index) => Some(index.value.to_string()),
1683 _ => None,
1684 };
1685 let dst_label = match dst_port {
1686 PortIndexValue::Path(path) => Some(path.to_token_stream().to_string()),
1687 PortIndexValue::Int(index) => Some(index.value.to_string()),
1688 _ => None,
1689 };
1690 let label = match (src_label, dst_label) {
1691 (Some(l1), Some(l2)) => Some(format!("{}\n{}", l1, l2)),
1692 (Some(l1), None) => Some(l1),
1693 (None, Some(l2)) => Some(l2),
1694 (None, None) => None,
1695 };
1696 label
1697 }
1698
1699 let node_color_map = self.node_color_map();
1701
1702 graph_write.write_prologue()?;
1704
1705 let mut skipped_handoffs = BTreeSet::new();
1707 let mut subgraph_handoffs = <BTreeMap<GraphSubgraphId, Vec<GraphNodeId>>>::new();
1708 for (node_id, node) in self.nodes() {
1709 if matches!(node, GraphNode::Handoff { .. }) {
1710 if write_config.no_handoffs {
1711 skipped_handoffs.insert(node_id);
1712 continue;
1713 } else {
1714 let pred_node = self.node_predecessor_nodes(node_id).next().unwrap();
1715 let pred_sg = self.node_subgraph(pred_node);
1716 let succ_node = self.node_successor_nodes(node_id).next().unwrap();
1717 let succ_sg = self.node_subgraph(succ_node);
1718 if let Some((pred_sg, succ_sg)) = pred_sg.zip(succ_sg)
1719 && pred_sg == succ_sg
1720 {
1721 subgraph_handoffs.entry(pred_sg).or_default().push(node_id);
1722 }
1723 }
1724 }
1725 graph_write.write_node_definition(
1726 node_id,
1727 &if write_config.op_short_text {
1728 node.to_name_string()
1729 } else if write_config.op_text_no_imports {
1730 let full_text = node.to_pretty_string();
1732 let mut output = String::new();
1733 for sentence in full_text.split('\n') {
1734 if sentence.trim().starts_with("use") {
1735 continue;
1736 }
1737 output.push('\n');
1738 output.push_str(sentence);
1739 }
1740 output.into()
1741 } else {
1742 node.to_pretty_string()
1743 },
1744 if write_config.no_pull_push {
1745 None
1746 } else {
1747 node_color_map.get(node_id).copied()
1748 },
1749 )?;
1750 }
1751
1752 for (edge_id, (src_id, mut dst_id)) in self.edges() {
1754 if skipped_handoffs.contains(&src_id) {
1756 continue;
1757 }
1758
1759 let (src_port, mut dst_port) = self.edge_ports(edge_id);
1760 if skipped_handoffs.contains(&dst_id) {
1761 let mut handoff_succs = self.node_successors(dst_id);
1762 assert_eq!(1, handoff_succs.len());
1763 let (succ_edge, succ_node) = handoff_succs.next().unwrap();
1764 dst_id = succ_node;
1765 dst_port = self.edge_ports(succ_edge).1;
1766 }
1767
1768 let label = helper_edge_label(src_port, dst_port);
1769 let delay_type = self
1770 .node_op_inst(dst_id)
1771 .and_then(|op_inst| (op_inst.op_constraints.input_delaytype_fn)(dst_port));
1772 graph_write.write_edge(src_id, dst_id, delay_type, label.as_deref(), false)?;
1773 }
1774
1775 if !write_config.no_references {
1777 for dst_id in self.node_ids() {
1778 for src_ref_id in self
1779 .node_singleton_references(dst_id)
1780 .iter()
1781 .copied()
1782 .flatten()
1783 {
1784 let delay_type = Some(DelayType::Stratum);
1785 let label = None;
1786 graph_write.write_edge(src_ref_id, dst_id, delay_type, label, true)?;
1787 }
1788 }
1789 }
1790
1791 let loop_subgraphs = self.subgraph_ids().map(|sg_id| {
1799 let loop_id = if write_config.no_loops {
1800 None
1801 } else {
1802 self.subgraph_loop(sg_id)
1803 };
1804 (loop_id, sg_id)
1805 });
1806 let loop_subgraphs = into_group_map(loop_subgraphs);
1807 for (loop_id, subgraph_ids) in loop_subgraphs {
1808 if let Some(loop_id) = loop_id {
1809 graph_write.write_loop_start(loop_id)?;
1810 }
1811
1812 let subgraph_varnames_nodes = subgraph_ids.into_iter().flat_map(|sg_id| {
1814 self.subgraph(sg_id).iter().copied().map(move |node_id| {
1815 let opt_sg_id = if write_config.no_subgraphs {
1816 None
1817 } else {
1818 Some(sg_id)
1819 };
1820 (opt_sg_id, (self.node_varname(node_id), node_id))
1821 })
1822 });
1823 let subgraph_varnames_nodes = into_group_map(subgraph_varnames_nodes);
1824 for (sg_id, varnames) in subgraph_varnames_nodes {
1825 if let Some(sg_id) = sg_id {
1826 graph_write.write_subgraph_start(sg_id)?;
1827 }
1828
1829 let varname_nodes = varnames.into_iter().map(|(varname, node)| {
1831 let varname = if write_config.no_varnames {
1832 None
1833 } else {
1834 varname
1835 };
1836 (varname, node)
1837 });
1838 let varname_nodes = into_group_map(varname_nodes);
1839 for (varname, node_ids) in varname_nodes {
1840 if let Some(varname) = varname {
1841 graph_write.write_varname_start(&varname.0.to_string(), sg_id)?;
1842 }
1843
1844 for node_id in node_ids {
1846 graph_write.write_node(node_id)?;
1847 }
1848
1849 if varname.is_some() {
1850 graph_write.write_varname_end()?;
1851 }
1852 }
1853
1854 if sg_id.is_some() {
1855 graph_write.write_subgraph_end()?;
1856 }
1857 }
1858
1859 if loop_id.is_some() {
1860 graph_write.write_loop_end()?;
1861 }
1862 }
1863
1864 graph_write.write_epilogue()?;
1866
1867 Ok(())
1868 }
1869
1870 pub fn surface_syntax_string(&self) -> String {
1872 let mut string = String::new();
1873 self.write_surface_syntax(&mut string).unwrap();
1874 string
1875 }
1876
1877 pub fn write_surface_syntax(&self, write: &mut impl std::fmt::Write) -> std::fmt::Result {
1879 for (key, node) in self.nodes.iter() {
1880 match node {
1881 GraphNode::Operator(op) => {
1882 writeln!(write, "{:?} = {};", key.data(), op.to_token_stream())?;
1883 }
1884 GraphNode::Handoff { .. } => {
1885 writeln!(write, "// {:?} = <handoff>;", key.data())?;
1886 }
1887 GraphNode::ModuleBoundary { .. } => panic!(),
1888 }
1889 }
1890 writeln!(write)?;
1891 for (_e, (src_key, dst_key)) in self.graph.edges() {
1892 writeln!(write, "{:?} -> {:?};", src_key.data(), dst_key.data())?;
1893 }
1894 Ok(())
1895 }
1896
1897 pub fn mermaid_string_flat(&self) -> String {
1899 let mut string = String::new();
1900 self.write_mermaid_flat(&mut string).unwrap();
1901 string
1902 }
1903
1904 pub fn write_mermaid_flat(&self, write: &mut impl std::fmt::Write) -> std::fmt::Result {
1906 writeln!(write, "flowchart TB")?;
1907 for (key, node) in self.nodes.iter() {
1908 match node {
1909 GraphNode::Operator(operator) => writeln!(
1910 write,
1911 " %% {span}\n {id:?}[\"{row_col} <tt>{code}</tt>\"]",
1912 span = PrettySpan(node.span()),
1913 id = key.data(),
1914 row_col = PrettyRowCol(node.span()),
1915 code = operator
1916 .to_token_stream()
1917 .to_string()
1918 .replace('&', "&")
1919 .replace('<', "<")
1920 .replace('>', ">")
1921 .replace('"', """)
1922 .replace('\n', "<br>"),
1923 ),
1924 GraphNode::Handoff { .. } => {
1925 writeln!(write, r#" {:?}{{"{}"}}"#, key.data(), HANDOFF_NODE_STR)
1926 }
1927 GraphNode::ModuleBoundary { .. } => {
1928 writeln!(
1929 write,
1930 r#" {:?}{{"{}"}}"#,
1931 key.data(),
1932 MODULE_BOUNDARY_NODE_STR
1933 )
1934 }
1935 }?;
1936 }
1937 writeln!(write)?;
1938 for (_e, (src_key, dst_key)) in self.graph.edges() {
1939 writeln!(write, " {:?}-->{:?}", src_key.data(), dst_key.data())?;
1940 }
1941 Ok(())
1942 }
1943}
1944
1945impl DfirGraph {
1947 pub fn loop_ids(&self) -> slotmap::basic::Keys<'_, GraphLoopId, Vec<GraphNodeId>> {
1949 self.loop_nodes.keys()
1950 }
1951
1952 pub fn loops(&self) -> slotmap::basic::Iter<'_, GraphLoopId, Vec<GraphNodeId>> {
1954 self.loop_nodes.iter()
1955 }
1956
1957 pub fn insert_loop(&mut self, parent_loop: Option<GraphLoopId>) -> GraphLoopId {
1959 let loop_id = self.loop_nodes.insert(Vec::new());
1960 self.loop_children.insert(loop_id, Vec::new());
1961 if let Some(parent_loop) = parent_loop {
1962 self.loop_parent.insert(loop_id, parent_loop);
1963 self.loop_children
1964 .get_mut(parent_loop)
1965 .unwrap()
1966 .push(loop_id);
1967 } else {
1968 self.root_loops.push(loop_id);
1969 }
1970 loop_id
1971 }
1972
1973 pub fn node_loop(&self, node_id: GraphNodeId) -> Option<GraphLoopId> {
1975 self.node_loops.get(node_id).copied()
1976 }
1977
1978 pub fn subgraph_loop(&self, subgraph_id: GraphSubgraphId) -> Option<GraphLoopId> {
1980 let &node_id = self.subgraph(subgraph_id).first().unwrap();
1981 let out = self.node_loop(node_id);
1982 debug_assert!(
1983 self.subgraph(subgraph_id)
1984 .iter()
1985 .all(|&node_id| self.node_loop(node_id) == out),
1986 "Subgraph nodes should all have the same loop context."
1987 );
1988 out
1989 }
1990
1991 pub fn loop_parent(&self, loop_id: GraphLoopId) -> Option<GraphLoopId> {
1993 self.loop_parent.get(loop_id).copied()
1994 }
1995
1996 pub fn loop_children(&self, loop_id: GraphLoopId) -> &Vec<GraphLoopId> {
1998 self.loop_children.get(loop_id).unwrap()
1999 }
2000}
2001
2002#[derive(Clone, Debug, Default)]
2004#[cfg_attr(feature = "clap-derive", derive(clap::Args))]
2005pub struct WriteConfig {
2006 #[cfg_attr(feature = "clap-derive", arg(long))]
2008 pub no_subgraphs: bool,
2009 #[cfg_attr(feature = "clap-derive", arg(long))]
2011 pub no_varnames: bool,
2012 #[cfg_attr(feature = "clap-derive", arg(long))]
2014 pub no_pull_push: bool,
2015 #[cfg_attr(feature = "clap-derive", arg(long))]
2017 pub no_handoffs: bool,
2018 #[cfg_attr(feature = "clap-derive", arg(long))]
2020 pub no_references: bool,
2021 #[cfg_attr(feature = "clap-derive", arg(long))]
2023 pub no_loops: bool,
2024
2025 #[cfg_attr(feature = "clap-derive", arg(long))]
2027 pub op_short_text: bool,
2028 #[cfg_attr(feature = "clap-derive", arg(long))]
2030 pub op_text_no_imports: bool,
2031}
2032
2033#[derive(Copy, Clone, Debug)]
2035#[cfg_attr(feature = "clap-derive", derive(clap::Parser, clap::ValueEnum))]
2036pub enum WriteGraphType {
2037 Mermaid,
2039 Dot,
2041}
2042
2043fn into_group_map<K, V>(iter: impl IntoIterator<Item = (K, V)>) -> BTreeMap<K, Vec<V>>
2045where
2046 K: Ord,
2047{
2048 let mut out: BTreeMap<_, Vec<_>> = BTreeMap::new();
2049 for (k, v) in iter {
2050 out.entry(k).or_default().push(v);
2051 }
2052 out
2053}