From 4e51e8803f0e2404f9efaab0dcb2df7b851dd1c6 Mon Sep 17 00:00:00 2001 From: igor Date: Mon, 9 Mar 2026 16:10:02 +0100 Subject: [PATCH] improved the repl --- main.py | 426 ++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 385 insertions(+), 41 deletions(-) diff --git a/main.py b/main.py index 00ec49b..afeaaeb 100644 --- a/main.py +++ b/main.py @@ -1740,6 +1740,9 @@ class CompileTimeVM: self.r13: int = 0 # return stack pointer (grows downward) self._native_data_stack: Optional[Any] = None # ctypes buffer self._native_data_top: int = 0 + # REPL persistent state + self._repl_initialized: bool = False + self._repl_libs: List[str] = [] self._native_return_stack: Optional[Any] = None # ctypes buffer self._native_return_top: int = 0 # JIT cache: word name → ctypes callable @@ -1788,6 +1791,7 @@ class CompileTimeVM: self.r12 = 0 self.r13 = 0 self.current_location = None + self._repl_initialized = False def invoke(self, word: Word, *, runtime_mode: bool = False, libs: Optional[List[str]] = None) -> None: self.reset() @@ -1864,6 +1868,97 @@ class CompileTimeVM: self.push(value) self._call_word(word) + def invoke_repl(self, word: Word, *, libs: Optional[List[str]] = None) -> None: + """Execute *word* in runtime mode, preserving stack/memory across calls. + + On the first call (or after ``reset()``), allocates native stacks and + memory. Subsequent calls reuse the existing state so values left on + the data stack persist between REPL evaluations. + """ + self._ensure_jit_out() + prev_mode = self.runtime_mode + self.runtime_mode = True + + if not self._repl_initialized: + persistent_size = 0 + if self.parser.custom_bss: + for bss_line in self.parser.custom_bss: + m = _RE_BSS_PERSISTENT.search(bss_line) + if m: + persistent_size = int(m.group(1)) + self.memory = CTMemory(persistent_size) + self.memory.setup_argv(sys.argv) + + self._native_data_stack = ctypes.create_string_buffer(self.NATIVE_STACK_SIZE) + self._native_data_top = ctypes.addressof(self._native_data_stack) + self.NATIVE_STACK_SIZE + self.r12 = self._native_data_top + + self._native_return_stack = ctypes.create_string_buffer(self.NATIVE_STACK_SIZE) + self._native_return_top = ctypes.addressof(self._native_return_stack) + self.NATIVE_STACK_SIZE + self.r13 = self._native_return_top + + self._bss_symbols = { + "data_start": self.memory.data_start, + "data_end": self.memory.data_start + self.memory._data_offset if self.memory._data_offset else self.memory.data_end, + "print_buf": self.memory.print_buf_addr, + "print_buf_end": self.memory.print_buf_addr + CTMemory.PRINT_BUF_SIZE, + "persistent": self.memory.persistent_addr, + "persistent_end": self.memory.persistent_addr + self.memory._persistent_size, + "sys_argc": self.memory.sys_argc_addr, + "sys_argv": self.memory.sys_argv_addr, + } + self._jit_cache = {} + self._jit_code_pages = [] + self._dl_handles = [] + self._dl_func_cache = {} + all_libs = list(self._ct_libs) + if libs: + for lib in libs: + if lib not in all_libs: + all_libs.append(lib) + for lib_name in all_libs: + self._dlopen(lib_name) + + old_limit = sys.getrecursionlimit() + if old_limit < 10000: + sys.setrecursionlimit(10000) + self._repl_initialized = True + self._repl_libs = list(libs or []) + else: + # Subsequent call — open any new libraries not yet loaded + if libs: + for lib in libs: + if lib not in self._repl_libs: + self._dlopen(lib) + self._repl_libs.append(lib) + + # Clear transient state but keep stacks and memory + self.call_stack.clear() + self.loop_stack.clear() + self._list_capture_stack.clear() + self.current_location = None + # JIT cache must be cleared because word definitions change between + # REPL evaluations (re-parsed each time). + self._jit_cache.clear() + + try: + self._call_word(word) + except _CTVMExit: + pass + finally: + self.runtime_mode = prev_mode + + def repl_stack_values(self) -> List[int]: + """Return current native data stack contents (bottom to top).""" + if not self._repl_initialized or self.r12 >= self._native_data_top: + return [] + values = [] + addr = self._native_data_top - 8 + while addr >= self.r12: + values.append(CTMemory.read_qword(addr)) + addr -= 8 + return values + def push(self, value: Any) -> None: if self.runtime_mode: self.r12 -= 8 @@ -7429,6 +7524,13 @@ class Compiler: return self.parser.compile_time_vm.invoke(word, runtime_mode=True, libs=libs) + def run_compile_time_word_repl(self, name: str, *, libs: Optional[List[str]] = None) -> None: + """Like run_compile_time_word but uses invoke_repl for persistent state.""" + word = self.dictionary.lookup(name) + if word is None: + raise CompileTimeError(f"word '{name}' not defined; cannot run at compile time") + self.parser.compile_time_vm.invoke_repl(word, libs=libs) + def _resolve_import_target(self, importing_file: Path, target: str) -> Path: raw = Path(target) tried: List[Path] = [] @@ -7919,8 +8021,27 @@ def run_repl( debug: bool = False, initial_source: Optional[Path] = None, ) -> int: - """REPL backed by the compile-time VM for instant execution.""" + """REPL backed by the compile-time VM for instant execution. + State (data stack, memory, definitions) persists across evaluations. + Use ``:reset`` to start fresh. + """ + + # -- Colors --------------------------------------------------------------- + _C_RESET = "\033[0m" + _C_BOLD = "\033[1m" + _C_DIM = "\033[2m" + _C_GREEN = "\033[32m" + _C_CYAN = "\033[36m" + _C_YELLOW = "\033[33m" + _C_RED = "\033[31m" + _C_MAGENTA = "\033[35m" + + use_color = sys.stdout.isatty() + def _c(code: str, text: str) -> str: + return f"{code}{text}{_C_RESET}" if use_color else text + + # -- Helpers -------------------------------------------------------------- def _block_defines_main(block: str) -> bool: stripped_lines = [ln.strip() for ln in block.splitlines() if ln.strip() and not ln.strip().startswith("#")] for idx, stripped in enumerate(stripped_lines): @@ -7954,13 +8075,15 @@ def run_repl( has_user_main = has_user_main or _block_defines_main(initial_text) if has_user_main: main_body.clear() - print(f"[repl] loaded {initial_source}") + print(_c(_C_DIM, f"[repl] loaded {initial_source}")) except Exception as exc: - print(f"[repl] failed to load {initial_source}: {exc}") + print(_c(_C_RED, f"[repl] failed to load {initial_source}: {exc}")) + # -- Persistent VM execution ---------------------------------------------- def _run_on_ct_vm(source: str, word_name: str = "main") -> bool: - """Parse source and execute word_name via the compile-time VM. + """Parse source and execute *word_name* via the compile-time VM. + Uses ``invoke_repl`` so stacks/memory persist across calls. Returns True on success, False on error (already printed). """ nonlocal compiler @@ -7970,55 +8093,249 @@ def run_repl( compiler._loaded_files.clear() compiler.parse_file(src_path) except (ParseError, CompileError, CompileTimeError) as exc: - print(f"[error] {exc}") + print(_c(_C_RED, f"[error] {exc}")) return False except Exception as exc: - print(f"[error] parse failed: {exc}") + print(_c(_C_RED, f"[error] parse failed: {exc}")) return False finally: _suppress_redefine_warnings_set(False) try: - compiler.run_compile_time_word(word_name, libs=list(libs)) + compiler.run_compile_time_word_repl(word_name, libs=list(libs)) except (CompileTimeError, _CTVMExit) as exc: if isinstance(exc, _CTVMExit): - code = exc.args[0] if exc.args else 0 + code = exc.code if code != 0: - print(f"[warn] program exited with code {code}") + print(_c(_C_YELLOW, f"[warn] program exited with code {code}")) else: - print(f"[error] {exc}") + print(_c(_C_RED, f"[error] {exc}")) return False except Exception as exc: - print(f"[error] execution failed: {exc}") + print(_c(_C_RED, f"[error] execution failed: {exc}")) return False return True + # -- Stack display -------------------------------------------------------- + def _show_stack() -> None: + vm = compiler.parser.compile_time_vm + values = vm.repl_stack_values() + if not values: + print(_c(_C_DIM, "")) + else: + parts = [] + for v in values: + if v < 0: + v = v + (1 << 64) # show as unsigned + parts.append(f"{v} (0x{v:x})") + elif v > 0xFFFF: + parts.append(f"{v} (0x{v:x})") + else: + parts.append(str(v)) + depth_str = _c(_C_DIM, f"<{len(values)}>") + print(f"{depth_str} {' '.join(parts)}") + + # -- Word listing --------------------------------------------------------- + def _show_words(filter_str: str = "") -> None: + all_words = sorted(compiler.dictionary.words.keys()) + if filter_str: + all_words = [w for w in all_words if filter_str in w] + if not all_words: + print(_c(_C_DIM, "no matching words")) + return + # Print in columns + max_len = max(len(w) for w in all_words) + 2 + cols = max(1, 80 // max_len) + for i in range(0, len(all_words), cols): + row = all_words[i:i + cols] + print(" ".join(w.ljust(max_len) for w in row)) + print(_c(_C_DIM, f"({len(all_words)} words)")) + + # -- Word type/info ------------------------------------------------------- + def _show_type(word_name: str) -> None: + word = compiler.dictionary.lookup(word_name) + if word is None: + print(_c(_C_RED, f"word '{word_name}' not found")) + return + + # Header: name + kind + defn = word.definition + if word.is_extern: + kind = "extern" + elif word.macro_expansion is not None: + kind = "macro" + elif isinstance(defn, AsmDefinition): + kind = "asm" + elif isinstance(defn, Definition): + kind = "word" + elif word.compile_time_intrinsic is not None or word.runtime_intrinsic is not None: + kind = "builtin" + elif word.macro is not None: + kind = "immediate/macro" + else: + kind = "unknown" + print(f" {_c(_C_BOLD, word_name)} {_c(_C_CYAN, kind)}") + + # Tags + tags: List[str] = [] + if word.immediate: + tags.append("immediate") + if word.compile_only: + tags.append("compile-only") + if word.inline: + tags.append("inline") + if word.compile_time_override: + tags.append("ct-override") + if word.priority != 0: + tags.append(f"priority={word.priority}") + if tags: + print(f" {_c(_C_DIM, ' tags: ')}{_c(_C_YELLOW, ' '.join(tags))}") + + # Extern signature + if word.is_extern and word.extern_signature: + arg_types, ret_type = word.extern_signature + sig = f"{ret_type} {word_name}({', '.join(arg_types)})" + print(f" {_c(_C_DIM, ' sig: ')}{_c(_C_GREEN, sig)}") + elif word.is_extern: + print(f" {_c(_C_DIM, ' args: ')}{word.extern_inputs} in, {word.extern_outputs} out") + + # Stack effect from definition comment + if isinstance(defn, Definition) and defn.stack_inputs is not None: + print(f" {_c(_C_DIM, ' args: ')}{defn.stack_inputs} inputs") + + # Macro expansion + if word.macro_expansion is not None: + params = word.macro_params + expansion = " ".join(word.macro_expansion) + if len(expansion) > 80: + expansion = expansion[:77] + "..." + param_str = f" (${params} params)" if params else "" + print(f" {_c(_C_DIM, ' expands:')}{param_str} {expansion}") + + # Asm body (trimmed) + if isinstance(defn, AsmDefinition): + body = defn.body.strip() + lines = body.splitlines() + if defn.effects: + print(f" {_c(_C_DIM, ' effects:')} {' '.join(sorted(defn.effects))}") + if len(lines) <= 6: + for ln in lines: + print(f" {_c(_C_DIM, ln.rstrip())}") + else: + for ln in lines[:4]: + print(f" {_c(_C_DIM, ln.rstrip())}") + print(f" {_c(_C_DIM, f'... ({len(lines)} lines total)')}") + + # Word body (decompiled ops) + if isinstance(defn, Definition): + ops = defn.body + indent = 0 + max_show = 12 + shown = 0 + for op in ops: + if shown >= max_show: + print(f" {_c(_C_DIM, f'... ({len(ops)} ops total)')}") + break + if op.op in ("branch_zero", "for_begin", "while_begin", "list_begin"): + pass + if op.op in ("jump", "for_end"): + indent = max(0, indent - 1) + + if op.op == "literal": + if isinstance(op.data, str): + txt = f'"{op.data}"' if len(op.data) <= 40 else f'"{op.data[:37]}..."' + line_str = f" {txt}" + elif isinstance(op.data, float): + line_str = f" {op.data}" + else: + line_str = f" {op.data}" + elif op.op == "word": + line_str = f" {op.data}" + elif op.op == "branch_zero": + line_str = " if" + indent += 1 + elif op.op == "jump": + line_str = " else/end" + elif op.op == "for_begin": + line_str = " for" + indent += 1 + elif op.op == "for_end": + line_str = " end-for" + elif op.op == "label": + line_str = f" label {op.data}" + elif op.op == "goto": + line_str = f" goto {op.data}" + else: + line_str = f" {op.op}" + (f" {op.data}" if op.data is not None else "") + + print(f" {_c(_C_DIM, ' ' * indent)}{line_str}") + shown += 1 + + # -- readline setup ------------------------------------------------------- + history_path = temp_dir / "repl_history" + try: + import readline + readline.parse_and_bind("tab: complete") + try: + readline.read_history_file(str(history_path)) + except (FileNotFoundError, OSError): + pass + + def _completer(text: str, state: int) -> Optional[str]: + commands = [":help", ":show", ":reset", ":load ", ":call ", + ":edit ", ":seteditor ", ":quit", ":q", + ":stack", ":words ", ":type ", ":clear"] + if text.startswith(":"): + matches = [c for c in commands if c.startswith(text)] + else: + all_words = sorted(compiler.dictionary.words.keys()) + matches = [w + " " for w in all_words if w.startswith(text)] + return matches[state] if state < len(matches) else None + + readline.set_completer(_completer) + readline.set_completer_delims(" \t\n") + _has_readline = True + except ImportError: + _has_readline = False + + # -- Help ----------------------------------------------------------------- def _print_help() -> None: - print("[repl] commands:") - print(" :help show this help") - print(" :show display current session source (with synthetic main if pending snippet)") - print(" :reset clear session imports/defs") - print(" :load load a source file into the session") - print(" :call execute a word via the compile-time VM") - print(" :edit [file] open session file or given file in editor") - print(" :seteditor [cmd] show/set editor command (default from $EDITOR or vim)") - print(" :quit | :q exit the REPL") - print("[repl] free-form input:") + print(_c(_C_BOLD, "[repl] commands:")) + cmds = [ + (":help", "show this help"), + (":stack", "display the data stack"), + (":clear", "clear the data stack (keep definitions)"), + (":words [filter]", "list defined words (optionally filtered)"), + (":type ", "show word info / signature"), + (":show", "display current session source"), + (":reset", "clear everything — fresh VM and dictionary"), + (":load ", "load a source file into the session"), + (":call ", "execute a word via the compile-time VM"), + (":edit [file]", "open session file or given file in editor"), + (":seteditor [cmd]", "show/set editor command (default from $EDITOR)"), + (":quit | :q", "exit the REPL"), + ] + for cmd, desc in cmds: + print(f" {_c(_C_GREEN, cmd.ljust(20))} {desc}") + print(_c(_C_BOLD, "[repl] free-form input:")) print(" definitions (word/:asm/:py/extern/macro/struct) extend the session") print(" imports add to session imports") - print(" other lines run immediately via the compile-time VM (not saved)") - print(" multiline: end lines with \\ to continue; finish with a non-\\ line") + print(" other lines run immediately (values stay on the stack)") + print(" multiline: end lines with \\ to continue") - print("[repl] type L2 code; :help for commands; :quit to exit") - print("[repl] execution via compile-time VM (instant, no nasm/ld)") - print("[repl] enter multiline with trailing \\; finish with a line without \\") + # -- Banner --------------------------------------------------------------- + prompt = _c(_C_GREEN + _C_BOLD, "l2> ") if use_color else "l2> " + cont_prompt = _c(_C_DIM, "... ") if use_color else "... " + print(_c(_C_BOLD, "[repl] L2 interactive — type :help for commands, :quit to exit")) + print(_c(_C_DIM, "[repl] state persists across evaluations; :reset to start fresh")) pending_block: List[str] = [] while True: try: - line = input("l2> ") - except EOFError: + cur_prompt = cont_prompt if pending_block else prompt + line = input(cur_prompt) + except (EOFError, KeyboardInterrupt): print() break @@ -8028,6 +8345,28 @@ def run_repl( if stripped == ":help": _print_help() continue + if stripped == ":stack": + _show_stack() + continue + if stripped == ":clear": + vm = compiler.parser.compile_time_vm + if vm._repl_initialized: + vm.r12 = vm._native_data_top + else: + vm.stack.clear() + print(_c(_C_DIM, "stack cleared")) + continue + if stripped.startswith(":words"): + filt = stripped.split(None, 1)[1].strip() if " " in stripped else "" + _show_words(filt) + continue + if stripped.startswith(":type "): + word_name = stripped.split(None, 1)[1].strip() + if word_name: + _show_type(word_name) + else: + print(_c(_C_RED, "[repl] usage: :type ")) + continue if stripped == ":reset": imports = list(default_imports) user_defs_files.clear() @@ -8035,12 +8374,11 @@ def run_repl( main_body.clear() has_user_main = False pending_block.clear() - # Re-create compiler for a clean dictionary state compiler = Compiler( include_paths=include_paths, macro_expansion_limit=compiler.parser.macro_expansion_limit, ) - print("[repl] session cleared") + print(_c(_C_DIM, "[repl] session reset — fresh VM and dictionary")) continue if stripped.startswith(":seteditor"): parts = stripped.split(None, 1) @@ -8064,7 +8402,7 @@ def run_repl( ) src_path.write_text(current_source) except Exception as exc: - print(f"[repl] failed to sync source before edit: {exc}") + print(_c(_C_RED, f"[repl] failed to sync source before edit: {exc}")) try: if not target_path.exists(): target_path.parent.mkdir(parents=True, exist_ok=True) @@ -8090,11 +8428,11 @@ def run_repl( user_defs_repl.clear() main_body.clear() has_user_main = _block_defines_main(new_body) - print("[repl] reloaded session source from editor") + print(_c(_C_DIM, "[repl] reloaded session source from editor")) except Exception as exc: - print(f"[repl] failed to reload edited source: {exc}") + print(_c(_C_RED, f"[repl] failed to reload edited source: {exc}")) except Exception as exc: - print(f"[repl] failed to launch editor: {exc}") + print(_c(_C_RED, f"[repl] failed to launch editor: {exc}")) continue if stripped == ":show": source = _repl_build_source(imports, user_defs_files, user_defs_repl, main_body, has_user_main, force_synthetic=True) @@ -8104,7 +8442,7 @@ def run_repl( path_text = stripped.split(None, 1)[1].strip() target_path = Path(path_text) if not target_path.exists(): - print(f"[repl] file not found: {target_path}") + print(_c(_C_RED, f"[repl] file not found: {target_path}")) continue try: loaded_text = target_path.read_text() @@ -8112,17 +8450,17 @@ def run_repl( if _block_defines_main(loaded_text): has_user_main = True main_body.clear() - print(f"[repl] loaded {target_path}") + print(_c(_C_DIM, f"[repl] loaded {target_path}")) except Exception as exc: - print(f"[repl] failed to load {target_path}: {exc}") + print(_c(_C_RED, f"[repl] failed to load {target_path}: {exc}")) continue if stripped.startswith(":call "): word_name = stripped.split(None, 1)[1].strip() if not word_name: - print("[repl] usage: :call ") + print(_c(_C_RED, "[repl] usage: :call ")) continue if word_name == "main" and not has_user_main: - print("[repl] cannot call main; no user-defined main present") + print(_c(_C_RED, "[repl] cannot call main; no user-defined main present")) continue if word_name == "main" and has_user_main: source = _repl_build_source(imports, user_defs_files, user_defs_repl, [], True, force_synthetic=False) @@ -8162,7 +8500,6 @@ def run_repl( main_body.clear() user_defs_repl.append(block) else: - # Execute snippet immediately via the compile-time VM. source = _repl_build_source( imports, user_defs_files, @@ -8185,9 +8522,16 @@ def run_repl( finally: _suppress_redefine_warnings_set(False) except (ParseError, CompileError, CompileTimeError) as exc: - print(f"[error] {exc}") + print(_c(_C_RED, f"[error] {exc}")) continue + # Save readline history + if _has_readline: + try: + readline.write_history_file(str(history_path)) + except OSError: + pass + return 0