improved the repl

This commit is contained in:
igor
2026-03-09 16:10:02 +01:00
parent 4f6fd946f6
commit 4e51e8803f

426
main.py
View File

@@ -1740,6 +1740,9 @@ class CompileTimeVM:
self.r13: int = 0 # return stack pointer (grows downward)
self._native_data_stack: Optional[Any] = None # ctypes buffer
self._native_data_top: int = 0
# REPL persistent state
self._repl_initialized: bool = False
self._repl_libs: List[str] = []
self._native_return_stack: Optional[Any] = None # ctypes buffer
self._native_return_top: int = 0
# JIT cache: word name → ctypes callable
@@ -1788,6 +1791,7 @@ class CompileTimeVM:
self.r12 = 0
self.r13 = 0
self.current_location = None
self._repl_initialized = False
def invoke(self, word: Word, *, runtime_mode: bool = False, libs: Optional[List[str]] = None) -> None:
self.reset()
@@ -1864,6 +1868,97 @@ class CompileTimeVM:
self.push(value)
self._call_word(word)
def invoke_repl(self, word: Word, *, libs: Optional[List[str]] = None) -> None:
"""Execute *word* in runtime mode, preserving stack/memory across calls.
On the first call (or after ``reset()``), allocates native stacks and
memory. Subsequent calls reuse the existing state so values left on
the data stack persist between REPL evaluations.
"""
self._ensure_jit_out()
prev_mode = self.runtime_mode
self.runtime_mode = True
if not self._repl_initialized:
persistent_size = 0
if self.parser.custom_bss:
for bss_line in self.parser.custom_bss:
m = _RE_BSS_PERSISTENT.search(bss_line)
if m:
persistent_size = int(m.group(1))
self.memory = CTMemory(persistent_size)
self.memory.setup_argv(sys.argv)
self._native_data_stack = ctypes.create_string_buffer(self.NATIVE_STACK_SIZE)
self._native_data_top = ctypes.addressof(self._native_data_stack) + self.NATIVE_STACK_SIZE
self.r12 = self._native_data_top
self._native_return_stack = ctypes.create_string_buffer(self.NATIVE_STACK_SIZE)
self._native_return_top = ctypes.addressof(self._native_return_stack) + self.NATIVE_STACK_SIZE
self.r13 = self._native_return_top
self._bss_symbols = {
"data_start": self.memory.data_start,
"data_end": self.memory.data_start + self.memory._data_offset if self.memory._data_offset else self.memory.data_end,
"print_buf": self.memory.print_buf_addr,
"print_buf_end": self.memory.print_buf_addr + CTMemory.PRINT_BUF_SIZE,
"persistent": self.memory.persistent_addr,
"persistent_end": self.memory.persistent_addr + self.memory._persistent_size,
"sys_argc": self.memory.sys_argc_addr,
"sys_argv": self.memory.sys_argv_addr,
}
self._jit_cache = {}
self._jit_code_pages = []
self._dl_handles = []
self._dl_func_cache = {}
all_libs = list(self._ct_libs)
if libs:
for lib in libs:
if lib not in all_libs:
all_libs.append(lib)
for lib_name in all_libs:
self._dlopen(lib_name)
old_limit = sys.getrecursionlimit()
if old_limit < 10000:
sys.setrecursionlimit(10000)
self._repl_initialized = True
self._repl_libs = list(libs or [])
else:
# Subsequent call — open any new libraries not yet loaded
if libs:
for lib in libs:
if lib not in self._repl_libs:
self._dlopen(lib)
self._repl_libs.append(lib)
# Clear transient state but keep stacks and memory
self.call_stack.clear()
self.loop_stack.clear()
self._list_capture_stack.clear()
self.current_location = None
# JIT cache must be cleared because word definitions change between
# REPL evaluations (re-parsed each time).
self._jit_cache.clear()
try:
self._call_word(word)
except _CTVMExit:
pass
finally:
self.runtime_mode = prev_mode
def repl_stack_values(self) -> List[int]:
"""Return current native data stack contents (bottom to top)."""
if not self._repl_initialized or self.r12 >= self._native_data_top:
return []
values = []
addr = self._native_data_top - 8
while addr >= self.r12:
values.append(CTMemory.read_qword(addr))
addr -= 8
return values
def push(self, value: Any) -> None:
if self.runtime_mode:
self.r12 -= 8
@@ -7429,6 +7524,13 @@ class Compiler:
return
self.parser.compile_time_vm.invoke(word, runtime_mode=True, libs=libs)
def run_compile_time_word_repl(self, name: str, *, libs: Optional[List[str]] = None) -> None:
"""Like run_compile_time_word but uses invoke_repl for persistent state."""
word = self.dictionary.lookup(name)
if word is None:
raise CompileTimeError(f"word '{name}' not defined; cannot run at compile time")
self.parser.compile_time_vm.invoke_repl(word, libs=libs)
def _resolve_import_target(self, importing_file: Path, target: str) -> Path:
raw = Path(target)
tried: List[Path] = []
@@ -7919,8 +8021,27 @@ def run_repl(
debug: bool = False,
initial_source: Optional[Path] = None,
) -> int:
"""REPL backed by the compile-time VM for instant execution."""
"""REPL backed by the compile-time VM for instant execution.
State (data stack, memory, definitions) persists across evaluations.
Use ``:reset`` to start fresh.
"""
# -- Colors ---------------------------------------------------------------
_C_RESET = "\033[0m"
_C_BOLD = "\033[1m"
_C_DIM = "\033[2m"
_C_GREEN = "\033[32m"
_C_CYAN = "\033[36m"
_C_YELLOW = "\033[33m"
_C_RED = "\033[31m"
_C_MAGENTA = "\033[35m"
use_color = sys.stdout.isatty()
def _c(code: str, text: str) -> str:
return f"{code}{text}{_C_RESET}" if use_color else text
# -- Helpers --------------------------------------------------------------
def _block_defines_main(block: str) -> bool:
stripped_lines = [ln.strip() for ln in block.splitlines() if ln.strip() and not ln.strip().startswith("#")]
for idx, stripped in enumerate(stripped_lines):
@@ -7954,13 +8075,15 @@ def run_repl(
has_user_main = has_user_main or _block_defines_main(initial_text)
if has_user_main:
main_body.clear()
print(f"[repl] loaded {initial_source}")
print(_c(_C_DIM, f"[repl] loaded {initial_source}"))
except Exception as exc:
print(f"[repl] failed to load {initial_source}: {exc}")
print(_c(_C_RED, f"[repl] failed to load {initial_source}: {exc}"))
# -- Persistent VM execution ----------------------------------------------
def _run_on_ct_vm(source: str, word_name: str = "main") -> bool:
"""Parse source and execute word_name via the compile-time VM.
"""Parse source and execute *word_name* via the compile-time VM.
Uses ``invoke_repl`` so stacks/memory persist across calls.
Returns True on success, False on error (already printed).
"""
nonlocal compiler
@@ -7970,55 +8093,249 @@ def run_repl(
compiler._loaded_files.clear()
compiler.parse_file(src_path)
except (ParseError, CompileError, CompileTimeError) as exc:
print(f"[error] {exc}")
print(_c(_C_RED, f"[error] {exc}"))
return False
except Exception as exc:
print(f"[error] parse failed: {exc}")
print(_c(_C_RED, f"[error] parse failed: {exc}"))
return False
finally:
_suppress_redefine_warnings_set(False)
try:
compiler.run_compile_time_word(word_name, libs=list(libs))
compiler.run_compile_time_word_repl(word_name, libs=list(libs))
except (CompileTimeError, _CTVMExit) as exc:
if isinstance(exc, _CTVMExit):
code = exc.args[0] if exc.args else 0
code = exc.code
if code != 0:
print(f"[warn] program exited with code {code}")
print(_c(_C_YELLOW, f"[warn] program exited with code {code}"))
else:
print(f"[error] {exc}")
print(_c(_C_RED, f"[error] {exc}"))
return False
except Exception as exc:
print(f"[error] execution failed: {exc}")
print(_c(_C_RED, f"[error] execution failed: {exc}"))
return False
return True
# -- Stack display --------------------------------------------------------
def _show_stack() -> None:
vm = compiler.parser.compile_time_vm
values = vm.repl_stack_values()
if not values:
print(_c(_C_DIM, "<empty stack>"))
else:
parts = []
for v in values:
if v < 0:
v = v + (1 << 64) # show as unsigned
parts.append(f"{v} (0x{v:x})")
elif v > 0xFFFF:
parts.append(f"{v} (0x{v:x})")
else:
parts.append(str(v))
depth_str = _c(_C_DIM, f"<{len(values)}>")
print(f"{depth_str} {' '.join(parts)}")
# -- Word listing ---------------------------------------------------------
def _show_words(filter_str: str = "") -> None:
all_words = sorted(compiler.dictionary.words.keys())
if filter_str:
all_words = [w for w in all_words if filter_str in w]
if not all_words:
print(_c(_C_DIM, "no matching words"))
return
# Print in columns
max_len = max(len(w) for w in all_words) + 2
cols = max(1, 80 // max_len)
for i in range(0, len(all_words), cols):
row = all_words[i:i + cols]
print(" ".join(w.ljust(max_len) for w in row))
print(_c(_C_DIM, f"({len(all_words)} words)"))
# -- Word type/info -------------------------------------------------------
def _show_type(word_name: str) -> None:
word = compiler.dictionary.lookup(word_name)
if word is None:
print(_c(_C_RED, f"word '{word_name}' not found"))
return
# Header: name + kind
defn = word.definition
if word.is_extern:
kind = "extern"
elif word.macro_expansion is not None:
kind = "macro"
elif isinstance(defn, AsmDefinition):
kind = "asm"
elif isinstance(defn, Definition):
kind = "word"
elif word.compile_time_intrinsic is not None or word.runtime_intrinsic is not None:
kind = "builtin"
elif word.macro is not None:
kind = "immediate/macro"
else:
kind = "unknown"
print(f" {_c(_C_BOLD, word_name)} {_c(_C_CYAN, kind)}")
# Tags
tags: List[str] = []
if word.immediate:
tags.append("immediate")
if word.compile_only:
tags.append("compile-only")
if word.inline:
tags.append("inline")
if word.compile_time_override:
tags.append("ct-override")
if word.priority != 0:
tags.append(f"priority={word.priority}")
if tags:
print(f" {_c(_C_DIM, ' tags: ')}{_c(_C_YELLOW, ' '.join(tags))}")
# Extern signature
if word.is_extern and word.extern_signature:
arg_types, ret_type = word.extern_signature
sig = f"{ret_type} {word_name}({', '.join(arg_types)})"
print(f" {_c(_C_DIM, ' sig: ')}{_c(_C_GREEN, sig)}")
elif word.is_extern:
print(f" {_c(_C_DIM, ' args: ')}{word.extern_inputs} in, {word.extern_outputs} out")
# Stack effect from definition comment
if isinstance(defn, Definition) and defn.stack_inputs is not None:
print(f" {_c(_C_DIM, ' args: ')}{defn.stack_inputs} inputs")
# Macro expansion
if word.macro_expansion is not None:
params = word.macro_params
expansion = " ".join(word.macro_expansion)
if len(expansion) > 80:
expansion = expansion[:77] + "..."
param_str = f" (${params} params)" if params else ""
print(f" {_c(_C_DIM, ' expands:')}{param_str} {expansion}")
# Asm body (trimmed)
if isinstance(defn, AsmDefinition):
body = defn.body.strip()
lines = body.splitlines()
if defn.effects:
print(f" {_c(_C_DIM, ' effects:')} {' '.join(sorted(defn.effects))}")
if len(lines) <= 6:
for ln in lines:
print(f" {_c(_C_DIM, ln.rstrip())}")
else:
for ln in lines[:4]:
print(f" {_c(_C_DIM, ln.rstrip())}")
print(f" {_c(_C_DIM, f'... ({len(lines)} lines total)')}")
# Word body (decompiled ops)
if isinstance(defn, Definition):
ops = defn.body
indent = 0
max_show = 12
shown = 0
for op in ops:
if shown >= max_show:
print(f" {_c(_C_DIM, f'... ({len(ops)} ops total)')}")
break
if op.op in ("branch_zero", "for_begin", "while_begin", "list_begin"):
pass
if op.op in ("jump", "for_end"):
indent = max(0, indent - 1)
if op.op == "literal":
if isinstance(op.data, str):
txt = f'"{op.data}"' if len(op.data) <= 40 else f'"{op.data[:37]}..."'
line_str = f" {txt}"
elif isinstance(op.data, float):
line_str = f" {op.data}"
else:
line_str = f" {op.data}"
elif op.op == "word":
line_str = f" {op.data}"
elif op.op == "branch_zero":
line_str = " if"
indent += 1
elif op.op == "jump":
line_str = " else/end"
elif op.op == "for_begin":
line_str = " for"
indent += 1
elif op.op == "for_end":
line_str = " end-for"
elif op.op == "label":
line_str = f" label {op.data}"
elif op.op == "goto":
line_str = f" goto {op.data}"
else:
line_str = f" {op.op}" + (f" {op.data}" if op.data is not None else "")
print(f" {_c(_C_DIM, ' ' * indent)}{line_str}")
shown += 1
# -- readline setup -------------------------------------------------------
history_path = temp_dir / "repl_history"
try:
import readline
readline.parse_and_bind("tab: complete")
try:
readline.read_history_file(str(history_path))
except (FileNotFoundError, OSError):
pass
def _completer(text: str, state: int) -> Optional[str]:
commands = [":help", ":show", ":reset", ":load ", ":call ",
":edit ", ":seteditor ", ":quit", ":q",
":stack", ":words ", ":type ", ":clear"]
if text.startswith(":"):
matches = [c for c in commands if c.startswith(text)]
else:
all_words = sorted(compiler.dictionary.words.keys())
matches = [w + " " for w in all_words if w.startswith(text)]
return matches[state] if state < len(matches) else None
readline.set_completer(_completer)
readline.set_completer_delims(" \t\n")
_has_readline = True
except ImportError:
_has_readline = False
# -- Help -----------------------------------------------------------------
def _print_help() -> None:
print("[repl] commands:")
print(" :help show this help")
print(" :show display current session source (with synthetic main if pending snippet)")
print(" :reset clear session imports/defs")
print(" :load <file> load a source file into the session")
print(" :call <word> execute a word via the compile-time VM")
print(" :edit [file] open session file or given file in editor")
print(" :seteditor [cmd] show/set editor command (default from $EDITOR or vim)")
print(" :quit | :q exit the REPL")
print("[repl] free-form input:")
print(_c(_C_BOLD, "[repl] commands:"))
cmds = [
(":help", "show this help"),
(":stack", "display the data stack"),
(":clear", "clear the data stack (keep definitions)"),
(":words [filter]", "list defined words (optionally filtered)"),
(":type <word>", "show word info / signature"),
(":show", "display current session source"),
(":reset", "clear everything — fresh VM and dictionary"),
(":load <file>", "load a source file into the session"),
(":call <word>", "execute a word via the compile-time VM"),
(":edit [file]", "open session file or given file in editor"),
(":seteditor [cmd]", "show/set editor command (default from $EDITOR)"),
(":quit | :q", "exit the REPL"),
]
for cmd, desc in cmds:
print(f" {_c(_C_GREEN, cmd.ljust(20))} {desc}")
print(_c(_C_BOLD, "[repl] free-form input:"))
print(" definitions (word/:asm/:py/extern/macro/struct) extend the session")
print(" imports add to session imports")
print(" other lines run immediately via the compile-time VM (not saved)")
print(" multiline: end lines with \\ to continue; finish with a non-\\ line")
print(" other lines run immediately (values stay on the stack)")
print(" multiline: end lines with \\ to continue")
print("[repl] type L2 code; :help for commands; :quit to exit")
print("[repl] execution via compile-time VM (instant, no nasm/ld)")
print("[repl] enter multiline with trailing \\; finish with a line without \\")
# -- Banner ---------------------------------------------------------------
prompt = _c(_C_GREEN + _C_BOLD, "l2> ") if use_color else "l2> "
cont_prompt = _c(_C_DIM, "... ") if use_color else "... "
print(_c(_C_BOLD, "[repl] L2 interactive — type :help for commands, :quit to exit"))
print(_c(_C_DIM, "[repl] state persists across evaluations; :reset to start fresh"))
pending_block: List[str] = []
while True:
try:
line = input("l2> ")
except EOFError:
cur_prompt = cont_prompt if pending_block else prompt
line = input(cur_prompt)
except (EOFError, KeyboardInterrupt):
print()
break
@@ -8028,6 +8345,28 @@ def run_repl(
if stripped == ":help":
_print_help()
continue
if stripped == ":stack":
_show_stack()
continue
if stripped == ":clear":
vm = compiler.parser.compile_time_vm
if vm._repl_initialized:
vm.r12 = vm._native_data_top
else:
vm.stack.clear()
print(_c(_C_DIM, "stack cleared"))
continue
if stripped.startswith(":words"):
filt = stripped.split(None, 1)[1].strip() if " " in stripped else ""
_show_words(filt)
continue
if stripped.startswith(":type "):
word_name = stripped.split(None, 1)[1].strip()
if word_name:
_show_type(word_name)
else:
print(_c(_C_RED, "[repl] usage: :type <word>"))
continue
if stripped == ":reset":
imports = list(default_imports)
user_defs_files.clear()
@@ -8035,12 +8374,11 @@ def run_repl(
main_body.clear()
has_user_main = False
pending_block.clear()
# Re-create compiler for a clean dictionary state
compiler = Compiler(
include_paths=include_paths,
macro_expansion_limit=compiler.parser.macro_expansion_limit,
)
print("[repl] session cleared")
print(_c(_C_DIM, "[repl] session reset — fresh VM and dictionary"))
continue
if stripped.startswith(":seteditor"):
parts = stripped.split(None, 1)
@@ -8064,7 +8402,7 @@ def run_repl(
)
src_path.write_text(current_source)
except Exception as exc:
print(f"[repl] failed to sync source before edit: {exc}")
print(_c(_C_RED, f"[repl] failed to sync source before edit: {exc}"))
try:
if not target_path.exists():
target_path.parent.mkdir(parents=True, exist_ok=True)
@@ -8090,11 +8428,11 @@ def run_repl(
user_defs_repl.clear()
main_body.clear()
has_user_main = _block_defines_main(new_body)
print("[repl] reloaded session source from editor")
print(_c(_C_DIM, "[repl] reloaded session source from editor"))
except Exception as exc:
print(f"[repl] failed to reload edited source: {exc}")
print(_c(_C_RED, f"[repl] failed to reload edited source: {exc}"))
except Exception as exc:
print(f"[repl] failed to launch editor: {exc}")
print(_c(_C_RED, f"[repl] failed to launch editor: {exc}"))
continue
if stripped == ":show":
source = _repl_build_source(imports, user_defs_files, user_defs_repl, main_body, has_user_main, force_synthetic=True)
@@ -8104,7 +8442,7 @@ def run_repl(
path_text = stripped.split(None, 1)[1].strip()
target_path = Path(path_text)
if not target_path.exists():
print(f"[repl] file not found: {target_path}")
print(_c(_C_RED, f"[repl] file not found: {target_path}"))
continue
try:
loaded_text = target_path.read_text()
@@ -8112,17 +8450,17 @@ def run_repl(
if _block_defines_main(loaded_text):
has_user_main = True
main_body.clear()
print(f"[repl] loaded {target_path}")
print(_c(_C_DIM, f"[repl] loaded {target_path}"))
except Exception as exc:
print(f"[repl] failed to load {target_path}: {exc}")
print(_c(_C_RED, f"[repl] failed to load {target_path}: {exc}"))
continue
if stripped.startswith(":call "):
word_name = stripped.split(None, 1)[1].strip()
if not word_name:
print("[repl] usage: :call <word>")
print(_c(_C_RED, "[repl] usage: :call <word>"))
continue
if word_name == "main" and not has_user_main:
print("[repl] cannot call main; no user-defined main present")
print(_c(_C_RED, "[repl] cannot call main; no user-defined main present"))
continue
if word_name == "main" and has_user_main:
source = _repl_build_source(imports, user_defs_files, user_defs_repl, [], True, force_synthetic=False)
@@ -8162,7 +8500,6 @@ def run_repl(
main_body.clear()
user_defs_repl.append(block)
else:
# Execute snippet immediately via the compile-time VM.
source = _repl_build_source(
imports,
user_defs_files,
@@ -8185,9 +8522,16 @@ def run_repl(
finally:
_suppress_redefine_warnings_set(False)
except (ParseError, CompileError, CompileTimeError) as exc:
print(f"[error] {exc}")
print(_c(_C_RED, f"[error] {exc}"))
continue
# Save readline history
if _has_readline:
try:
readline.write_history_file(str(history_path))
except OSError:
pass
return 0