diff --git a/openkb/add_coordinator.py b/openkb/add_coordinator.py new file mode 100644 index 00000000..ec484da5 --- /dev/null +++ b/openkb/add_coordinator.py @@ -0,0 +1,134 @@ +from __future__ import annotations + +import logging +import shutil +from collections.abc import Callable, Sequence +from dataclasses import dataclass, field +from pathlib import Path + +import click + +from openkb.locks import kb_ingest_lock_held +from openkb.mutation import MutationSnapshot, snapshot_paths + +logger = logging.getLogger(__name__) + +MutationBody = Callable[[MutationSnapshot], None] +PostCommitHook = Callable[[], None] + + +class DirtyRollbackError(RuntimeError): + """A mutation's rollback failed, leaving an active journal on disk. + + The KB may be in a partially-applied state that the retained journal will + attempt to roll back on the next exclusive-lock acquisition. Batch owners + (the parallel/serial ``add`` loops) MUST stop committing further mutations + on top of this dirty state instead of continuing — otherwise the next + recovery rolls this journal back over the shared paths it recorded + (``hashes.json``, ``index.md``, ``concepts/``, ``entities/``) and silently + clobbers the later commits. Single-mutation callers should let it propagate + so the command fails loudly; rerunning recovers via the drain. + """ + + def __init__(self, operation: str, journal_path: Path) -> None: + super().__init__( + f"Dirty rollback for {operation}; journal retained at {journal_path}. " + f"Rerun the command to recover." + ) + self.operation = operation + self.journal_path = journal_path + + +@dataclass(slots=True) +class AddMutationPlan: + operation: str + details: dict + touched_paths: Sequence[Path] + body: MutationBody + post_commit_hooks: Sequence[PostCommitHook] = field(default_factory=tuple) + hardlink_dirs: set[Path] = field(default_factory=set) + staging_dirs: Sequence[Path | None] = field(default_factory=tuple) + + +def _cleanup_staging_dirs(staging_dirs: Sequence[Path | None]) -> None: + for staging_dir in staging_dirs: + if staging_dir is not None: + shutil.rmtree(staging_dir, ignore_errors=True) + + +def _rollback_snapshot(plan: AddMutationPlan, snapshot) -> Path | None: + """Best-effort rollback; returns the retained journal path on dirty failure. + + Returns ``snapshot.journal_path`` when the snapshot existed but rollback + FAILED (the active journal is retained for next-run recovery), otherwise + ``None`` — covering both "snapshot is None" (nothing was applied; the + failure happened during snapshot setup before the body ran) and a clean + rollback that discarded its journal. + """ + if snapshot is None: + _cleanup_staging_dirs(plan.staging_dirs) + return None + rollback_error = snapshot.rollback_best_effort() + if rollback_error is None: + snapshot.discard_best_effort() + else: + click.echo( + " [ERROR] Rollback failed; mutation journal retained for recovery: " + f"{snapshot.journal_path}" + ) + _cleanup_staging_dirs(plan.staging_dirs) + return snapshot.journal_path if rollback_error is not None else None + + +def _failure_target(details: dict) -> str: + for key in ("name", "doc_name", "doc_id"): + value = details.get(key) + if value: + return f" for {value}" + return "" + + +def run_add_mutation(kb_dir: Path, plan: AddMutationPlan) -> bool: + if not kb_ingest_lock_held(kb_dir / ".openkb"): + raise RuntimeError("run_add_mutation requires the caller to hold kb_ingest_lock") + snapshot = None + try: + snapshot = snapshot_paths( + kb_dir, + list(plan.touched_paths), + operation=plan.operation, + details=plan.details, + hardlink_dirs=plan.hardlink_dirs, + ) + plan.body(snapshot) + snapshot.mark_committed() + except Exception as exc: + dirty_journal = _rollback_snapshot(plan, snapshot) + if dirty_journal is not None: + # Rollback failed and left an active journal. Stop the batch rather + # than committing more docs on top of dirty state that the next + # recovery would roll back over. + raise DirtyRollbackError(plan.operation, dirty_journal) + click.echo(f" [ERROR] {plan.operation} failed{_failure_target(plan.details)}: {exc}") + logger.debug("%s mutation failed:", plan.operation, exc_info=True) + return False + except BaseException: + # Interrupt (KeyboardInterrupt / SystemExit): best-effort rollback for + # its side-effects only. Do NOT raise DirtyRollbackError — propagate the + # interrupt so the user's abort is honored. Any retained journal or + # orphaned staging is recovered next run by the drain + reaper. + _rollback_snapshot(plan, snapshot) + raise + finally: + _cleanup_staging_dirs(plan.staging_dirs) + + for hook in plan.post_commit_hooks: + try: + hook() + except Exception as exc: + logger.warning("Post-commit hook failed for %s: %s", plan.operation, exc) + + cleanup_error = snapshot.discard_best_effort() + if cleanup_error is not None: + click.echo(f" [WARN] mutation journal cleanup failed: {cleanup_error}") + return True diff --git a/openkb/cli.py b/openkb/cli.py index 2c27eab2..f39d0b44 100644 --- a/openkb/cli.py +++ b/openkb/cli.py @@ -60,11 +60,21 @@ def filter(self, record: logging.LogRecord) -> bool: set_timeout, resolve_litellm_settings, ) -from openkb.converter import _registry_path, _sanitize_stem, convert_document -from openkb.indexer import _write_long_doc_artifacts, prepare_cloud_import +from openkb.add_coordinator import _cleanup_staging_dirs +from openkb.converter import ( + _registry_path, + _sanitize_stem, + convert_document, + resolve_doc_name_from_key, +) +from openkb.indexer import ( + _cloud_display_stem, + _write_long_doc_artifacts, + prepare_cloud_import, +) from openkb.locks import atomic_write_json, atomic_write_text, kb_ingest_lock, kb_read_lock from openkb.log import append_log -from openkb.mutation import MutationSnapshot, publish_staged_tree, snapshot_paths +from openkb.mutation import publish_staged_tree from openkb.schema import AGENTS_MD, INDEX_SEED, PAGE_CONTENT_DIRS # Suppress warnings after all imports — markitdown overrides filters at import time @@ -375,11 +385,6 @@ def _staging_dir_for(kb_dir: Path, file_path: Path) -> Path: return path -def _cleanup_staging(path: Path | None) -> None: - if path is not None: - shutil.rmtree(path, ignore_errors=True) - - def _final_artifact_paths(result, kb_dir: Path) -> tuple[Path | None, Path | None]: final_raw = None final_source = None @@ -475,7 +480,6 @@ def _add_single_file_locked( model: str = config.get("model", DEFAULT_CONFIG["model"]) staging_dir = _staging_dir_for(kb_dir, file_path) if stage else None - snapshot: MutationSnapshot | None = None # 2. Convert document into staging when possible. click.echo(f"Adding: {file_path.name}") @@ -484,40 +488,27 @@ def _add_single_file_locked( except Exception as exc: click.echo(f" [ERROR] Conversion failed: {exc}") logger.debug("Conversion traceback:", exc_info=True) - _cleanup_staging(staging_dir) + _cleanup_staging_dirs([staging_dir]) return "failed" if result.skipped: click.echo(f" [SKIP] Already in knowledge base: {file_path.name}") - _cleanup_staging(staging_dir) + _cleanup_staging_dirs([staging_dir]) return "skipped" doc_name = result.doc_name or file_path.stem index_result = None # populated only on the long-doc branch final_raw, final_source = _final_artifact_paths(result, kb_dir) - try: - snapshot = snapshot_paths( - kb_dir, - _snapshot_add_paths(kb_dir, doc_name, final_raw, final_source), - operation="add", - details={ - "file_hash": result.file_hash, - "name": file_path.name, - "doc_name": doc_name, - }, - hardlink_dirs={ - kb_dir / "wiki" / "concepts", - kb_dir / "wiki" / "entities", - }, - ) + + def commit_body(snapshot) -> None: + nonlocal index_result publish_staged_tree(staging_dir, kb_dir) if final_raw is not None: result.raw_path = final_raw if final_source is not None: result.source_path = final_source - # 3/4. Index and compile if result.is_long_doc: if result.raw_path is None: raise RuntimeError(f"Converted long document has no raw artifact: {file_path.name}") @@ -602,34 +593,29 @@ def _add_single_file_locked( registry.remove_by_hash(existing_hash) registry.add(result.file_hash, meta) - snapshot.mark_committed() - except Exception: - if snapshot is None: - click.echo(f" [ERROR] Failed to prepare mutation snapshot for {file_path.name}.") - _cleanup_staging(staging_dir) - return "failed" - rollback_error = snapshot.rollback_best_effort() - if rollback_error is None: - snapshot.discard_best_effort() - else: - click.echo( - " [ERROR] Rollback failed; mutation journal retained for recovery: " - f"{snapshot.journal_path}" - ) - _cleanup_staging(staging_dir) - return "failed" - finally: - _cleanup_staging(staging_dir) - - try: + def append_ingest_log() -> None: append_log(kb_dir / "wiki", "ingest", file_path.name) - except Exception as exc: - logger.warning("Failed to append ingest log for %s: %s", file_path.name, exc) - cleanup_error = snapshot.discard_best_effort() - if cleanup_error is not None: - click.echo( - f" [WARN] {file_path.name} added, but mutation journal cleanup failed: {cleanup_error}" - ) + + from openkb.add_coordinator import AddMutationPlan, run_add_mutation + + plan = AddMutationPlan( + operation="add", + details={ + "file_hash": result.file_hash, + "name": file_path.name, + "doc_name": doc_name, + }, + touched_paths=_snapshot_add_paths(kb_dir, doc_name, final_raw, final_source), + body=commit_body, + post_commit_hooks=[append_ingest_log], + hardlink_dirs={ + kb_dir / "wiki" / "concepts", + kb_dir / "wiki" / "entities", + }, + staging_dirs=[staging_dir], + ) + if not run_add_mutation(kb_dir, plan): + return "failed" click.echo(f" [OK] {file_path.name} added to knowledge base.") return "added" @@ -654,14 +640,16 @@ def import_from_pageindex_cloud(doc_id: str, kb_dir: Path) -> Literal["added", " path_key = f"pageindex-cloud:{doc_id}" synthetic_hash = hashlib.sha256(path_key.encode("utf-8")).hexdigest() - registry = HashRegistry(openkb_dir / "hashes.json") - if registry.is_known(synthetic_hash): - click.echo(f" [SKIP] Already imported from PageIndex Cloud: {doc_id}") - return "skipped" + with kb_ingest_lock(kb_dir / ".openkb"): + registry = HashRegistry(openkb_dir / "hashes.json") + if registry.is_known(synthetic_hash): + click.echo(f" [SKIP] Already imported from PageIndex Cloud: {doc_id}") + return "skipped" click.echo(f"Importing from PageIndex Cloud: {doc_id}") - snapshot: MutationSnapshot | None = None doc_name = "" + from openkb.add_coordinator import AddMutationPlan, DirtyRollbackError, run_add_mutation + try: try: cloud = prepare_cloud_import(doc_id, kb_dir, path_key) @@ -670,76 +658,80 @@ def import_from_pageindex_cloud(doc_id: str, kb_dir: Path) -> Literal["added", " logger.debug("Cloud import traceback:", exc_info=True) return "failed" - doc_name = cloud.doc_name - snapshot = snapshot_paths( - kb_dir, - _snapshot_add_paths(kb_dir, doc_name, None, None), - operation="cloud_import", - details={"doc_id": doc_id, "doc_name": doc_name}, - # Cloud import reads from PageIndex Cloud and writes no local blob, - # so .openkb/files is never touched — nothing to snapshot there. - hardlink_dirs={ - kb_dir / "wiki" / "concepts", - kb_dir / "wiki" / "entities", - }, - ) - summary_path = _write_long_doc_artifacts( - cloud.tree, - cloud.all_pages, - doc_name, - doc_id, - kb_dir, - description=cloud.description, - ) - _run_compile_with_retry( - lambda: compile_long_doc( - doc_name, - summary_path, - doc_id, - kb_dir, - model, - doc_description=cloud.description, - ), - label=f"Compiling imported doc (doc_id={doc_id})", - ) + with kb_ingest_lock(kb_dir / ".openkb"): + registry = HashRegistry(openkb_dir / "hashes.json") + if registry.is_known(synthetic_hash): + click.echo(f" [SKIP] Already imported from PageIndex Cloud: {doc_id}") + return "skipped" - # Register the raw-less cloud entry only after successful compilation. - registry = HashRegistry(openkb_dir / "hashes.json") - meta = { - "name": cloud.cloud_name, - "doc_name": doc_name, - "type": "pageindex_cloud", - "origin": "cloud", - "path": path_key, - "source_path": _registry_path(kb_dir / "wiki" / "sources" / f"{doc_name}.json", kb_dir), - "doc_id": doc_id, - } - registry.remove_by_doc_name(doc_name) - registry.add(synthetic_hash, meta) - snapshot.mark_committed() - except Exception: - if snapshot is None: - click.echo(f" [ERROR] Failed to prepare mutation snapshot for cloud import {doc_id}.") - return "failed" - rollback_error = snapshot.rollback_best_effort() - if rollback_error is None: - snapshot.discard_best_effort() - else: - click.echo( - " [ERROR] Rollback failed; mutation journal retained for recovery: " - f"{snapshot.journal_path}" + stem = _cloud_display_stem(cloud.cloud_name, doc_id) + doc_name = resolve_doc_name_from_key(stem, path_key, registry) + + def commit_body(_snapshot) -> None: + summary_path = _write_long_doc_artifacts( + cloud.tree, + cloud.all_pages, + doc_name, + doc_id, + kb_dir, + description=cloud.description, + ) + _run_compile_with_retry( + lambda: compile_long_doc( + doc_name, + summary_path, + doc_id, + kb_dir, + model, + doc_description=cloud.description, + ), + label=f"Compiling imported doc (doc_id={doc_id})", + ) + + # Register the raw-less cloud entry only after successful compilation. + registry = HashRegistry(openkb_dir / "hashes.json") + meta = { + "name": cloud.cloud_name, + "doc_name": doc_name, + "type": "pageindex_cloud", + "origin": "cloud", + "path": path_key, + "source_path": _registry_path( + kb_dir / "wiki" / "sources" / f"{doc_name}.json", kb_dir + ), + "doc_id": doc_id, + } + registry.remove_by_doc_name(doc_name) + registry.add(synthetic_hash, meta) + + def append_cloud_log() -> None: + append_log(kb_dir / "wiki", "ingest", doc_name) + + plan = AddMutationPlan( + operation="cloud_import", + details={"doc_id": doc_id, "doc_name": doc_name}, + touched_paths=_snapshot_add_paths(kb_dir, doc_name, None, None), + body=commit_body, + post_commit_hooks=[append_cloud_log], + # Cloud import reads from PageIndex Cloud and writes no local blob, + # so .openkb/files is never touched — nothing to snapshot there. + hardlink_dirs={ + kb_dir / "wiki" / "concepts", + kb_dir / "wiki" / "entities", + }, ) + if not run_add_mutation(kb_dir, plan): + return "failed" + except DirtyRollbackError: + raise + except Exception as exc: + # run_add_mutation handles snapshot/body failures itself (returns False), + # so this except only catches pre-mutation errors — surface the real cause + # instead of the old misleading "Failed to prepare mutation snapshot" label. + click.echo(f" [ERROR] Cloud import failed for {doc_id}: {exc}") + logger.debug("Cloud import mutation traceback:", exc_info=True) return "failed" - try: - append_log(kb_dir / "wiki", "ingest", doc_name) - except Exception as exc: - logger.warning("Failed to append ingest log for cloud import %s: %s", doc_id, exc) - cleanup_error = snapshot.discard_best_effort() - if cleanup_error is not None: - click.echo( - f" [WARN] {doc_name} imported, but mutation journal cleanup failed: {cleanup_error}" - ) click.echo(f" [OK] {doc_name} imported from PageIndex Cloud.") return "added" diff --git a/openkb/indexer.py b/openkb/indexer.py index 0f58bef3..c8272200 100644 --- a/openkb/indexer.py +++ b/openkb/indexer.py @@ -203,51 +203,72 @@ def index_long_document(pdf_path: Path, kb_dir: Path, doc_name: str | None = Non f"Failed to index {pdf_path.name} after {max_retries} attempts: {exc}" ) from exc - # Fetch complete document (metadata + structure + text) - doc = col.get_document(doc_id, include_text=True) - indexed_doc_name: str = doc.get("doc_name", pdf_path.stem) - description: str = doc.get("doc_description", "") - structure: list = doc.get("structure", []) - - # Debug: print doc keys and page_count to diagnose get_page_content range - logger.info("Doc keys: %s", list(doc.keys())) - logger.info("page_count from doc: %s", doc.get("page_count", "NOT PRESENT")) - - tree = { - "doc_name": indexed_doc_name, - "doc_description": description, - "structure": structure, - } + # The PageIndex blob for doc_id is now durably on disk. The add mutation no + # longer eagerly snapshots .openkb/files — it registers the new blob via + # snapshot.track_new() only on a successful return — so if any step below + # fails, delete the document we just added. Otherwise the blob leaks as an + # orphan that pageindex.db (rolled back by the snapshot) no longer refs and + # no reaper reclaims. + try: + # Fetch complete document (metadata + structure + text) + doc = col.get_document(doc_id, include_text=True) + indexed_doc_name: str = doc.get("doc_name", pdf_path.stem) + description: str = doc.get("doc_description", "") + structure: list = doc.get("structure", []) + + # Debug: print doc keys and page_count to diagnose get_page_content range + logger.info("Doc keys: %s", list(doc.keys())) + logger.info("page_count from doc: %s", doc.get("page_count", "NOT PRESENT")) + + tree = { + "doc_name": indexed_doc_name, + "doc_description": description, + "structure": structure, + } + + # Write wiki/sources/ — per-page content + sources_dir = kb_dir / "wiki" / "sources" + sources_dir.mkdir(parents=True, exist_ok=True) + images_dir = sources_dir / "images" / source_name + + all_pages: list[dict[str, Any]] = [] + if pageindex_api_key: + # Cloud mode: fetch OCR'd markdown from PageIndex. get_page_content + # requires a page range, so pass "1-N". + page_count = _get_pdf_page_count(pdf_path) + try: + all_pages = _normalize_page_content(col.get_page_content(doc_id, f"1-{page_count}")) + except Exception as exc: + logger.warning("Cloud get_page_content failed for %s: %s", pdf_path.name, exc) + + if not all_pages: + if pageindex_api_key: + logger.warning( + "Cloud returned no pages for %s; falling back to local pymupdf", pdf_path.name + ) + all_pages = _normalize_page_content( + _convert_pdf_to_pages(pdf_path, source_name, images_dir) + ) - # Write wiki/sources/ — per-page content - sources_dir = kb_dir / "wiki" / "sources" - sources_dir.mkdir(parents=True, exist_ok=True) - images_dir = sources_dir / "images" / source_name + if not all_pages: + raise RuntimeError(f"No page content extracted for {pdf_path.name}") - all_pages: list[dict[str, Any]] = [] - if pageindex_api_key: - # Cloud mode: fetch OCR'd markdown from PageIndex. get_page_content - # requires a page range, so pass "1-N". - page_count = _get_pdf_page_count(pdf_path) + _write_long_doc_artifacts( + tree, all_pages, source_name, doc_id, kb_dir, description=description + ) + return IndexResult(doc_id=doc_id, description=description, tree=tree) + except BaseException: + # Best-effort: remove the blob this add created. A failure here (e.g. a + # second interrupt) only means the blob may stay orphaned — the original + # error still propagates so the caller (mutation coordinator) rolls back + # everything else it snapshotted. try: - all_pages = _normalize_page_content(col.get_page_content(doc_id, f"1-{page_count}")) - except Exception as exc: - logger.warning("Cloud get_page_content failed for %s: %s", pdf_path.name, exc) - - if not all_pages: - if pageindex_api_key: + col.delete_document(doc_id) + except Exception: logger.warning( - "Cloud returned no pages for %s; falling back to local pymupdf", pdf_path.name + "PageIndex cleanup of %s failed after error; blob may be orphaned", doc_id ) - all_pages = _normalize_page_content( - _convert_pdf_to_pages(pdf_path, source_name, images_dir) - ) - - if not all_pages: - raise RuntimeError(f"No page content extracted for {pdf_path.name}") - - _write_long_doc_artifacts(tree, all_pages, source_name, doc_id, kb_dir, description=description) - return IndexResult(doc_id=doc_id, description=description, tree=tree) + raise # PageIndex's get_page_content rejects a single page range covering more than diff --git a/openkb/locks.py b/openkb/locks.py index 95fd3960..a085da75 100644 --- a/openkb/locks.py +++ b/openkb/locks.py @@ -180,6 +180,20 @@ def kb_read_lock(openkb_dir: Path): return kb_lock(openkb_dir, exclusive=False) +def kb_ingest_lock_held(openkb_dir: Path) -> bool: + """Return True iff the *current thread* holds the exclusive ingest lock. + + Reentrancy is tracked per-thread (``threading.local``), so a worker thread + returns ``False`` even when the main thread holds the lock. Mutation + primitives use this to assert they run on the lock-owning thread rather + than silently deadlocking on a worker's separate OS ``flock`` acquire. + """ + held = _held_locks() + resolved = (openkb_dir / "ingest.lock").resolve() + exclusive_depth, _ = held.get(resolved, (0, 0)) + return exclusive_depth > 0 + + def _fsync_directory(path: Path) -> None: if os.name == "nt": # Windows cannot open a directory handle to fsync it. os.replace is diff --git a/tests/test_add_command.py b/tests/test_add_command.py index b8cdee84..ae84461e 100644 --- a/tests/test_add_command.py +++ b/tests/test_add_command.py @@ -97,6 +97,37 @@ def test_add_single_file_compile_failure_rolls_back_converted_artifacts(self, tm assert not (kb_dir / "wiki" / "sources" / "notes.md").exists() assert HashRegistry(kb_dir / ".openkb" / "hashes.json").all_entries() == {} + def test_add_single_file_uses_add_mutation_coordinator(self, tmp_path): + from openkb.cli import add_single_file + from openkb.converter import ConvertResult + + kb_dir = self._setup_kb(tmp_path) + doc = tmp_path / "coordinated.md" + doc.write_text("# Coordinated\n", encoding="utf-8") + staging_root = kb_dir / ".openkb" / "staging" / "fake" + source_path = staging_root / "wiki" / "sources" / "coordinated.md" + raw_path = staging_root / "raw" / "coordinated.md" + result = ConvertResult( + raw_path=raw_path, + source_path=source_path, + file_hash="abc123", + doc_name="coordinated", + ) + + with ( + patch("openkb.cli.convert_document", return_value=result), + patch("openkb.cli.publish_staged_tree"), + patch("openkb.cli.asyncio.run"), + patch("openkb.cli._setup_llm_key"), + patch("openkb.add_coordinator.run_add_mutation", return_value=True) as mock_run, + ): + assert add_single_file(doc, kb_dir) == "added" + + plan = mock_run.call_args.args[1] + assert plan.operation == "add" + assert plan.details["doc_name"] == "coordinated" + assert kb_dir / ".openkb" / "hashes.json" in plan.touched_paths + def _long_doc_conv(self, kb_dir, name, file_hash): from openkb.converter import ConvertResult @@ -203,6 +234,33 @@ def test_add_directory_calls_helper_for_each_file(self, tmp_path): assert "b.txt" in called_names assert "ignore.xyz" not in called_names + def test_add_directory_stops_after_dirty_rollback(self, tmp_path): + import pytest + + from openkb.add_coordinator import DirtyRollbackError + + kb_dir = self._setup_kb(tmp_path) + docs_dir = tmp_path / "docs" + docs_dir.mkdir() + (docs_dir / "a.md").write_text("# A") + (docs_dir / "b.md").write_text("# B") + dirty_error = DirtyRollbackError( + "add", + kb_dir / ".openkb" / "journal" / "retained.json", + ) + + runner = CliRunner() + with ( + patch("openkb.cli.add_single_file", side_effect=dirty_error) as mock_add, + patch("openkb.cli._find_kb_dir", return_value=kb_dir), + ): + with pytest.raises(DirtyRollbackError) as exc_info: + runner.invoke(cli, ["add", str(docs_dir)], catch_exceptions=False) + + assert exc_info.value is dirty_error + mock_add.assert_called_once() + assert mock_add.call_args.args[0].name == "a.md" + def test_add_unsupported_extension(self, tmp_path): kb_dir = self._setup_kb(tmp_path) doc = tmp_path / "file.xyz" @@ -464,6 +522,191 @@ def test_import_failure_returns_failed_and_registers_nothing(self, tmp_path): registry = HashRegistry(kb_dir / ".openkb" / "hashes.json") assert registry.all_entries() == {} + def test_cloud_import_uses_add_mutation_coordinator(self, tmp_path): + from openkb.cli import import_from_pageindex_cloud + + kb_dir = self._setup_kb(tmp_path) + cloud = self._cloud_data(doc_name="Cloud-Paper") + + with ( + patch("openkb.cli.prepare_cloud_import", return_value=cloud), + patch("openkb.add_coordinator.run_add_mutation", return_value=True) as mock_run, + patch("openkb.cli._setup_llm_key"), + ): + assert import_from_pageindex_cloud("cloud-1", kb_dir) == "added" + + plan = mock_run.call_args.args[1] + assert plan.operation == "cloud_import" + assert plan.details == {"doc_id": "cloud-1", "doc_name": "Cloud-Paper"} + assert kb_dir / ".openkb" / "hashes.json" in plan.touched_paths + + def test_cloud_import_rechecks_doc_name_under_lock_after_prepare_race(self, tmp_path): + import hashlib + from contextlib import contextmanager + + from openkb.cli import import_from_pageindex_cloud + from openkb.locks import kb_ingest_lock as real_kb_ingest_lock + from openkb.state import HashRegistry + + kb_dir = self._setup_kb(tmp_path) + cloud = self._cloud_data(doc_name="Cloud-Paper") + existing_hash = "0" * 64 + injected = {"done": False} + + @contextmanager + def race_before_lock(openkb_dir): + if not injected["done"]: + registry = HashRegistry(openkb_dir / "hashes.json") + registry.add( + existing_hash, + { + "name": "Other Cloud Paper.pdf", + "doc_name": "Cloud-Paper", + "type": "pageindex_cloud", + "origin": "cloud", + "path": "pageindex-cloud:other-cloud", + "source_path": "wiki/sources/Cloud-Paper.json", + "doc_id": "other-cloud", + }, + ) + injected["done"] = True + with real_kb_ingest_lock(openkb_dir): + yield + + with ( + patch("openkb.cli.prepare_cloud_import", return_value=cloud), + patch("openkb.cli.kb_ingest_lock", side_effect=race_before_lock), + patch("openkb.cli.compile_long_doc", return_value=None), + patch("openkb.cli._setup_llm_key"), + ): + assert import_from_pageindex_cloud("cloud-1", kb_dir) == "added" + + registry = HashRegistry(kb_dir / ".openkb" / "hashes.json") + synthetic = hashlib.sha256(b"pageindex-cloud:cloud-1").hexdigest() + expected_doc_name = f"Cloud-Paper-{synthetic[:8]}" + assert registry.get(existing_hash)["doc_name"] == "Cloud-Paper" + assert registry.get(synthetic)["doc_name"] == expected_doc_name + assert (kb_dir / "wiki" / "sources" / f"{expected_doc_name}.json").exists() + + def test_cloud_import_skips_if_same_doc_id_registered_after_fetch(self, tmp_path): + import hashlib + from contextlib import contextmanager + + from openkb.cli import import_from_pageindex_cloud + from openkb.locks import kb_ingest_lock as real_kb_ingest_lock + from openkb.state import HashRegistry + + kb_dir = self._setup_kb(tmp_path) + cloud = self._cloud_data(doc_name="Cloud-Paper") + path_key = "pageindex-cloud:cloud-1" + synthetic = hashlib.sha256(path_key.encode("utf-8")).hexdigest() + lock_calls = {"count": 0} + + @contextmanager + def register_same_doc_before_second_lock(openkb_dir): + lock_calls["count"] += 1 + if lock_calls["count"] == 2: + HashRegistry(openkb_dir / "hashes.json").add( + synthetic, + { + "name": "Cloud Paper.pdf", + "doc_name": "Cloud-Paper", + "type": "pageindex_cloud", + "origin": "cloud", + "path": path_key, + "source_path": "wiki/sources/Cloud-Paper.json", + "doc_id": "cloud-1", + }, + ) + with real_kb_ingest_lock(openkb_dir): + yield + + with ( + patch("openkb.cli.prepare_cloud_import", return_value=cloud) as mock_prepare, + patch("openkb.cli.kb_ingest_lock", side_effect=register_same_doc_before_second_lock), + patch("openkb.cli.compile_long_doc") as mock_compile, + patch("openkb.add_coordinator.run_add_mutation") as mock_run, + patch("openkb.cli._setup_llm_key"), + ): + assert import_from_pageindex_cloud("cloud-1", kb_dir) == "skipped" + + assert lock_calls["count"] == 2 + mock_prepare.assert_called_once() + mock_compile.assert_not_called() + mock_run.assert_not_called() + assert HashRegistry(kb_dir / ".openkb" / "hashes.json").is_known(synthetic) + + def test_cloud_import_propagates_dirty_rollback(self, tmp_path): + import pytest + + from openkb.add_coordinator import DirtyRollbackError + from openkb.cli import import_from_pageindex_cloud + + kb_dir = self._setup_kb(tmp_path) + cloud = self._cloud_data(doc_name="Cloud-Paper") + dirty_error = DirtyRollbackError( + "cloud_import", + kb_dir / ".openkb" / "journal" / "retained.json", + ) + + with ( + patch("openkb.cli.prepare_cloud_import", return_value=cloud), + patch("openkb.add_coordinator.run_add_mutation", side_effect=dirty_error), + patch("openkb.cli._setup_llm_key"), + ): + with pytest.raises(DirtyRollbackError) as exc_info: + import_from_pageindex_cloud("cloud-1", kb_dir) + + assert exc_info.value is dirty_error + + def test_cloud_import_failure_message_names_real_cause(self, tmp_path, capsys): + """A non-body failure caught at the outer except (here: name resolution) + must surface the real error, not be mislabeled as 'Failed to prepare + mutation snapshot'. run_add_mutation handles snapshot/body failures + itself and returns False, so the broad except only ever catches + pre-mutation errors that the old label misdescribed.""" + from openkb.cli import import_from_pageindex_cloud + + kb_dir = self._setup_kb(tmp_path) + cloud = self._cloud_data(doc_name="Cloud-Paper") + + with ( + patch("openkb.cli.prepare_cloud_import", return_value=cloud), + patch( + "openkb.cli.resolve_doc_name_from_key", + side_effect=RuntimeError("name resolution blew up"), + ), + patch("openkb.cli._setup_llm_key"), + ): + assert import_from_pageindex_cloud("cloud-1", kb_dir) == "failed" + + out = capsys.readouterr().out + assert "Failed to prepare mutation snapshot" not in out + assert "name resolution blew up" in out + + def test_cloud_import_keyboard_interrupt_rolls_back_artifacts(self, tmp_path): + import pytest + + from openkb.cli import import_from_pageindex_cloud + from openkb.state import HashRegistry + + kb_dir = self._setup_kb(tmp_path) + doc_name = "Cloud-Paper" + cloud = self._cloud_data(doc_name=doc_name) + + with ( + patch("openkb.cli.prepare_cloud_import", return_value=cloud), + patch("openkb.cli.compile_long_doc", side_effect=KeyboardInterrupt()), + patch("openkb.cli._setup_llm_key"), + ): + with pytest.raises(KeyboardInterrupt): + import_from_pageindex_cloud("cloud-1", kb_dir) + + assert not (kb_dir / "wiki" / "summaries" / f"{doc_name}.md").exists() + assert not (kb_dir / "wiki" / "sources" / f"{doc_name}.json").exists() + assert HashRegistry(kb_dir / ".openkb" / "hashes.json").all_entries() == {} + assert list((kb_dir / ".openkb" / "journal").glob("*.json")) == [] + def test_compile_failure_cleans_up_orphan_artifacts(self, tmp_path): """If import succeeds (artifacts written) but compile fails twice, the summary/source artifacts are cleaned up — no registry entry exists, so @@ -492,3 +735,180 @@ def test_compile_failure_cleans_up_orphan_artifacts(self, tmp_path): assert not (kb_dir / "wiki" / "sources" / f"{doc_name}.json").exists() # Nothing registered → a retry is not skipped. assert HashRegistry(kb_dir / ".openkb" / "hashes.json").all_entries() == {} + + +class TestAddMutationCoordinator: + def _setup_kb(self, tmp_path): + (tmp_path / "raw").mkdir() + (tmp_path / "wiki" / "sources" / "images").mkdir(parents=True) + (tmp_path / "wiki" / "summaries").mkdir(parents=True) + (tmp_path / "wiki" / "concepts").mkdir(parents=True) + (tmp_path / "wiki" / "entities").mkdir(parents=True) + openkb_dir = tmp_path / ".openkb" + openkb_dir.mkdir() + (openkb_dir / "config.yaml").write_text("model: gpt-4o-mini\n", encoding="utf-8") + (openkb_dir / "hashes.json").write_text("{}", encoding="utf-8") + return tmp_path + + def test_coordinator_rolls_back_before_commit_and_skips_post_commit(self, tmp_path): + from openkb.add_coordinator import AddMutationPlan, run_add_mutation + from openkb.locks import kb_ingest_lock + + kb_dir = self._setup_kb(tmp_path) + official = kb_dir / "wiki" / "sources" / "doc.md" + post_commit_calls = [] + + def body(_snapshot): + official.parent.mkdir(parents=True, exist_ok=True) + official.write_text("# changed\n", encoding="utf-8") + raise RuntimeError("before commit") + + plan = AddMutationPlan( + operation="add", + details={"doc_name": "doc"}, + touched_paths=[official], + body=body, + post_commit_hooks=[lambda: post_commit_calls.append("ran")], + ) + + with kb_ingest_lock(kb_dir / ".openkb"): + assert run_add_mutation(kb_dir, plan) is False + assert not official.exists() + assert post_commit_calls == [] + + def test_coordinator_reports_failed_mutation(self, tmp_path, capsys): + from openkb.add_coordinator import AddMutationPlan, run_add_mutation + from openkb.locks import kb_ingest_lock + + kb_dir = self._setup_kb(tmp_path) + official = kb_dir / "wiki" / "sources" / "doc.md" + + def body(_snapshot): + official.parent.mkdir(parents=True, exist_ok=True) + official.write_text("# changed\n", encoding="utf-8") + raise RuntimeError("boom") + + plan = AddMutationPlan( + operation="add", + details={"name": "doc.md", "doc_name": "doc"}, + touched_paths=[official], + body=body, + ) + + with kb_ingest_lock(kb_dir / ".openkb"): + assert run_add_mutation(kb_dir, plan) is False + + output = capsys.readouterr().out + assert "[ERROR] add failed for doc.md: boom" in output + + def test_coordinator_post_commit_failure_does_not_roll_back(self, tmp_path): + from openkb.add_coordinator import AddMutationPlan, run_add_mutation + from openkb.locks import kb_ingest_lock + + kb_dir = self._setup_kb(tmp_path) + official = kb_dir / "wiki" / "sources" / "doc.md" + + def body(_snapshot): + official.parent.mkdir(parents=True, exist_ok=True) + official.write_text("# committed\n", encoding="utf-8") + + def bad_hook(): + raise RuntimeError("hook failed") + + plan = AddMutationPlan( + operation="add", + details={"doc_name": "doc"}, + touched_paths=[official], + body=body, + post_commit_hooks=[bad_hook], + ) + + with kb_ingest_lock(kb_dir / ".openkb"): + assert run_add_mutation(kb_dir, plan) is True + assert official.read_text(encoding="utf-8") == "# committed\n" + assert list((kb_dir / ".openkb" / "journal").glob("*.json")) == [] + + def test_coordinator_keyboard_interrupt_rolls_back_and_reraises(self, tmp_path): + import pytest + + from openkb.add_coordinator import AddMutationPlan, run_add_mutation + from openkb.locks import kb_ingest_lock + + kb_dir = self._setup_kb(tmp_path) + official = kb_dir / "wiki" / "sources" / "doc.md" + post_commit_calls = [] + + def body(_snapshot): + official.parent.mkdir(parents=True, exist_ok=True) + official.write_text("# interrupted\n", encoding="utf-8") + raise KeyboardInterrupt() + + plan = AddMutationPlan( + operation="add", + details={"doc_name": "doc"}, + touched_paths=[official], + body=body, + post_commit_hooks=[lambda: post_commit_calls.append("ran")], + ) + + with kb_ingest_lock(kb_dir / ".openkb"): + with pytest.raises(KeyboardInterrupt): + run_add_mutation(kb_dir, plan) + + assert not official.exists() + assert post_commit_calls == [] + assert list((kb_dir / ".openkb" / "journal").glob("*.json")) == [] + + def test_run_add_mutation_requires_lock_held(self, tmp_path): + import pytest + + from openkb.add_coordinator import AddMutationPlan, run_add_mutation + + kb_dir = self._setup_kb(tmp_path) + + plan = AddMutationPlan( + operation="add", + details={"doc_name": "doc"}, + touched_paths=[kb_dir / "wiki" / "sources" / "doc.md"], + body=lambda _snapshot: None, + ) + + with pytest.raises(RuntimeError, match="requires the caller to hold kb_ingest_lock"): + run_add_mutation(kb_dir, plan) + + def test_run_add_mutation_raises_dirty_rollback_when_rollback_fails(self, tmp_path): + import pytest + + from openkb.add_coordinator import AddMutationPlan, DirtyRollbackError, run_add_mutation + from openkb.locks import kb_ingest_lock + from openkb.mutation import MutationSnapshot + + kb_dir = self._setup_kb(tmp_path) + official = kb_dir / "wiki" / "sources" / "doc.md" + official.parent.mkdir(parents=True, exist_ok=True) + official.write_text("# before\n", encoding="utf-8") + + def body(_snapshot): + official.write_text("# after\n", encoding="utf-8") + raise RuntimeError("body fails after partial apply") + + plan = AddMutationPlan( + operation="add", + details={"doc_name": "doc"}, + touched_paths=[official], + body=body, + ) + + # Force rollback to fail: rollback_best_effort returns the error instead + # of None, so the active journal is retained for next-run recovery. + with kb_ingest_lock(kb_dir / ".openkb"): + with patch.object( + MutationSnapshot, "rollback_best_effort", return_value=OSError("disk full") + ): + with pytest.raises(DirtyRollbackError) as exc_info: + run_add_mutation(kb_dir, plan) + + assert exc_info.value.operation == "add" + # The journal is retained on disk for next-run recovery. + assert exc_info.value.journal_path.exists() + assert exc_info.value.journal_path.is_file() diff --git a/tests/test_indexer.py b/tests/test_indexer.py index 3af85b04..1c9e4485 100644 --- a/tests/test_indexer.py +++ b/tests/test_indexer.py @@ -98,6 +98,29 @@ def test_returns_index_result(self, kb_dir, sample_tree, tmp_path): assert result.description == sample_tree["doc_description"] assert result.tree is not None + def test_deletes_pageindex_doc_when_a_post_add_step_fails(self, kb_dir, sample_tree, tmp_path): + """The PageIndex blob is durably written by col.add(), but .openkb/files is + no longer in the add mutation's eager snapshot — track_new only registers + the blob on a successful return. So if any step after col.add() raises + (here: get_document), index_long_document must delete the doc it just + added; otherwise the blob leaks as an orphan that pageindex.db — rolled + back by the snapshot — no longer references, and no reaper reclaims.""" + doc_id = "abc-123" + col = self._make_fake_collection(doc_id, sample_tree) + col.get_document.side_effect = RuntimeError("get_document blew up") + + fake_client = MagicMock() + fake_client.collection.return_value = col + + pdf_path = tmp_path / "sample.pdf" + pdf_path.write_bytes(b"%PDF-1.4 fake") + + with patch("openkb.indexer.PageIndexClient", return_value=fake_client): + with pytest.raises(RuntimeError, match="get_document blew up"): + index_long_document(pdf_path, kb_dir) + + col.delete_document.assert_called_once_with(doc_id) + def test_source_page_written_as_json(self, kb_dir, sample_tree, tmp_path): """Long doc source should be written as JSON, not markdown.""" import json as json_mod diff --git a/tests/test_locks.py b/tests/test_locks.py index 477898ef..fec8289f 100644 --- a/tests/test_locks.py +++ b/tests/test_locks.py @@ -12,6 +12,7 @@ atomic_write_json, atomic_write_text, kb_ingest_lock, + kb_ingest_lock_held, kb_read_lock, ) @@ -77,6 +78,29 @@ def test_write_lock_can_take_nested_read(tmp_path): assert (openkb_dir / "ingest.lock").exists() +def test_kb_ingest_lock_held_is_exclusive_and_thread_local(tmp_path): + openkb_dir = tmp_path / ".openkb" + worker_seen = [] + + assert not kb_ingest_lock_held(openkb_dir) + + with kb_read_lock(openkb_dir): + assert not kb_ingest_lock_held(openkb_dir) + + with kb_ingest_lock(openkb_dir): + assert kb_ingest_lock_held(openkb_dir) + + worker = threading.Thread( + target=lambda: worker_seen.append(kb_ingest_lock_held(openkb_dir)) + ) + worker.start() + worker.join(timeout=2) + + assert not worker.is_alive() + assert worker_seen == [False] + assert not kb_ingest_lock_held(openkb_dir) + + def test_atomic_write_text_replaces_file(tmp_path): target = tmp_path / "nested" / "file.txt" atomic_write_text(target, "first")