added an option to generate a basic control flow graph for graphviz

This commit is contained in:
igor
2026-03-02 13:40:51 +01:00
parent 13f33d7820
commit 7faafe9df0
2 changed files with 227 additions and 2 deletions

227
main.py
View File

@@ -3321,6 +3321,199 @@ class Assembler:
self.enable_static_list_folding = enable_static_list_folding
self.enable_peephole_optimization = enable_peephole_optimization
self.loop_unroll_threshold = loop_unroll_threshold
self._last_cfg_definitions: List[Definition] = []
def _copy_definition_for_cfg(self, definition: Definition) -> Definition:
return Definition(
name=definition.name,
body=[Op(op=node.op, data=node.data, loc=node.loc) for node in definition.body],
immediate=definition.immediate,
compile_only=definition.compile_only,
terminator=definition.terminator,
inline=definition.inline,
)
def _format_cfg_op(self, node: Op) -> str:
kind = node._opcode
data = node.data
if kind == OP_LITERAL:
return f"literal {data!r}"
if kind == OP_WORD:
return f"word {data}"
if kind == OP_WORD_PTR:
return f"word_ptr {data}"
if kind == OP_BRANCH_ZERO:
return f"branch_zero {data}"
if kind == OP_JUMP:
return f"jump {data}"
if kind == OP_LABEL:
return f"label {data}"
if kind == OP_FOR_BEGIN:
return f"for_begin loop={data['loop']} end={data['end']}"
if kind == OP_FOR_END:
return f"for_end loop={data['loop']} end={data['end']}"
if kind == OP_LIST_BEGIN:
return f"list_begin {data}"
if kind == OP_LIST_END:
return f"list_end {data}"
if kind == OP_LIST_LITERAL:
return f"list_literal {data}"
return f"{node.op} {data!r}"
@staticmethod
def _dot_escape(text: str) -> str:
return text.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
@staticmethod
def _dot_id(text: str) -> str:
return re.sub(r"[^A-Za-z0-9_]", "_", text)
def _definition_cfg_blocks_and_edges(self, definition: Definition) -> Tuple[List[Tuple[int, int]], List[Tuple[int, int, str]]]:
nodes = definition.body
if not nodes:
return [], []
label_positions = self._cfg_label_positions(nodes)
for_pairs = self._for_pairs(nodes)
leaders: Set[int] = {0}
def add_leader(idx: int) -> None:
if 0 <= idx < len(nodes):
leaders.add(idx)
for idx, node in enumerate(nodes):
kind = node._opcode
if kind == OP_LABEL:
leaders.add(idx)
elif kind == OP_BRANCH_ZERO:
target = label_positions.get(str(node.data))
if target is not None:
add_leader(target)
add_leader(idx + 1)
elif kind == OP_JUMP:
target = label_positions.get(str(node.data))
if target is not None:
add_leader(target)
add_leader(idx + 1)
elif kind == OP_FOR_BEGIN:
end_idx = for_pairs.get(idx)
if end_idx is not None:
add_leader(end_idx + 1)
add_leader(idx + 1)
elif kind == OP_FOR_END:
begin_idx = for_pairs.get(idx)
if begin_idx is not None:
add_leader(begin_idx + 1)
add_leader(idx + 1)
ordered = sorted(leaders)
blocks: List[Tuple[int, int]] = []
for i, start in enumerate(ordered):
end = ordered[i + 1] if i + 1 < len(ordered) else len(nodes)
if start < end:
blocks.append((start, end))
block_by_ip: Dict[int, int] = {}
for block_idx, (start, end) in enumerate(blocks):
for ip in range(start, end):
block_by_ip[ip] = block_idx
edges: List[Tuple[int, int, str]] = []
def add_edge(src_block: int, target_ip: int, label: str) -> None:
if target_ip < 0 or target_ip >= len(nodes):
return
dst_block = block_by_ip.get(target_ip)
if dst_block is None:
return
edges.append((src_block, dst_block, label))
for block_idx, (_start, end) in enumerate(blocks):
last_ip = end - 1
node = nodes[last_ip]
kind = node._opcode
if kind == OP_BRANCH_ZERO:
target = label_positions.get(str(node.data))
if target is not None:
add_edge(block_idx, target, "zero")
add_edge(block_idx, last_ip + 1, "nonzero")
continue
if kind == OP_JUMP:
target = label_positions.get(str(node.data))
if target is not None:
add_edge(block_idx, target, "jmp")
continue
if kind == OP_FOR_BEGIN:
end_idx = for_pairs.get(last_ip)
add_edge(block_idx, last_ip + 1, "enter")
if end_idx is not None:
add_edge(block_idx, end_idx + 1, "empty")
continue
if kind == OP_FOR_END:
begin_idx = for_pairs.get(last_ip)
if begin_idx is not None:
add_edge(block_idx, begin_idx + 1, "loop")
add_edge(block_idx, last_ip + 1, "exit")
continue
add_edge(block_idx, last_ip + 1, "next")
edges.sort(key=lambda item: (item[0], item[1], item[2]))
return blocks, edges
def _cfg_label_positions(self, nodes: Sequence[Op]) -> Dict[str, int]:
positions: Dict[str, int] = {}
for idx, node in enumerate(nodes):
if node._opcode == OP_LABEL:
positions[str(node.data)] = idx
return positions
def render_last_cfg_dot(self) -> str:
lines: List[str] = [
"digraph l2_cfg {",
" rankdir=LR;",
" node [shape=box, fontname=\"Courier\"];",
" edge [fontname=\"Courier\"];",
]
if not self._last_cfg_definitions:
lines.append(" empty [label=\"No runtime high-level definitions available for CFG dump\"];")
lines.append("}")
return "\n".join(lines)
for defn in self._last_cfg_definitions:
cluster_id = self._dot_id(f"cluster_{defn.name}")
lines.append(f" subgraph {cluster_id} {{")
lines.append(f" label=\"{self._dot_escape(defn.name)}\";")
blocks, edges = self._definition_cfg_blocks_and_edges(defn)
if not blocks:
node_id = self._dot_id(f"{defn.name}_empty")
lines.append(f" {node_id} [label=\"(empty)\"];")
lines.append(" }")
continue
prefix = self._dot_id(defn.name)
for block_idx, (start, end) in enumerate(blocks):
node_id = f"{prefix}_b{block_idx}"
op_lines = [f"{ip}: {self._format_cfg_op(defn.body[ip])}" for ip in range(start, end)]
label = "\\l".join(self._dot_escape(line) for line in op_lines) + "\\l"
lines.append(f" {node_id} [label=\"{label}\"];")
for src, dst, edge_label in edges:
src_id = f"{prefix}_b{src}"
dst_id = f"{prefix}_b{dst}"
lines.append(f" {src_id} -> {dst_id} [label=\"{self._dot_escape(edge_label)}\"];")
lines.append(" }")
lines.append("}")
return "\n".join(lines)
def _peephole_optimize_definition(self, definition: Definition) -> None:
# Rewrite short stack-manipulation sequences into canonical forms.
@@ -3735,6 +3928,7 @@ class Assembler:
is_program = entry_mode == "program"
emission = Emission()
self._export_all_defs = not is_program
self._last_cfg_definitions = []
try:
self._emit_externs(emission.text)
# Determine whether user provided a top-level `:asm _start` in
@@ -3840,6 +4034,12 @@ class Assembler:
# Inline-only definitions are expanded at call sites; skip emitting standalone labels.
runtime_defs = [defn for defn in runtime_defs if not getattr(defn, "inline", False)]
self._last_cfg_definitions = [
self._copy_definition_for_cfg(defn)
for defn in runtime_defs
if isinstance(defn, Definition)
]
for definition in runtime_defs:
self._emit_definition(definition, emission.text, debug=debug)
@@ -7661,6 +7861,14 @@ def cli(argv: Sequence[str]) -> int:
action="store_true",
help="shortcut for --no-artifact --ct-run-main",
)
parser.add_argument(
"--dump-cfg",
nargs="?",
default=None,
const="__AUTO__",
metavar="PATH",
help="write Graphviz DOT control-flow dump (default: <temp-dir>/<source>.cfg.dot)",
)
# Parse known and unknown args to allow -l flags anywhere
args, unknown = parser.parse_known_args(argv)
@@ -7684,6 +7892,7 @@ def cli(argv: Sequence[str]) -> int:
folding_enabled = not args.no_folding
static_list_folding_enabled = not args.no_static_list_folding
peephole_enabled = not args.no_peephole
cfg_output: Optional[Path] = None
if args.ct_run_main and artifact_kind != "exe":
parser.error("--ct-run-main requires --artifact exe")
@@ -7720,6 +7929,16 @@ def cli(argv: Sequence[str]) -> int:
if args.source is None and not args.repl:
parser.error("the following arguments are required: source")
if args.dump_cfg is not None:
if args.repl:
parser.error("--dump-cfg is not available with --repl")
if args.source is None:
parser.error("--dump-cfg requires a source file")
if args.dump_cfg == "__AUTO__":
cfg_output = args.temp_dir / f"{args.source.stem}.cfg.dot"
else:
cfg_output = Path(args.dump_cfg)
if not args.repl and args.output is None and not args.no_artifact:
stem = args.source.stem
default_outputs = {
@@ -7770,7 +7989,7 @@ def cli(argv: Sequence[str]) -> int:
# --- assembly-level cache check ---
asm_text: Optional[str] = None
fhash = ""
if cache and not args.ct_run_main:
if cache and not args.ct_run_main and args.dump_cfg is None:
fhash = cache.flags_hash(
args.debug,
folding_enabled,
@@ -7803,6 +8022,12 @@ def cli(argv: Sequence[str]) -> int:
has_ct = bool(compiler.parser.compile_time_vm._ct_executed)
cache.save(args.source, compiler._loaded_files, fhash, asm_text, has_ct_effects=has_ct)
if cfg_output is not None:
cfg_output.parent.mkdir(parents=True, exist_ok=True)
cfg_dot = compiler.assembler.render_last_cfg_dot()
cfg_output.write_text(cfg_dot)
print(f"[info] wrote {cfg_output}")
if args.ct_run_main:
try:
compiler.run_compile_time_word("main", libs=ct_run_libs)