From 238fc4901082ff8322ad35a5898349296aa24247 Mon Sep 17 00:00:00 2001 From: Guohao Zhang Date: Mon, 29 Jun 2026 18:49:51 +0800 Subject: [PATCH 01/10] Add serial add mutation coordinator --- openkb/add_coordinator.py | 136 ++++++++++++++++++++++++++++++++++++++ openkb/locks.py | 14 ++++ 2 files changed, 150 insertions(+) create mode 100644 openkb/add_coordinator.py diff --git a/openkb/add_coordinator.py b/openkb/add_coordinator.py new file mode 100644 index 00000000..2d427b1f --- /dev/null +++ b/openkb/add_coordinator.py @@ -0,0 +1,136 @@ +from __future__ import annotations + +import logging +import shutil +from collections.abc import Callable, Sequence +from dataclasses import dataclass, field +from pathlib import Path + +import click + +from openkb.locks import kb_ingest_lock_held +from openkb.mutation import snapshot_paths + +logger = logging.getLogger(__name__) + +MutationBody = Callable[[], None] +PostCommitHook = Callable[[], None] + + +class DirtyRollbackError(RuntimeError): + """A mutation's rollback failed, leaving an active journal on disk. + + The KB may be in a partially-applied state that the retained journal will + attempt to roll back on the next exclusive-lock acquisition. Batch owners + (the parallel/serial ``add`` loops) MUST stop committing further mutations + on top of this dirty state instead of continuing — otherwise the next + recovery rolls this journal back over the shared paths it recorded + (``hashes.json``, ``index.md``, ``concepts/``, ``entities/``) and silently + clobbers the later commits. Single-mutation callers should let it propagate + so the command fails loudly; rerunning recovers via the drain. + """ + + def __init__(self, operation: str, journal_path: Path) -> None: + super().__init__( + f"Dirty rollback for {operation}; journal retained at {journal_path}. " + f"Rerun the command to recover." + ) + self.operation = operation + self.journal_path = journal_path + + +@dataclass(slots=True) +class AddMutationPlan: + operation: str + details: dict + touched_paths: Sequence[Path] + body: MutationBody + post_commit_hooks: Sequence[PostCommitHook] = field(default_factory=tuple) + hardlink_dirs: set[Path] = field(default_factory=set) + staging_dirs: Sequence[Path | None] = field(default_factory=tuple) + rollback_error_message: str = "Rollback failed; mutation journal retained for recovery" + + +def _cleanup_staging_dirs(staging_dirs: Sequence[Path | None]) -> None: + for staging_dir in staging_dirs: + if staging_dir is not None: + shutil.rmtree(staging_dir, ignore_errors=True) + + +def _rollback_snapshot(plan: AddMutationPlan, snapshot) -> Path | None: + """Best-effort rollback; returns the retained journal path on dirty failure. + + Returns ``snapshot.journal_path`` when the snapshot existed but rollback + FAILED (the active journal is retained for next-run recovery), otherwise + ``None`` — covering both "snapshot is None" (nothing was applied; the + failure happened during snapshot setup before the body ran) and a clean + rollback that discarded its journal. + """ + if snapshot is None: + _cleanup_staging_dirs(plan.staging_dirs) + return None + rollback_error = snapshot.rollback_best_effort() + if rollback_error is None: + snapshot.discard_best_effort() + else: + click.echo(f" [ERROR] {plan.rollback_error_message}: {snapshot.journal_path}") + _cleanup_staging_dirs(plan.staging_dirs) + return snapshot.journal_path if rollback_error is not None else None + + +def _failure_target(details: dict) -> str: + for key in ("name", "doc_name", "doc_id"): + value = details.get(key) + if value: + return f" for {value}" + return "" + + +def run_add_mutation(kb_dir: Path, plan: AddMutationPlan) -> bool: + if not kb_ingest_lock_held(kb_dir / ".openkb"): + raise RuntimeError( + "run_add_mutation requires the caller to hold kb_ingest_lock" + ) + snapshot = None + try: + snapshot = snapshot_paths( + kb_dir, + list(plan.touched_paths), + operation=plan.operation, + details=plan.details, + hardlink_dirs=plan.hardlink_dirs, + ) + plan.body() + snapshot.mark_committed() + except Exception as exc: + dirty_journal = _rollback_snapshot(plan, snapshot) + if dirty_journal is not None: + # Rollback failed and left an active journal. Stop the batch rather + # than committing more docs on top of dirty state that the next + # recovery would roll back over. + raise DirtyRollbackError(plan.operation, dirty_journal) + click.echo( + f" [ERROR] {plan.operation} failed{_failure_target(plan.details)}: {exc}" + ) + logger.debug("%s mutation failed:", plan.operation, exc_info=True) + return False + except BaseException: + # Interrupt (KeyboardInterrupt / SystemExit): best-effort rollback for + # its side-effects only. Do NOT raise DirtyRollbackError — propagate the + # interrupt so the user's abort is honored. Any retained journal or + # orphaned staging is recovered next run by the drain + reaper. + _rollback_snapshot(plan, snapshot) + raise + finally: + _cleanup_staging_dirs(plan.staging_dirs) + + for hook in plan.post_commit_hooks: + try: + hook() + except Exception as exc: + logger.warning("Post-commit hook failed for %s: %s", plan.operation, exc) + + cleanup_error = snapshot.discard_best_effort() + if cleanup_error is not None: + click.echo(f" [WARN] mutation journal cleanup failed: {cleanup_error}") + return True diff --git a/openkb/locks.py b/openkb/locks.py index 72966fc1..23d739e7 100644 --- a/openkb/locks.py +++ b/openkb/locks.py @@ -179,6 +179,20 @@ def kb_read_lock(openkb_dir: Path): return kb_lock(openkb_dir, exclusive=False) +def kb_ingest_lock_held(openkb_dir: Path) -> bool: + """Return True iff the *current thread* holds the exclusive ingest lock. + + Reentrancy is tracked per-thread (``threading.local``), so a worker thread + returns ``False`` even when the main thread holds the lock. Mutation + primitives use this to assert they run on the lock-owning thread rather + than silently deadlocking on a worker's separate OS ``flock`` acquire. + """ + held = _held_locks() + resolved = (openkb_dir / "ingest.lock").resolve() + exclusive_depth, _ = held.get(resolved, (0, 0)) + return exclusive_depth > 0 + + def _fsync_directory(path: Path) -> None: if os.name == "nt": # Windows cannot open a directory handle to fsync it. os.replace is From 0401b8135802549b19acc1dd4c427252375d3088 Mon Sep 17 00:00:00 2001 From: Guohao Zhang Date: Mon, 29 Jun 2026 18:52:36 +0800 Subject: [PATCH 02/10] Route serial add paths through coordinator --- openkb/cli.py | 186 ++++++++++++--------------- tests/test_add_command.py | 262 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 343 insertions(+), 105 deletions(-) diff --git a/openkb/cli.py b/openkb/cli.py index f4297469..e26e8f84 100644 --- a/openkb/cli.py +++ b/openkb/cli.py @@ -52,7 +52,7 @@ def filter(self, record: logging.LogRecord) -> bool: from openkb.indexer import _write_long_doc_artifacts, prepare_cloud_import from openkb.locks import atomic_write_json, atomic_write_text, kb_ingest_lock, kb_read_lock from openkb.log import append_log -from openkb.mutation import MutationSnapshot, publish_staged_tree, snapshot_paths +from openkb.mutation import publish_staged_tree from openkb.schema import AGENTS_MD, INDEX_SEED, PAGE_CONTENT_DIRS # Suppress warnings after all imports — markitdown overrides filters at import time @@ -457,29 +457,15 @@ def _add_single_file_locked( index_result = None # populated only on the long-doc branch final_raw, final_source = _final_artifact_paths(result, kb_dir) - try: - snapshot = snapshot_paths( - kb_dir, - _snapshot_add_paths(kb_dir, doc_name, final_raw, final_source), - operation="add", - details={ - "file_hash": result.file_hash, - "name": file_path.name, - "doc_name": doc_name, - }, - hardlink_dirs={ - kb_dir / "wiki" / "concepts", - kb_dir / "wiki" / "entities", - kb_dir / ".openkb" / "files", - }, - ) + + def commit_body() -> None: + nonlocal index_result publish_staged_tree(staging_dir, kb_dir) if final_raw is not None: result.raw_path = final_raw if final_source is not None: result.source_path = final_source - # 3/4. Index and compile if result.is_long_doc: if result.raw_path is None: raise RuntimeError(f"Converted long document has no raw artifact: {file_path.name}") @@ -542,34 +528,30 @@ def _add_single_file_locked( registry.remove_by_hash(existing_hash) registry.add(result.file_hash, meta) - snapshot.mark_committed() - except Exception: - if snapshot is None: - click.echo(f" [ERROR] Failed to prepare mutation snapshot for {file_path.name}.") - _cleanup_staging(staging_dir) - return "failed" - rollback_error = snapshot.rollback_best_effort() - if rollback_error is None: - snapshot.discard_best_effort() - else: - click.echo( - " [ERROR] Rollback failed; mutation journal retained for recovery: " - f"{snapshot.journal_path}" - ) - _cleanup_staging(staging_dir) - return "failed" - finally: - _cleanup_staging(staging_dir) - - try: + def append_ingest_log() -> None: append_log(kb_dir / "wiki", "ingest", file_path.name) - except Exception as exc: - logger.warning("Failed to append ingest log for %s: %s", file_path.name, exc) - cleanup_error = snapshot.discard_best_effort() - if cleanup_error is not None: - click.echo( - f" [WARN] {file_path.name} added, but mutation journal cleanup failed: {cleanup_error}" - ) + + from openkb.add_coordinator import AddMutationPlan, run_add_mutation + + plan = AddMutationPlan( + operation="add", + details={ + "file_hash": result.file_hash, + "name": file_path.name, + "doc_name": doc_name, + }, + touched_paths=_snapshot_add_paths(kb_dir, doc_name, final_raw, final_source), + body=commit_body, + post_commit_hooks=[append_ingest_log], + hardlink_dirs={ + kb_dir / "wiki" / "concepts", + kb_dir / "wiki" / "entities", + kb_dir / ".openkb" / "files", + }, + staging_dirs=[staging_dir], + ) + if not run_add_mutation(kb_dir, plan): + return "failed" click.echo(f" [OK] {file_path.name} added to knowledge base.") return "added" @@ -602,8 +584,9 @@ def import_from_pageindex_cloud( return "skipped" click.echo(f"Importing from PageIndex Cloud: {doc_id}") - snapshot: MutationSnapshot | None = None doc_name = "" + from openkb.add_coordinator import AddMutationPlan, DirtyRollbackError, run_add_mutation + try: try: cloud = prepare_cloud_import(doc_id, kb_dir, path_key) @@ -613,76 +596,69 @@ def import_from_pageindex_cloud( return "failed" doc_name = cloud.doc_name - snapshot = snapshot_paths( - kb_dir, - _snapshot_add_paths(kb_dir, doc_name, None, None), + + def commit_body() -> None: + summary_path = _write_long_doc_artifacts( + cloud.tree, + cloud.all_pages, + doc_name, + doc_id, + kb_dir, + description=cloud.description, + ) + _run_compile_with_retry( + lambda: compile_long_doc( + doc_name, + summary_path, + doc_id, + kb_dir, + model, + doc_description=cloud.description, + ), + label=f"Compiling imported doc (doc_id={doc_id})", + ) + + # Register the raw-less cloud entry only after successful compilation. + registry = HashRegistry(openkb_dir / "hashes.json") + meta = { + "name": cloud.cloud_name, + "doc_name": doc_name, + "type": "pageindex_cloud", + "origin": "cloud", + "path": path_key, + "source_path": _registry_path( + kb_dir / "wiki" / "sources" / f"{doc_name}.json", kb_dir + ), + "doc_id": doc_id, + } + registry.remove_by_doc_name(doc_name) + registry.add(synthetic_hash, meta) + + def append_cloud_log() -> None: + append_log(kb_dir / "wiki", "ingest", doc_name) + + plan = AddMutationPlan( operation="cloud_import", details={"doc_id": doc_id, "doc_name": doc_name}, + touched_paths=_snapshot_add_paths(kb_dir, doc_name, None, None), + body=commit_body, + post_commit_hooks=[append_cloud_log], hardlink_dirs={ kb_dir / "wiki" / "concepts", kb_dir / "wiki" / "entities", kb_dir / ".openkb" / "files", }, ) - summary_path = _write_long_doc_artifacts( - cloud.tree, - cloud.all_pages, - doc_name, - doc_id, - kb_dir, - description=cloud.description, - ) - _run_compile_with_retry( - lambda: compile_long_doc( - doc_name, - summary_path, - doc_id, - kb_dir, - model, - doc_description=cloud.description, - ), - label=f"Compiling imported doc (doc_id={doc_id})", - ) - - # Register the raw-less cloud entry only after successful compilation. - registry = HashRegistry(openkb_dir / "hashes.json") - meta = { - "name": cloud.cloud_name, - "doc_name": doc_name, - "type": "pageindex_cloud", - "origin": "cloud", - "path": path_key, - "source_path": _registry_path( - kb_dir / "wiki" / "sources" / f"{doc_name}.json", kb_dir - ), - "doc_id": doc_id, - } - registry.remove_by_doc_name(doc_name) - registry.add(synthetic_hash, meta) - snapshot.mark_committed() + with kb_ingest_lock(kb_dir / ".openkb"): + if not run_add_mutation(kb_dir, plan): + return "failed" + except DirtyRollbackError: + raise except Exception: - if snapshot is None: - click.echo(f" [ERROR] Failed to prepare mutation snapshot for cloud import {doc_id}.") - return "failed" - rollback_error = snapshot.rollback_best_effort() - if rollback_error is None: - snapshot.discard_best_effort() - else: - click.echo( - " [ERROR] Rollback failed; mutation journal retained for recovery: " - f"{snapshot.journal_path}" - ) + click.echo(f" [ERROR] Failed to prepare mutation snapshot for cloud import {doc_id}.") + logger.debug("Cloud import mutation traceback:", exc_info=True) return "failed" - try: - append_log(kb_dir / "wiki", "ingest", doc_name) - except Exception as exc: - logger.warning("Failed to append ingest log for cloud import %s: %s", doc_id, exc) - cleanup_error = snapshot.discard_best_effort() - if cleanup_error is not None: - click.echo( - f" [WARN] {doc_name} imported, but mutation journal cleanup failed: {cleanup_error}" - ) click.echo(f" [OK] {doc_name} imported from PageIndex Cloud.") return "added" diff --git a/tests/test_add_command.py b/tests/test_add_command.py index a73ed4ac..516d8fc6 100644 --- a/tests/test_add_command.py +++ b/tests/test_add_command.py @@ -90,6 +90,35 @@ def test_add_single_file_compile_failure_rolls_back_converted_artifacts(self, tm assert not (kb_dir / "wiki" / "sources" / "notes.md").exists() assert HashRegistry(kb_dir / ".openkb" / "hashes.json").all_entries() == {} + def test_add_single_file_uses_add_mutation_coordinator(self, tmp_path): + from openkb.cli import add_single_file + from openkb.converter import ConvertResult + + kb_dir = self._setup_kb(tmp_path) + doc = tmp_path / "coordinated.md" + doc.write_text("# Coordinated\n", encoding="utf-8") + staging_root = kb_dir / ".openkb" / "staging" / "fake" + source_path = staging_root / "wiki" / "sources" / "coordinated.md" + raw_path = staging_root / "raw" / "coordinated.md" + result = ConvertResult( + raw_path=raw_path, + source_path=source_path, + file_hash="abc123", + doc_name="coordinated", + ) + + with patch("openkb.cli.convert_document", return_value=result), \ + patch("openkb.cli.publish_staged_tree"), \ + patch("openkb.cli.asyncio.run"), \ + patch("openkb.cli._setup_llm_key"), \ + patch("openkb.add_coordinator.run_add_mutation", return_value=True) as mock_run: + assert add_single_file(doc, kb_dir) == "added" + + plan = mock_run.call_args.args[1] + assert plan.operation == "add" + assert plan.details["doc_name"] == "coordinated" + assert kb_dir / ".openkb" / "hashes.json" in plan.touched_paths + def test_add_directory_calls_helper_for_each_file(self, tmp_path): kb_dir = self._setup_kb(tmp_path) docs_dir = tmp_path / "docs" @@ -351,6 +380,64 @@ def test_import_failure_returns_failed_and_registers_nothing(self, tmp_path): registry = HashRegistry(kb_dir / ".openkb" / "hashes.json") assert registry.all_entries() == {} + def test_cloud_import_uses_add_mutation_coordinator(self, tmp_path): + from openkb.cli import import_from_pageindex_cloud + + kb_dir = self._setup_kb(tmp_path) + cloud = self._cloud_data(doc_name="Cloud-Paper") + + with patch("openkb.cli.prepare_cloud_import", return_value=cloud), \ + patch("openkb.add_coordinator.run_add_mutation", return_value=True) as mock_run, \ + patch("openkb.cli._setup_llm_key"): + assert import_from_pageindex_cloud("cloud-1", kb_dir) == "added" + + plan = mock_run.call_args.args[1] + assert plan.operation == "cloud_import" + assert plan.details == {"doc_id": "cloud-1", "doc_name": "Cloud-Paper"} + assert kb_dir / ".openkb" / "hashes.json" in plan.touched_paths + + def test_cloud_import_propagates_dirty_rollback(self, tmp_path): + import pytest + + from openkb.add_coordinator import DirtyRollbackError + from openkb.cli import import_from_pageindex_cloud + + kb_dir = self._setup_kb(tmp_path) + cloud = self._cloud_data(doc_name="Cloud-Paper") + dirty_error = DirtyRollbackError( + "cloud_import", + kb_dir / ".openkb" / "journal" / "retained.json", + ) + + with patch("openkb.cli.prepare_cloud_import", return_value=cloud), \ + patch("openkb.add_coordinator.run_add_mutation", side_effect=dirty_error), \ + patch("openkb.cli._setup_llm_key"): + with pytest.raises(DirtyRollbackError) as exc_info: + import_from_pageindex_cloud("cloud-1", kb_dir) + + assert exc_info.value is dirty_error + + def test_cloud_import_keyboard_interrupt_rolls_back_artifacts(self, tmp_path): + import pytest + + from openkb.cli import import_from_pageindex_cloud + from openkb.state import HashRegistry + + kb_dir = self._setup_kb(tmp_path) + doc_name = "Cloud-Paper" + cloud = self._cloud_data(doc_name=doc_name) + + with patch("openkb.cli.prepare_cloud_import", return_value=cloud), \ + patch("openkb.cli.compile_long_doc", side_effect=KeyboardInterrupt()), \ + patch("openkb.cli._setup_llm_key"): + with pytest.raises(KeyboardInterrupt): + import_from_pageindex_cloud("cloud-1", kb_dir) + + assert not (kb_dir / "wiki" / "summaries" / f"{doc_name}.md").exists() + assert not (kb_dir / "wiki" / "sources" / f"{doc_name}.json").exists() + assert HashRegistry(kb_dir / ".openkb" / "hashes.json").all_entries() == {} + assert list((kb_dir / ".openkb" / "journal").glob("*.json")) == [] + def test_compile_failure_cleans_up_orphan_artifacts(self, tmp_path): """If import succeeds (artifacts written) but compile fails twice, the summary/source artifacts are cleaned up — no registry entry exists, so @@ -377,3 +464,178 @@ def test_compile_failure_cleans_up_orphan_artifacts(self, tmp_path): assert not (kb_dir / "wiki" / "sources" / f"{doc_name}.json").exists() # Nothing registered → a retry is not skipped. assert HashRegistry(kb_dir / ".openkb" / "hashes.json").all_entries() == {} + + +class TestAddMutationCoordinator: + def _setup_kb(self, tmp_path): + (tmp_path / "raw").mkdir() + (tmp_path / "wiki" / "sources" / "images").mkdir(parents=True) + (tmp_path / "wiki" / "summaries").mkdir(parents=True) + (tmp_path / "wiki" / "concepts").mkdir(parents=True) + (tmp_path / "wiki" / "entities").mkdir(parents=True) + openkb_dir = tmp_path / ".openkb" + openkb_dir.mkdir() + (openkb_dir / "config.yaml").write_text("model: gpt-4o-mini\n", encoding="utf-8") + (openkb_dir / "hashes.json").write_text("{}", encoding="utf-8") + return tmp_path + + def test_coordinator_rolls_back_before_commit_and_skips_post_commit(self, tmp_path): + from openkb.add_coordinator import AddMutationPlan, run_add_mutation + from openkb.locks import kb_ingest_lock + + kb_dir = self._setup_kb(tmp_path) + official = kb_dir / "wiki" / "sources" / "doc.md" + post_commit_calls = [] + + def body(): + official.parent.mkdir(parents=True, exist_ok=True) + official.write_text("# changed\n", encoding="utf-8") + raise RuntimeError("before commit") + + plan = AddMutationPlan( + operation="add", + details={"doc_name": "doc"}, + touched_paths=[official], + body=body, + post_commit_hooks=[lambda: post_commit_calls.append("ran")], + ) + + with kb_ingest_lock(kb_dir / ".openkb"): + assert run_add_mutation(kb_dir, plan) is False + assert not official.exists() + assert post_commit_calls == [] + + def test_coordinator_reports_failed_mutation(self, tmp_path, capsys): + from openkb.add_coordinator import AddMutationPlan, run_add_mutation + from openkb.locks import kb_ingest_lock + + kb_dir = self._setup_kb(tmp_path) + official = kb_dir / "wiki" / "sources" / "doc.md" + + def body(): + official.parent.mkdir(parents=True, exist_ok=True) + official.write_text("# changed\n", encoding="utf-8") + raise RuntimeError("boom") + + plan = AddMutationPlan( + operation="add", + details={"name": "doc.md", "doc_name": "doc"}, + touched_paths=[official], + body=body, + ) + + with kb_ingest_lock(kb_dir / ".openkb"): + assert run_add_mutation(kb_dir, plan) is False + + output = capsys.readouterr().out + assert "[ERROR] add failed for doc.md: boom" in output + + def test_coordinator_post_commit_failure_does_not_roll_back(self, tmp_path): + from openkb.add_coordinator import AddMutationPlan, run_add_mutation + from openkb.locks import kb_ingest_lock + + kb_dir = self._setup_kb(tmp_path) + official = kb_dir / "wiki" / "sources" / "doc.md" + + def body(): + official.parent.mkdir(parents=True, exist_ok=True) + official.write_text("# committed\n", encoding="utf-8") + + def bad_hook(): + raise RuntimeError("hook failed") + + plan = AddMutationPlan( + operation="add", + details={"doc_name": "doc"}, + touched_paths=[official], + body=body, + post_commit_hooks=[bad_hook], + ) + + with kb_ingest_lock(kb_dir / ".openkb"): + assert run_add_mutation(kb_dir, plan) is True + assert official.read_text(encoding="utf-8") == "# committed\n" + assert list((kb_dir / ".openkb" / "journal").glob("*.json")) == [] + + def test_coordinator_keyboard_interrupt_rolls_back_and_reraises(self, tmp_path): + import pytest + + from openkb.add_coordinator import AddMutationPlan, run_add_mutation + from openkb.locks import kb_ingest_lock + + kb_dir = self._setup_kb(tmp_path) + official = kb_dir / "wiki" / "sources" / "doc.md" + post_commit_calls = [] + + def body(): + official.parent.mkdir(parents=True, exist_ok=True) + official.write_text("# interrupted\n", encoding="utf-8") + raise KeyboardInterrupt() + + plan = AddMutationPlan( + operation="add", + details={"doc_name": "doc"}, + touched_paths=[official], + body=body, + post_commit_hooks=[lambda: post_commit_calls.append("ran")], + ) + + with kb_ingest_lock(kb_dir / ".openkb"): + with pytest.raises(KeyboardInterrupt): + run_add_mutation(kb_dir, plan) + + assert not official.exists() + assert post_commit_calls == [] + assert list((kb_dir / ".openkb" / "journal").glob("*.json")) == [] + + def test_run_add_mutation_requires_lock_held(self, tmp_path): + import pytest + + from openkb.add_coordinator import AddMutationPlan, run_add_mutation + + kb_dir = self._setup_kb(tmp_path) + + plan = AddMutationPlan( + operation="add", + details={"doc_name": "doc"}, + touched_paths=[kb_dir / "wiki" / "sources" / "doc.md"], + body=lambda: None, + ) + + with pytest.raises(RuntimeError, match="requires the caller to hold kb_ingest_lock"): + run_add_mutation(kb_dir, plan) + + def test_run_add_mutation_raises_dirty_rollback_when_rollback_fails(self, tmp_path): + import pytest + + from openkb.add_coordinator import AddMutationPlan, DirtyRollbackError, run_add_mutation + from openkb.locks import kb_ingest_lock + from openkb.mutation import MutationSnapshot + + kb_dir = self._setup_kb(tmp_path) + official = kb_dir / "wiki" / "sources" / "doc.md" + official.parent.mkdir(parents=True, exist_ok=True) + official.write_text("# before\n", encoding="utf-8") + + def body(): + official.write_text("# after\n", encoding="utf-8") + raise RuntimeError("body fails after partial apply") + + plan = AddMutationPlan( + operation="add", + details={"doc_name": "doc"}, + touched_paths=[official], + body=body, + ) + + # Force rollback to fail: rollback_best_effort returns the error instead + # of None, so the active journal is retained for next-run recovery. + with kb_ingest_lock(kb_dir / ".openkb"): + with patch.object(MutationSnapshot, "rollback_best_effort", return_value=OSError("disk full")): + with pytest.raises(DirtyRollbackError) as exc_info: + run_add_mutation(kb_dir, plan) + + assert exc_info.value.operation == "add" + # The journal is retained on disk for next-run recovery. + assert exc_info.value.journal_path.exists() + assert exc_info.value.journal_path.is_file() From 9a87b81b63bcc8b1654f2289a64917e98f29b3e3 Mon Sep 17 00:00:00 2001 From: Guohao Zhang Date: Wed, 1 Jul 2026 16:23:26 +0800 Subject: [PATCH 03/10] fix(cloud-import): resolve final doc name under ingest lock --- openkb/cli.py | 130 +++++++++++++++++++++----------------- tests/test_add_command.py | 46 ++++++++++++++ 2 files changed, 119 insertions(+), 57 deletions(-) diff --git a/openkb/cli.py b/openkb/cli.py index 6f336694..f6b45bf4 100644 --- a/openkb/cli.py +++ b/openkb/cli.py @@ -48,8 +48,17 @@ def filter(self, record: logging.LogRecord) -> bool: resolve_extra_headers, set_extra_headers, resolve_timeout, set_timeout, resolve_litellm_settings, ) -from openkb.converter import _registry_path, _sanitize_stem, convert_document -from openkb.indexer import _write_long_doc_artifacts, prepare_cloud_import +from openkb.converter import ( + _registry_path, + _sanitize_stem, + convert_document, + resolve_doc_name_from_key, +) +from openkb.indexer import ( + _cloud_display_stem, + _write_long_doc_artifacts, + prepare_cloud_import, +) from openkb.locks import atomic_write_json, atomic_write_text, kb_ingest_lock, kb_read_lock from openkb.log import append_log from openkb.mutation import publish_staged_tree @@ -605,10 +614,11 @@ def import_from_pageindex_cloud( path_key = f"pageindex-cloud:{doc_id}" synthetic_hash = hashlib.sha256(path_key.encode("utf-8")).hexdigest() - registry = HashRegistry(openkb_dir / "hashes.json") - if registry.is_known(synthetic_hash): - click.echo(f" [SKIP] Already imported from PageIndex Cloud: {doc_id}") - return "skipped" + with kb_ingest_lock(kb_dir / ".openkb"): + registry = HashRegistry(openkb_dir / "hashes.json") + if registry.is_known(synthetic_hash): + click.echo(f" [SKIP] Already imported from PageIndex Cloud: {doc_id}") + return "skipped" click.echo(f"Importing from PageIndex Cloud: {doc_id}") doc_name = "" @@ -622,62 +632,68 @@ def import_from_pageindex_cloud( logger.debug("Cloud import traceback:", exc_info=True) return "failed" - doc_name = cloud.doc_name + with kb_ingest_lock(kb_dir / ".openkb"): + registry = HashRegistry(openkb_dir / "hashes.json") + if registry.is_known(synthetic_hash): + click.echo(f" [SKIP] Already imported from PageIndex Cloud: {doc_id}") + return "skipped" - def commit_body(_snapshot) -> None: - summary_path = _write_long_doc_artifacts( - cloud.tree, - cloud.all_pages, - doc_name, - doc_id, - kb_dir, - description=cloud.description, - ) - _run_compile_with_retry( - lambda: compile_long_doc( + stem = _cloud_display_stem(cloud.cloud_name, doc_id) + doc_name = resolve_doc_name_from_key(stem, path_key, registry) + + def commit_body(_snapshot) -> None: + summary_path = _write_long_doc_artifacts( + cloud.tree, + cloud.all_pages, doc_name, - summary_path, doc_id, kb_dir, - model, - doc_description=cloud.description, - ), - label=f"Compiling imported doc (doc_id={doc_id})", - ) + description=cloud.description, + ) + _run_compile_with_retry( + lambda: compile_long_doc( + doc_name, + summary_path, + doc_id, + kb_dir, + model, + doc_description=cloud.description, + ), + label=f"Compiling imported doc (doc_id={doc_id})", + ) - # Register the raw-less cloud entry only after successful compilation. - registry = HashRegistry(openkb_dir / "hashes.json") - meta = { - "name": cloud.cloud_name, - "doc_name": doc_name, - "type": "pageindex_cloud", - "origin": "cloud", - "path": path_key, - "source_path": _registry_path( - kb_dir / "wiki" / "sources" / f"{doc_name}.json", kb_dir - ), - "doc_id": doc_id, - } - registry.remove_by_doc_name(doc_name) - registry.add(synthetic_hash, meta) - - def append_cloud_log() -> None: - append_log(kb_dir / "wiki", "ingest", doc_name) - - plan = AddMutationPlan( - operation="cloud_import", - details={"doc_id": doc_id, "doc_name": doc_name}, - touched_paths=_snapshot_add_paths(kb_dir, doc_name, None, None), - body=commit_body, - post_commit_hooks=[append_cloud_log], - # Cloud import reads from PageIndex Cloud and writes no local blob, - # so .openkb/files is never touched — nothing to snapshot there. - hardlink_dirs={ - kb_dir / "wiki" / "concepts", - kb_dir / "wiki" / "entities", - }, - ) - with kb_ingest_lock(kb_dir / ".openkb"): + # Register the raw-less cloud entry only after successful compilation. + registry = HashRegistry(openkb_dir / "hashes.json") + meta = { + "name": cloud.cloud_name, + "doc_name": doc_name, + "type": "pageindex_cloud", + "origin": "cloud", + "path": path_key, + "source_path": _registry_path( + kb_dir / "wiki" / "sources" / f"{doc_name}.json", kb_dir + ), + "doc_id": doc_id, + } + registry.remove_by_doc_name(doc_name) + registry.add(synthetic_hash, meta) + + def append_cloud_log() -> None: + append_log(kb_dir / "wiki", "ingest", doc_name) + + plan = AddMutationPlan( + operation="cloud_import", + details={"doc_id": doc_id, "doc_name": doc_name}, + touched_paths=_snapshot_add_paths(kb_dir, doc_name, None, None), + body=commit_body, + post_commit_hooks=[append_cloud_log], + # Cloud import reads from PageIndex Cloud and writes no local blob, + # so .openkb/files is never touched — nothing to snapshot there. + hardlink_dirs={ + kb_dir / "wiki" / "concepts", + kb_dir / "wiki" / "entities", + }, + ) if not run_add_mutation(kb_dir, plan): return "failed" except DirtyRollbackError: diff --git a/tests/test_add_command.py b/tests/test_add_command.py index d88bcd55..34a7f1bb 100644 --- a/tests/test_add_command.py +++ b/tests/test_add_command.py @@ -479,6 +479,52 @@ def test_cloud_import_uses_add_mutation_coordinator(self, tmp_path): assert plan.details == {"doc_id": "cloud-1", "doc_name": "Cloud-Paper"} assert kb_dir / ".openkb" / "hashes.json" in plan.touched_paths + def test_cloud_import_rechecks_doc_name_under_lock_after_prepare_race(self, tmp_path): + import hashlib + from contextlib import contextmanager + + from openkb.cli import import_from_pageindex_cloud + from openkb.locks import kb_ingest_lock as real_kb_ingest_lock + from openkb.state import HashRegistry + + kb_dir = self._setup_kb(tmp_path) + cloud = self._cloud_data(doc_name="Cloud-Paper") + existing_hash = "0" * 64 + injected = {"done": False} + + @contextmanager + def race_before_lock(openkb_dir): + if not injected["done"]: + registry = HashRegistry(openkb_dir / "hashes.json") + registry.add( + existing_hash, + { + "name": "Other Cloud Paper.pdf", + "doc_name": "Cloud-Paper", + "type": "pageindex_cloud", + "origin": "cloud", + "path": "pageindex-cloud:other-cloud", + "source_path": "wiki/sources/Cloud-Paper.json", + "doc_id": "other-cloud", + }, + ) + injected["done"] = True + with real_kb_ingest_lock(openkb_dir): + yield + + with patch("openkb.cli.prepare_cloud_import", return_value=cloud), \ + patch("openkb.cli.kb_ingest_lock", side_effect=race_before_lock), \ + patch("openkb.cli.compile_long_doc", return_value=None), \ + patch("openkb.cli._setup_llm_key"): + assert import_from_pageindex_cloud("cloud-1", kb_dir) == "added" + + registry = HashRegistry(kb_dir / ".openkb" / "hashes.json") + synthetic = hashlib.sha256(b"pageindex-cloud:cloud-1").hexdigest() + expected_doc_name = f"Cloud-Paper-{synthetic[:8]}" + assert registry.get(existing_hash)["doc_name"] == "Cloud-Paper" + assert registry.get(synthetic)["doc_name"] == expected_doc_name + assert (kb_dir / "wiki" / "sources" / f"{expected_doc_name}.json").exists() + def test_cloud_import_propagates_dirty_rollback(self, tmp_path): import pytest From 619e78e20487ce2654ae42e4ef2cb77ddeeb520e Mon Sep 17 00:00:00 2001 From: Guohao Zhang Date: Wed, 1 Jul 2026 16:43:43 +0800 Subject: [PATCH 04/10] test(add): cover coordinator conflict edge cases --- tests/test_add_command.py | 71 +++++++++++++++++++++++++++++++++++++++ tests/test_locks.py | 24 +++++++++++++ 2 files changed, 95 insertions(+) diff --git a/tests/test_add_command.py b/tests/test_add_command.py index 34a7f1bb..656741f7 100644 --- a/tests/test_add_command.py +++ b/tests/test_add_command.py @@ -221,6 +221,31 @@ def test_add_directory_calls_helper_for_each_file(self, tmp_path): assert "b.txt" in called_names assert "ignore.xyz" not in called_names + def test_add_directory_stops_after_dirty_rollback(self, tmp_path): + import pytest + + from openkb.add_coordinator import DirtyRollbackError + + kb_dir = self._setup_kb(tmp_path) + docs_dir = tmp_path / "docs" + docs_dir.mkdir() + (docs_dir / "a.md").write_text("# A") + (docs_dir / "b.md").write_text("# B") + dirty_error = DirtyRollbackError( + "add", + kb_dir / ".openkb" / "journal" / "retained.json", + ) + + runner = CliRunner() + with patch("openkb.cli.add_single_file", side_effect=dirty_error) as mock_add, \ + patch("openkb.cli._find_kb_dir", return_value=kb_dir): + with pytest.raises(DirtyRollbackError) as exc_info: + runner.invoke(cli, ["add", str(docs_dir)], catch_exceptions=False) + + assert exc_info.value is dirty_error + mock_add.assert_called_once() + assert mock_add.call_args.args[0].name == "a.md" + def test_add_unsupported_extension(self, tmp_path): kb_dir = self._setup_kb(tmp_path) doc = tmp_path / "file.xyz" @@ -525,6 +550,52 @@ def race_before_lock(openkb_dir): assert registry.get(synthetic)["doc_name"] == expected_doc_name assert (kb_dir / "wiki" / "sources" / f"{expected_doc_name}.json").exists() + def test_cloud_import_skips_if_same_doc_id_registered_after_fetch(self, tmp_path): + import hashlib + from contextlib import contextmanager + + from openkb.cli import import_from_pageindex_cloud + from openkb.locks import kb_ingest_lock as real_kb_ingest_lock + from openkb.state import HashRegistry + + kb_dir = self._setup_kb(tmp_path) + cloud = self._cloud_data(doc_name="Cloud-Paper") + path_key = "pageindex-cloud:cloud-1" + synthetic = hashlib.sha256(path_key.encode("utf-8")).hexdigest() + lock_calls = {"count": 0} + + @contextmanager + def register_same_doc_before_second_lock(openkb_dir): + lock_calls["count"] += 1 + if lock_calls["count"] == 2: + HashRegistry(openkb_dir / "hashes.json").add( + synthetic, + { + "name": "Cloud Paper.pdf", + "doc_name": "Cloud-Paper", + "type": "pageindex_cloud", + "origin": "cloud", + "path": path_key, + "source_path": "wiki/sources/Cloud-Paper.json", + "doc_id": "cloud-1", + }, + ) + with real_kb_ingest_lock(openkb_dir): + yield + + with patch("openkb.cli.prepare_cloud_import", return_value=cloud) as mock_prepare, \ + patch("openkb.cli.kb_ingest_lock", side_effect=register_same_doc_before_second_lock), \ + patch("openkb.cli.compile_long_doc") as mock_compile, \ + patch("openkb.add_coordinator.run_add_mutation") as mock_run, \ + patch("openkb.cli._setup_llm_key"): + assert import_from_pageindex_cloud("cloud-1", kb_dir) == "skipped" + + assert lock_calls["count"] == 2 + mock_prepare.assert_called_once() + mock_compile.assert_not_called() + mock_run.assert_not_called() + assert HashRegistry(kb_dir / ".openkb" / "hashes.json").is_known(synthetic) + def test_cloud_import_propagates_dirty_rollback(self, tmp_path): import pytest diff --git a/tests/test_locks.py b/tests/test_locks.py index 5da6936c..d0274443 100644 --- a/tests/test_locks.py +++ b/tests/test_locks.py @@ -11,6 +11,7 @@ atomic_write_json, atomic_write_text, kb_ingest_lock, + kb_ingest_lock_held, kb_read_lock, ) @@ -76,6 +77,29 @@ def test_write_lock_can_take_nested_read(tmp_path): assert (openkb_dir / "ingest.lock").exists() +def test_kb_ingest_lock_held_is_exclusive_and_thread_local(tmp_path): + openkb_dir = tmp_path / ".openkb" + worker_seen = [] + + assert not kb_ingest_lock_held(openkb_dir) + + with kb_read_lock(openkb_dir): + assert not kb_ingest_lock_held(openkb_dir) + + with kb_ingest_lock(openkb_dir): + assert kb_ingest_lock_held(openkb_dir) + + worker = threading.Thread( + target=lambda: worker_seen.append(kb_ingest_lock_held(openkb_dir)) + ) + worker.start() + worker.join(timeout=2) + + assert not worker.is_alive() + assert worker_seen == [False] + assert not kb_ingest_lock_held(openkb_dir) + + def test_atomic_write_text_replaces_file(tmp_path): target = tmp_path / "nested" / "file.txt" atomic_write_text(target, "first") From b5ebf520065ad60993aee9537fa02ec666aad686 Mon Sep 17 00:00:00 2001 From: Guohao Zhang Date: Wed, 1 Jul 2026 18:05:59 +0800 Subject: [PATCH 05/10] fix(indexer): roll back the PageIndex blob when indexing fails after col.add Since the add mutation stopped eagerly snapshotting .openkb/files (it registers the new blob via track_new only on success), a blob written by col.add() leaks if index_long_document raises afterward: pageindex.db is rolled back by the snapshot but the blob file is not, and nothing reclaims it. Delete the doc on the failure path so the indexer owns cleanup of a half-applied add, restoring the .openkb/files rollback-surface guarantee. --- openkb/indexer.py | 103 +++++++++++++++++++++++++----------------- tests/test_indexer.py | 23 ++++++++++ 2 files changed, 84 insertions(+), 42 deletions(-) diff --git a/openkb/indexer.py b/openkb/indexer.py index fd50a38c..e57843b0 100644 --- a/openkb/indexer.py +++ b/openkb/indexer.py @@ -188,49 +188,68 @@ def index_long_document( if attempt == max_retries: raise RuntimeError(f"Failed to index {pdf_path.name} after {max_retries} attempts: {exc}") from exc - # Fetch complete document (metadata + structure + text) - doc = col.get_document(doc_id, include_text=True) - indexed_doc_name: str = doc.get("doc_name", pdf_path.stem) - description: str = doc.get("doc_description", "") - structure: list = doc.get("structure", []) - - # Debug: print doc keys and page_count to diagnose get_page_content range - logger.info("Doc keys: %s", list(doc.keys())) - logger.info("page_count from doc: %s", doc.get("page_count", "NOT PRESENT")) - - tree = { - "doc_name": indexed_doc_name, - "doc_description": description, - "structure": structure, - } - - # Write wiki/sources/ — per-page content - sources_dir = kb_dir / "wiki" / "sources" - sources_dir.mkdir(parents=True, exist_ok=True) - images_dir = sources_dir / "images" / source_name - - all_pages: list[dict[str, Any]] = [] - if pageindex_api_key: - # Cloud mode: fetch OCR'd markdown from PageIndex. get_page_content - # requires a page range, so pass "1-N". - page_count = _get_pdf_page_count(pdf_path) - try: - all_pages = _normalize_page_content(col.get_page_content(doc_id, f"1-{page_count}")) - except Exception as exc: - logger.warning("Cloud get_page_content failed for %s: %s", pdf_path.name, exc) - - if not all_pages: + # The PageIndex blob for doc_id is now durably on disk. The add mutation no + # longer eagerly snapshots .openkb/files — it registers the new blob via + # snapshot.track_new() only on a successful return — so if any step below + # fails, delete the document we just added. Otherwise the blob leaks as an + # orphan that pageindex.db (rolled back by the snapshot) no longer refs and + # no reaper reclaims. + try: + # Fetch complete document (metadata + structure + text) + doc = col.get_document(doc_id, include_text=True) + indexed_doc_name: str = doc.get("doc_name", pdf_path.stem) + description: str = doc.get("doc_description", "") + structure: list = doc.get("structure", []) + + # Debug: print doc keys and page_count to diagnose get_page_content range + logger.info("Doc keys: %s", list(doc.keys())) + logger.info("page_count from doc: %s", doc.get("page_count", "NOT PRESENT")) + + tree = { + "doc_name": indexed_doc_name, + "doc_description": description, + "structure": structure, + } + + # Write wiki/sources/ — per-page content + sources_dir = kb_dir / "wiki" / "sources" + sources_dir.mkdir(parents=True, exist_ok=True) + images_dir = sources_dir / "images" / source_name + + all_pages: list[dict[str, Any]] = [] if pageindex_api_key: - logger.warning("Cloud returned no pages for %s; falling back to local pymupdf", pdf_path.name) - all_pages = _normalize_page_content(_convert_pdf_to_pages(pdf_path, source_name, images_dir)) - - if not all_pages: - raise RuntimeError(f"No page content extracted for {pdf_path.name}") - - _write_long_doc_artifacts( - tree, all_pages, source_name, doc_id, kb_dir, description=description - ) - return IndexResult(doc_id=doc_id, description=description, tree=tree) + # Cloud mode: fetch OCR'd markdown from PageIndex. get_page_content + # requires a page range, so pass "1-N". + page_count = _get_pdf_page_count(pdf_path) + try: + all_pages = _normalize_page_content(col.get_page_content(doc_id, f"1-{page_count}")) + except Exception as exc: + logger.warning("Cloud get_page_content failed for %s: %s", pdf_path.name, exc) + + if not all_pages: + if pageindex_api_key: + logger.warning("Cloud returned no pages for %s; falling back to local pymupdf", pdf_path.name) + all_pages = _normalize_page_content(_convert_pdf_to_pages(pdf_path, source_name, images_dir)) + + if not all_pages: + raise RuntimeError(f"No page content extracted for {pdf_path.name}") + + _write_long_doc_artifacts( + tree, all_pages, source_name, doc_id, kb_dir, description=description + ) + return IndexResult(doc_id=doc_id, description=description, tree=tree) + except BaseException: + # Best-effort: remove the blob this add created. A failure here (e.g. a + # second interrupt) only means the blob may stay orphaned — the original + # error still propagates so the caller (mutation coordinator) rolls back + # everything else it snapshotted. + try: + col.delete_document(doc_id) + except Exception: + logger.warning( + "PageIndex cleanup of %s failed after error; blob may be orphaned", doc_id + ) + raise # PageIndex's get_page_content rejects a single page range covering more than diff --git a/tests/test_indexer.py b/tests/test_indexer.py index d4a533ba..ec4ea92b 100644 --- a/tests/test_indexer.py +++ b/tests/test_indexer.py @@ -89,6 +89,29 @@ def test_returns_index_result(self, kb_dir, sample_tree, tmp_path): assert result.description == sample_tree["doc_description"] assert result.tree is not None + def test_deletes_pageindex_doc_when_a_post_add_step_fails(self, kb_dir, sample_tree, tmp_path): + """The PageIndex blob is durably written by col.add(), but .openkb/files is + no longer in the add mutation's eager snapshot — track_new only registers + the blob on a successful return. So if any step after col.add() raises + (here: get_document), index_long_document must delete the doc it just + added; otherwise the blob leaks as an orphan that pageindex.db — rolled + back by the snapshot — no longer references, and no reaper reclaims.""" + doc_id = "abc-123" + col = self._make_fake_collection(doc_id, sample_tree) + col.get_document.side_effect = RuntimeError("get_document blew up") + + fake_client = MagicMock() + fake_client.collection.return_value = col + + pdf_path = tmp_path / "sample.pdf" + pdf_path.write_bytes(b"%PDF-1.4 fake") + + with patch("openkb.indexer.PageIndexClient", return_value=fake_client): + with pytest.raises(RuntimeError, match="get_document blew up"): + index_long_document(pdf_path, kb_dir) + + col.delete_document.assert_called_once_with(doc_id) + def test_source_page_written_as_json(self, kb_dir, sample_tree, tmp_path): """Long doc source should be written as JSON, not markdown.""" import json as json_mod From cf53d5f85cd8d6d04b4965669560d75901eeec97 Mon Sep 17 00:00:00 2001 From: Guohao Zhang Date: Wed, 1 Jul 2026 18:06:06 +0800 Subject: [PATCH 06/10] fix(cloud-import): surface real error instead of mislabeling snapshot prep run_add_mutation handles snapshot/body failures itself and returns False, so the broad except only ever catches pre-mutation errors (name resolution, registry read, plan construction). Echo the real exception instead of the old 'Failed to prepare mutation snapshot' label, which hid the cause at DEBUG level. --- openkb/cli.py | 7 +++++-- tests/test_add_command.py | 21 +++++++++++++++++++++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/openkb/cli.py b/openkb/cli.py index f6b45bf4..7d82fbc6 100644 --- a/openkb/cli.py +++ b/openkb/cli.py @@ -698,8 +698,11 @@ def append_cloud_log() -> None: return "failed" except DirtyRollbackError: raise - except Exception: - click.echo(f" [ERROR] Failed to prepare mutation snapshot for cloud import {doc_id}.") + except Exception as exc: + # run_add_mutation handles snapshot/body failures itself (returns False), + # so this except only catches pre-mutation errors — surface the real cause + # instead of the old misleading "Failed to prepare mutation snapshot" label. + click.echo(f" [ERROR] Cloud import failed for {doc_id}: {exc}") logger.debug("Cloud import mutation traceback:", exc_info=True) return "failed" diff --git a/tests/test_add_command.py b/tests/test_add_command.py index 656741f7..cf3d256d 100644 --- a/tests/test_add_command.py +++ b/tests/test_add_command.py @@ -617,6 +617,27 @@ def test_cloud_import_propagates_dirty_rollback(self, tmp_path): assert exc_info.value is dirty_error + def test_cloud_import_failure_message_names_real_cause(self, tmp_path, capsys): + """A non-body failure caught at the outer except (here: name resolution) + must surface the real error, not be mislabeled as 'Failed to prepare + mutation snapshot'. run_add_mutation handles snapshot/body failures + itself and returns False, so the broad except only ever catches + pre-mutation errors that the old label misdescribed.""" + from openkb.cli import import_from_pageindex_cloud + + kb_dir = self._setup_kb(tmp_path) + cloud = self._cloud_data(doc_name="Cloud-Paper") + + with patch("openkb.cli.prepare_cloud_import", return_value=cloud), \ + patch("openkb.cli.resolve_doc_name_from_key", + side_effect=RuntimeError("name resolution blew up")), \ + patch("openkb.cli._setup_llm_key"): + assert import_from_pageindex_cloud("cloud-1", kb_dir) == "failed" + + out = capsys.readouterr().out + assert "Failed to prepare mutation snapshot" not in out + assert "name resolution blew up" in out + def test_cloud_import_keyboard_interrupt_rolls_back_artifacts(self, tmp_path): import pytest From d9ddc29cbfe044f90329f845adc16353120f93d7 Mon Sep 17 00:00:00 2001 From: Guohao Zhang Date: Thu, 2 Jul 2026 18:08:58 +0800 Subject: [PATCH 07/10] style: wrap long lines for ruff E501 (CI gate from #159) --- openkb/indexer.py | 8 ++++++-- tests/test_add_command.py | 4 +++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/openkb/indexer.py b/openkb/indexer.py index fc6c3283..c8272200 100644 --- a/openkb/indexer.py +++ b/openkb/indexer.py @@ -243,8 +243,12 @@ def index_long_document(pdf_path: Path, kb_dir: Path, doc_name: str | None = Non if not all_pages: if pageindex_api_key: - logger.warning("Cloud returned no pages for %s; falling back to local pymupdf", pdf_path.name) - all_pages = _normalize_page_content(_convert_pdf_to_pages(pdf_path, source_name, images_dir)) + logger.warning( + "Cloud returned no pages for %s; falling back to local pymupdf", pdf_path.name + ) + all_pages = _normalize_page_content( + _convert_pdf_to_pages(pdf_path, source_name, images_dir) + ) if not all_pages: raise RuntimeError(f"No page content extracted for {pdf_path.name}") diff --git a/tests/test_add_command.py b/tests/test_add_command.py index 15c1602b..9fcf7900 100644 --- a/tests/test_add_command.py +++ b/tests/test_add_command.py @@ -884,7 +884,9 @@ def body(_snapshot): # Force rollback to fail: rollback_best_effort returns the error instead # of None, so the active journal is retained for next-run recovery. with kb_ingest_lock(kb_dir / ".openkb"): - with patch.object(MutationSnapshot, "rollback_best_effort", return_value=OSError("disk full")): + with patch.object( + MutationSnapshot, "rollback_best_effort", return_value=OSError("disk full") + ): with pytest.raises(DirtyRollbackError) as exc_info: run_add_mutation(kb_dir, plan) From bdb686dea316c66251eb2856a74f3fc424b8aecc Mon Sep 17 00:00:00 2001 From: Guohao Zhang Date: Thu, 2 Jul 2026 18:15:53 +0800 Subject: [PATCH 08/10] style: apply ruff format to add_coordinator.py and test_add_command.py --- openkb/add_coordinator.py | 8 ++--- tests/test_add_command.py | 76 ++++++++++++++++++++++++--------------- 2 files changed, 49 insertions(+), 35 deletions(-) diff --git a/openkb/add_coordinator.py b/openkb/add_coordinator.py index e713a85b..8ddf1245 100644 --- a/openkb/add_coordinator.py +++ b/openkb/add_coordinator.py @@ -88,9 +88,7 @@ def _failure_target(details: dict) -> str: def run_add_mutation(kb_dir: Path, plan: AddMutationPlan) -> bool: if not kb_ingest_lock_held(kb_dir / ".openkb"): - raise RuntimeError( - "run_add_mutation requires the caller to hold kb_ingest_lock" - ) + raise RuntimeError("run_add_mutation requires the caller to hold kb_ingest_lock") snapshot = None try: snapshot = snapshot_paths( @@ -109,9 +107,7 @@ def run_add_mutation(kb_dir: Path, plan: AddMutationPlan) -> bool: # than committing more docs on top of dirty state that the next # recovery would roll back over. raise DirtyRollbackError(plan.operation, dirty_journal) - click.echo( - f" [ERROR] {plan.operation} failed{_failure_target(plan.details)}: {exc}" - ) + click.echo(f" [ERROR] {plan.operation} failed{_failure_target(plan.details)}: {exc}") logger.debug("%s mutation failed:", plan.operation, exc_info=True) return False except BaseException: diff --git a/tests/test_add_command.py b/tests/test_add_command.py index 9fcf7900..ae84461e 100644 --- a/tests/test_add_command.py +++ b/tests/test_add_command.py @@ -114,11 +114,13 @@ def test_add_single_file_uses_add_mutation_coordinator(self, tmp_path): doc_name="coordinated", ) - with patch("openkb.cli.convert_document", return_value=result), \ - patch("openkb.cli.publish_staged_tree"), \ - patch("openkb.cli.asyncio.run"), \ - patch("openkb.cli._setup_llm_key"), \ - patch("openkb.add_coordinator.run_add_mutation", return_value=True) as mock_run: + with ( + patch("openkb.cli.convert_document", return_value=result), + patch("openkb.cli.publish_staged_tree"), + patch("openkb.cli.asyncio.run"), + patch("openkb.cli._setup_llm_key"), + patch("openkb.add_coordinator.run_add_mutation", return_value=True) as mock_run, + ): assert add_single_file(doc, kb_dir) == "added" plan = mock_run.call_args.args[1] @@ -248,8 +250,10 @@ def test_add_directory_stops_after_dirty_rollback(self, tmp_path): ) runner = CliRunner() - with patch("openkb.cli.add_single_file", side_effect=dirty_error) as mock_add, \ - patch("openkb.cli._find_kb_dir", return_value=kb_dir): + with ( + patch("openkb.cli.add_single_file", side_effect=dirty_error) as mock_add, + patch("openkb.cli._find_kb_dir", return_value=kb_dir), + ): with pytest.raises(DirtyRollbackError) as exc_info: runner.invoke(cli, ["add", str(docs_dir)], catch_exceptions=False) @@ -524,9 +528,11 @@ def test_cloud_import_uses_add_mutation_coordinator(self, tmp_path): kb_dir = self._setup_kb(tmp_path) cloud = self._cloud_data(doc_name="Cloud-Paper") - with patch("openkb.cli.prepare_cloud_import", return_value=cloud), \ - patch("openkb.add_coordinator.run_add_mutation", return_value=True) as mock_run, \ - patch("openkb.cli._setup_llm_key"): + with ( + patch("openkb.cli.prepare_cloud_import", return_value=cloud), + patch("openkb.add_coordinator.run_add_mutation", return_value=True) as mock_run, + patch("openkb.cli._setup_llm_key"), + ): assert import_from_pageindex_cloud("cloud-1", kb_dir) == "added" plan = mock_run.call_args.args[1] @@ -567,10 +573,12 @@ def race_before_lock(openkb_dir): with real_kb_ingest_lock(openkb_dir): yield - with patch("openkb.cli.prepare_cloud_import", return_value=cloud), \ - patch("openkb.cli.kb_ingest_lock", side_effect=race_before_lock), \ - patch("openkb.cli.compile_long_doc", return_value=None), \ - patch("openkb.cli._setup_llm_key"): + with ( + patch("openkb.cli.prepare_cloud_import", return_value=cloud), + patch("openkb.cli.kb_ingest_lock", side_effect=race_before_lock), + patch("openkb.cli.compile_long_doc", return_value=None), + patch("openkb.cli._setup_llm_key"), + ): assert import_from_pageindex_cloud("cloud-1", kb_dir) == "added" registry = HashRegistry(kb_dir / ".openkb" / "hashes.json") @@ -613,11 +621,13 @@ def register_same_doc_before_second_lock(openkb_dir): with real_kb_ingest_lock(openkb_dir): yield - with patch("openkb.cli.prepare_cloud_import", return_value=cloud) as mock_prepare, \ - patch("openkb.cli.kb_ingest_lock", side_effect=register_same_doc_before_second_lock), \ - patch("openkb.cli.compile_long_doc") as mock_compile, \ - patch("openkb.add_coordinator.run_add_mutation") as mock_run, \ - patch("openkb.cli._setup_llm_key"): + with ( + patch("openkb.cli.prepare_cloud_import", return_value=cloud) as mock_prepare, + patch("openkb.cli.kb_ingest_lock", side_effect=register_same_doc_before_second_lock), + patch("openkb.cli.compile_long_doc") as mock_compile, + patch("openkb.add_coordinator.run_add_mutation") as mock_run, + patch("openkb.cli._setup_llm_key"), + ): assert import_from_pageindex_cloud("cloud-1", kb_dir) == "skipped" assert lock_calls["count"] == 2 @@ -639,9 +649,11 @@ def test_cloud_import_propagates_dirty_rollback(self, tmp_path): kb_dir / ".openkb" / "journal" / "retained.json", ) - with patch("openkb.cli.prepare_cloud_import", return_value=cloud), \ - patch("openkb.add_coordinator.run_add_mutation", side_effect=dirty_error), \ - patch("openkb.cli._setup_llm_key"): + with ( + patch("openkb.cli.prepare_cloud_import", return_value=cloud), + patch("openkb.add_coordinator.run_add_mutation", side_effect=dirty_error), + patch("openkb.cli._setup_llm_key"), + ): with pytest.raises(DirtyRollbackError) as exc_info: import_from_pageindex_cloud("cloud-1", kb_dir) @@ -658,10 +670,14 @@ def test_cloud_import_failure_message_names_real_cause(self, tmp_path, capsys): kb_dir = self._setup_kb(tmp_path) cloud = self._cloud_data(doc_name="Cloud-Paper") - with patch("openkb.cli.prepare_cloud_import", return_value=cloud), \ - patch("openkb.cli.resolve_doc_name_from_key", - side_effect=RuntimeError("name resolution blew up")), \ - patch("openkb.cli._setup_llm_key"): + with ( + patch("openkb.cli.prepare_cloud_import", return_value=cloud), + patch( + "openkb.cli.resolve_doc_name_from_key", + side_effect=RuntimeError("name resolution blew up"), + ), + patch("openkb.cli._setup_llm_key"), + ): assert import_from_pageindex_cloud("cloud-1", kb_dir) == "failed" out = capsys.readouterr().out @@ -678,9 +694,11 @@ def test_cloud_import_keyboard_interrupt_rolls_back_artifacts(self, tmp_path): doc_name = "Cloud-Paper" cloud = self._cloud_data(doc_name=doc_name) - with patch("openkb.cli.prepare_cloud_import", return_value=cloud), \ - patch("openkb.cli.compile_long_doc", side_effect=KeyboardInterrupt()), \ - patch("openkb.cli._setup_llm_key"): + with ( + patch("openkb.cli.prepare_cloud_import", return_value=cloud), + patch("openkb.cli.compile_long_doc", side_effect=KeyboardInterrupt()), + patch("openkb.cli._setup_llm_key"), + ): with pytest.raises(KeyboardInterrupt): import_from_pageindex_cloud("cloud-1", kb_dir) From ac8004053a1123486cf36b9ff949aa84800fa6f0 Mon Sep 17 00:00:00 2001 From: Guohao Zhang Date: Thu, 2 Jul 2026 18:21:19 +0800 Subject: [PATCH 09/10] refactor(cli): drop duplicate _cleanup_staging, reuse coordinator helper cli._cleanup_staging duplicated add_coordinator._cleanup_staging_dirs (both guard None then shutil.rmtree with ignore_errors). Drop the cli copy and route its two pre-coordinator call sites through the coordinator helper. --- openkb/cli.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/openkb/cli.py b/openkb/cli.py index 8ac60179..f39d0b44 100644 --- a/openkb/cli.py +++ b/openkb/cli.py @@ -60,6 +60,7 @@ def filter(self, record: logging.LogRecord) -> bool: set_timeout, resolve_litellm_settings, ) +from openkb.add_coordinator import _cleanup_staging_dirs from openkb.converter import ( _registry_path, _sanitize_stem, @@ -384,11 +385,6 @@ def _staging_dir_for(kb_dir: Path, file_path: Path) -> Path: return path -def _cleanup_staging(path: Path | None) -> None: - if path is not None: - shutil.rmtree(path, ignore_errors=True) - - def _final_artifact_paths(result, kb_dir: Path) -> tuple[Path | None, Path | None]: final_raw = None final_source = None @@ -492,12 +488,12 @@ def _add_single_file_locked( except Exception as exc: click.echo(f" [ERROR] Conversion failed: {exc}") logger.debug("Conversion traceback:", exc_info=True) - _cleanup_staging(staging_dir) + _cleanup_staging_dirs([staging_dir]) return "failed" if result.skipped: click.echo(f" [SKIP] Already in knowledge base: {file_path.name}") - _cleanup_staging(staging_dir) + _cleanup_staging_dirs([staging_dir]) return "skipped" doc_name = result.doc_name or file_path.stem From 08523dea7b0222de5ce559baae1f1209c0cb351a Mon Sep 17 00:00:00 2001 From: Guohao Zhang Date: Thu, 2 Jul 2026 18:21:29 +0800 Subject: [PATCH 10/10] refactor(add_coordinator): drop unused rollback_error_message field MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit AddMutationPlan.rollback_error_message had zero callers — every construction site (cli.py x2, tests x6) used the default. Remove the field and inline the literal at its single read site. --- openkb/add_coordinator.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/openkb/add_coordinator.py b/openkb/add_coordinator.py index 8ddf1245..ec484da5 100644 --- a/openkb/add_coordinator.py +++ b/openkb/add_coordinator.py @@ -48,7 +48,6 @@ class AddMutationPlan: post_commit_hooks: Sequence[PostCommitHook] = field(default_factory=tuple) hardlink_dirs: set[Path] = field(default_factory=set) staging_dirs: Sequence[Path | None] = field(default_factory=tuple) - rollback_error_message: str = "Rollback failed; mutation journal retained for recovery" def _cleanup_staging_dirs(staging_dirs: Sequence[Path | None]) -> None: @@ -73,7 +72,10 @@ def _rollback_snapshot(plan: AddMutationPlan, snapshot) -> Path | None: if rollback_error is None: snapshot.discard_best_effort() else: - click.echo(f" [ERROR] {plan.rollback_error_message}: {snapshot.journal_path}") + click.echo( + " [ERROR] Rollback failed; mutation journal retained for recovery: " + f"{snapshot.journal_path}" + ) _cleanup_staging_dirs(plan.staging_dirs) return snapshot.journal_path if rollback_error is not None else None