feat(workflows): honor max_concurrency in fan-out via a bounded thread pool #3224
Pull request overview
Implements actual parallel execution for workflow fan-out by honoring max_concurrency, while making RunState persistence safe under concurrent execution (locking + atomic JSON writes). This fits into the workflow engine’s execution model by enabling opt-in bounded parallelism for I/O-bound fan-out items without changing default sequential behavior.
Changes:
- Add `WorkflowEngine._run_fan_out()` to execute fan-out items sequentially or via a bounded `ThreadPoolExecutor` depending on `max_concurrency`.
- Make `RunState.save()` concurrency-safe via a per-run lock and atomic temp-file writes; route step result recording through a locked helper.
- Add workflow tests covering fan-out concurrency behavior, ordering, coercion, and error/exception handling.
Show a summary per file
| File | Description |
|---|---|
| src/specify_cli/workflows/engine.py | Adds bounded concurrent fan-out execution and hardens run-state persistence for concurrency. |
| tests/test_workflows.py | Adds a new test suite validating fan-out concurrency semantics and edge cases. |
Review details
- Files reviewed: 2/2 changed files
- Comments generated: 5
- Review effort level: Low
Thanks @copilot, all five points addressed at root cause (latest: d4479ed):
Coverage added: concurrent halt-includes-halting-item, continue_on_error-does-not-truncate, and unknown-template-type-matches-sequential. Full workflows suite green (328 passed). @mnriem, could you give this a review when you get a chance? 🙏
…ut, faithful halt

Address the reviewer feedback on the bounded fan-out concurrency:

- Sliding submission window: keep at most `workers` items in flight and stop launching new items once the run is halting, instead of submitting all items up front (which let the pool keep starting queued work after a halt).
- Faithful halt prefix: attribute a halt to the specific item whose own recorded result halted the run (replaying the sequential break condition, honoring continue_on_error/aborted), not the shared run status a later concurrent item may have flipped. The returned prefix now includes the actual halting item, matching the sequential path. An item that fails before recording a result (e.g. an unknown step type) is attributed too, since every item runs the same template.
- Lock the parent fan-out output mutation: route the post-fan-out step_results[...]['output'] update through a new RunState.set_step_output() under the run lock, so it cannot race a concurrent save().
- Docstring: describe int() coercion accurately (numeric strings / floats are honored; only non-coercible or <= 1 runs sequentially).

Tests: add concurrent halt-includes-halting-item, continue_on_error-does-not-truncate, and unknown-template-type-matches-sequential coverage; make the timing test use a monotonic clock with a looser threshold to avoid CI flakiness.
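For readers following the thread, the sliding submission window and the halt prefix described in this commit can be sketched roughly as below. This is a minimal illustration, not the engine's code: `run_windowed` and the `run_item(index, item) -> (result, halted)` callable are hypothetical names, and the real `_run_fan_out` derives the halt from recorded step results rather than a returned flag.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from typing import Any, Callable, Optional, Sequence


def run_windowed(
    items: Sequence[Any],
    run_item: Callable[[int, Any], tuple],
    workers: int,
) -> list:
    """Run items with at most `workers` in flight; return the ordered prefix.

    `run_item` is a hypothetical callable returning (result, halted).
    Results are stored by item index, never by completion order.
    """
    results: list = [None] * len(items)  # one preallocated slot per item
    halt_index: Optional[int] = None
    next_index = 0
    in_flight: dict = {}  # future -> item index

    with ThreadPoolExecutor(max_workers=workers) as pool:
        while in_flight or (next_index < len(items) and halt_index is None):
            # Top up the window, but launch nothing new once a halt is seen.
            while (
                halt_index is None
                and next_index < len(items)
                and len(in_flight) < workers
            ):
                future = pool.submit(run_item, next_index, items[next_index])
                in_flight[future] = next_index
                next_index += 1

            done, _ = wait(set(in_flight), return_when=FIRST_COMPLETED)
            for future in done:
                index = in_flight.pop(future)
                result, halted = future.result()
                results[index] = result
                if halted and (halt_index is None or index < halt_index):
                    halt_index = index  # the earliest halting item wins

    # The prefix includes the halting item itself, like a sequential `break`.
    end = len(items) if halt_index is None else halt_index + 1
    return results[:end]
```

Because items are submitted in index order and the loop drains everything in flight before returning, every slot up to the halting index is filled by the time the prefix is sliced.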
38c7798 to
d4479ed
- append_log: serialize the log_entries append + log.jsonl write under a dedicated RunState._log_lock so concurrent fan-out workers can't interleave or corrupt log lines (kept separate from the state lock; never nested).
- _run_fan_out.run_item: read the item output back through the item_ctx it executed against rather than the outer context closure; this is clearer, and robust if StepContext ever stops sharing the steps dict by reference.
- StepBase: document the thread-safety contract. STEP_REGISTRY holds one shared instance per type, so concurrent fan-out invokes execute() on the same object; implementations must be stateless/thread-safe (the built-ins already are).
- test_concurrency_is_real: prove parallelism deterministically with a threading.Barrier (sequential execution can't clear it) instead of a wall-clock timing assertion.
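The barrier-based approach to proving parallelism mentioned for `test_concurrency_is_real` might look like the sketch below. The helper name `proves_parallel` and its parameters are illustrative assumptions, not the test in the PR; the point is only that a `threading.Barrier` clears when all parties are waiting at once, which a sequential run can never achieve.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def proves_parallel(workers: int, parties: int = 2, timeout: float = 5.0) -> bool:
    """Return True only if `parties` tasks were running at the same time."""
    barrier = threading.Barrier(parties)

    def task() -> bool:
        try:
            # Clears only once every party has arrived; a lone task times out,
            # which also breaks the barrier for any task that arrives later.
            barrier.wait(timeout=timeout)
            return True
        except threading.BrokenBarrierError:
            return False

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(task) for _ in range(parties)]
        return all(future.result() for future in futures)
```

With two workers both tasks meet at the barrier and the check passes without any sleep or timing threshold; with one worker the first task times out and the check fails.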
Thanks @copilot, second-pass comments addressed at root cause in ce352a3:
Full workflows suite green (328 passed). @mnriem, I would appreciate your review when you have a chance 🙏
…y cancel semantics

- RunState.save(): move the updated_at timestamp assignment inside the run lock so the timestamp matches the snapshot the thread serializes and concurrent savers don't race on it.
- _run_fan_out docstring: clarify that on a halt only not-yet-started items are cancelled; items already running finish but their outputs are ignored (Future.cancel() can't stop running work, and the pool joins on exit).
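The cancel semantics in the docstring note follow from how `concurrent.futures` behaves: `Future.cancel()` succeeds only for work that has not started. A small standalone demonstration, with task names invented for illustration:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()


def blocker() -> str:
    started.set()            # signal that this task is now running
    release.wait(timeout=5)  # stay busy until the main thread lets go
    return "finished"


with ThreadPoolExecutor(max_workers=1) as pool:
    running = pool.submit(blocker)
    started.wait(timeout=5)  # make sure the first task really is running
    queued = pool.submit(lambda: "never runs")  # waits behind the single worker

    cancelled_running = running.cancel()  # running work cannot be cancelled
    cancelled_queued = queued.cancel()    # pending work can
    release.set()
# Leaving the `with` block joins the worker, so `running` has completed here.
```

The running task still finishes and produces a result; it is up to the caller to ignore that result, which is what the docstring describes.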
Thanks @copilot, both addressed at root cause in 6f03222:
Full workflows suite green (328 passed). @mnriem, thanks again for merging #3225; I would appreciate your review on this one too when you have a moment 🙏
The concurrent fan-out path invokes _execute_steps from worker threads, which calls the engine's on_step_start callback (the CLI sets it to a console.print lambda). Concurrent invocation could interleave/garble progress output. Guard the call with a WorkflowEngine._callback_lock so callbacks are serialized; the lock is uncontended for sequential runs.
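A rough sketch of the callback guard, assuming a simplified engine: `EngineSketch` and `notify_step_start` are hypothetical stand-ins, and only the `_callback_lock` name and the `on_step_start` callback come from the commit message. The demo records how many callbacks were ever active at once.

```python
import threading


class EngineSketch:
    """Hypothetical stand-in for the engine; only the callback path is shown."""

    def __init__(self, on_step_start=None):
        self.on_step_start = on_step_start
        self._callback_lock = threading.Lock()

    def notify_step_start(self, step_id: str) -> None:
        if self.on_step_start is None:
            return
        with self._callback_lock:  # serialize callbacks across worker threads
            self.on_step_start(step_id)


stats = {"active": 0, "peak": 0, "seen": []}


def record(step_id: str) -> None:
    # Not thread-safe on its own; it relies on the engine's lock.
    stats["active"] += 1
    stats["peak"] = max(stats["peak"], stats["active"])
    stats["seen"].append(step_id)
    stats["active"] -= 1


engine = EngineSketch(on_step_start=record)


def emit(worker_id: int) -> None:
    for n in range(50):
        engine.notify_step_start(f"parent:template:{worker_id}-{n}")


threads = [threading.Thread(target=emit, args=(i,)) for i in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Since every callback runs under the lock, the recorded peak stays at one regardless of how the threads are scheduled.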
Thanks @copilot, addressed at root cause in f78db82: Concurrent …
Full workflows suite green (328 passed). @mnriem, I would appreciate your review when you have a chance 🙏
…eback

In _run_fan_out's concurrent path, a worker exception was stashed in first_exc and re-raised after the loop. Re-raise it from within the except block with a bare `raise` (after cancelling outstanding futures) so the original traceback is preserved, and drop the now-unneeded first_exc variable. The ThreadPoolExecutor __exit__ still joins any already-running workers before the exception escapes.
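The cancel-then-bare-`raise` pattern can be illustrated as follows. This is a simplified sketch that submits every task up front (unlike the sliding window in the PR), and `run_all` and `failing_item` are made-up names; it only shows that the worker's frames survive in the traceback.

```python
import traceback
from concurrent.futures import ThreadPoolExecutor


def run_all(tasks, workers):
    """Run zero-argument callables; re-raise the first worker exception."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(task) for task in tasks]
        try:
            return [future.result() for future in futures]
        except Exception:
            for future in futures:
                future.cancel()  # only affects futures that have not started
            raise  # bare raise keeps the exception and its traceback intact


def failing_item():
    raise ValueError("boom")


frames = []
try:
    run_all([failing_item, lambda: "ok"], workers=2)
except ValueError as exc:
    # Function names on the traceback, including the worker-side frame.
    frames = [frame.name for frame in traceback.extract_tb(exc.__traceback__)]
```

The exception only escapes `run_all` after the `with` block has joined any workers that were already running.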
Thanks @copilot, addressed at root cause in 73ced6a: Worker exception traceback (…
Full workflows suite green (328 passed). @mnriem, I would appreciate your review when you have a moment 🙏
…te, bound workers

Address third review pass:

- Remove the unlocked `context.steps[step_id]["output"] = …` writes in the fan-out parent update. context.steps[step_id] is the same dict object that set_step_output() updates under the run lock, so the direct (unsynchronized) mutation was redundant.
- Preserve sequential halt semantics under concurrency: a later in-flight item could overwrite state.status after the halting item was identified. _run_fan_out now derives the halting item's run status (item_halt_status, replacing the bool item_halted) and restores it after the pool joins, so the final status is the first halting item's outcome.
- Bound the pool: workers = min(max_concurrency, len(items)) and early-return for empty items, so a user-controlled max_concurrency can't over-allocate threads.

Add coverage that an earlier PAUSED item's status wins over a later concurrent FAILED item.
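Putting the `int()` coercion rule from the PR description together with the pool bound from this commit, the worker count could be computed along these lines. `effective_workers` is a hypothetical helper, and returning 1 for an empty item list is an assumption made here for simplicity (the PR returns early for empty items instead).

```python
def effective_workers(max_concurrency, item_count: int) -> int:
    """Return the number of workers to use; 1 means the sequential path."""
    try:
        requested = int(max_concurrency)  # "4" and 4.0 both coerce to 4
    except (TypeError, ValueError, OverflowError):
        return 1  # None, non-numeric strings, NaN and infinity fall back
    if requested <= 1:
        return 1  # 0, negatives and 1 all run sequentially
    # Never allocate more threads than there are items to run.
    return max(1, min(requested, item_count))
```

For example, `effective_workers("4", 10)` yields 4, while `effective_workers(1000, 3)` is capped at 3.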
Thanks @copilot, all three addressed at root cause in 56caa29:
Full workflows suite green (329 passed). @mnriem, I would appreciate your review when you have a moment 🙏
Please address Copilot feedback. If not applicable, please explain why. And please resolve conflicts.
…step_results

On a resume run, StepContext is built with steps=state.step_results, so the two direct `context.steps[...] = ...` writes mutated the shared dict outside the run lock and could race save(). Route both through a new _record_result helper that mirrors into context.steps only when it is a distinct object (a fresh run) and otherwise relies solely on record_step_result's locked write.
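The identity check behind that helper can be sketched as below. `RunStateSketch` and the free function `record_result` are simplified, hypothetical versions; the actual `_record_result` and `record_step_result` signatures are not shown in this thread.

```python
import threading


class RunStateSketch:
    """Hypothetical stand-in for RunState with a locked result writer."""

    def __init__(self):
        self.step_results = {}
        self._lock = threading.Lock()

    def record_step_result(self, step_id, result):
        with self._lock:
            self.step_results[step_id] = result


def record_result(state, context_steps, step_id, result):
    """Record under the run lock; mirror only into a distinct steps dict."""
    state.record_step_result(step_id, result)
    if context_steps is not state.step_results:
        # Fresh run: the context owns a separate dict, so mirror the entry.
        context_steps[step_id] = result
    # Resume run: both names refer to one dict, and the locked write above
    # already updated it, so no unsynchronized second write is needed.


resumed = RunStateSketch()
record_result(resumed, resumed.step_results, "a", {"output": 1})

fresh = RunStateSketch()
fresh_steps = {}
record_result(fresh, fresh_steps, "a", {"output": 1})
```

Comparing with `is` rather than `==` matters here: two dicts with equal contents are still separate objects and both need the write.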
…x-concurrency

# Conflicts:
# tests/test_workflows.py
@mnriem done on both counts:
- Conflicts resolved: merged latest …
- Copilot feedback addressed (…

Full workflows suite green (340 passed, including the merged fan-in tests).
Thank you!
Upstream commits merged (19):
- Retire iflow/roo/windsurf integrations (github#3166, github#3167, github#3168, github#3211, github#3212, github#3213)
- Move version_satisfies to _utils.py, allow prereleases (github#2695)
- Workflow fan-out max_concurrency via bounded thread pool (github#3224)
- Reject bool max_iterations in while/do-while validation (github#3237)
- bash 3.2 portability: echo→printf, ${word^^}→tr (github#3192)
- --no-persist in common.sh for read-only path resolution (github#3025)
- Reject host-less catalog URLs (github#3209, github#3227)
- Extension updates: Intake v0.1.3, Architecture Workflow v1.2.2, Repository Governance, Workflow Preset v1.3.11
- Release 0.12.1 → 0.12.2 → 0.12.3.dev0 (github#3253, github#3259)
- CI Python matrix alignment + bash 3.2 portability (github#3244)
- Docs: Windsurf→Kilo Code references throughout

Conflicts resolved (2):
- pyproject.toml: kept fork name/description, bumped to 0.12.2+adlc1
- AGENTS.md: accepted upstream's condensed agent table (retired agents removed)

Assisted-by: opencode (model: glm-5.2, supervised)
Description
Closes #3222.
The `fan-out` step has carried a `max_concurrency` field since the workflow engine landed (#2158), but the engine ignored it: `_execute_steps` ran fan-out items in a sequential `for` loop and `max_concurrency` was only recorded in the step output. This honors it.

A new `WorkflowEngine._run_fan_out` runs items on a bounded `ThreadPoolExecutor` when `max_concurrency > 1`, and takes the existing sequential path when `<= 1` (the default), so existing workflows are byte-for-byte unchanged. Results are always assembled in item order (a preallocated slot per item, collected in submission order), never completion order, so fan-in, which reads them positionally, is unaffected. The `parentId:templateId:index` id grammar and halt-on-first-failure are preserved; `max_concurrency` is coerced with `int()`: a value that cannot be coerced (`None`, a non-numeric string) or that coerces to `<= 1` runs sequentially, while a numeric string like `"4"` or a float like `4.0` is honored.

Fan-out items are I/O-bound: each typically dispatches a `command` step that spawns a blocking agent-CLI subprocess, which releases the GIL, so a thread pool yields real wall-clock parallelism.

Two concurrency care points:
- Each item runs against its own context created with `dataclasses.replace(context, item=…)`, so `context.item` is never clobbered across threads; the shared `steps` dict is written only on the disjoint `parent:template:index` key.
- `RunState.save()` previously serialized the live `step_results` dict via a plain `open("w")`, so a concurrent fan-out could both interleave on-disk writes and mutate the dict mid-`json.dump` (`dictionary changed size during iteration`). `save()` is now held under a per-run lock and written atomically (temp file + `os.replace`), and per-item result recording goes through a small `record_step_result` helper under that lock. Sequential runs see only an uncontended lock.

A genuine exception escaping an item (as opposed to a normal step `FAILED`, which sets the run status) cancels outstanding work and re-raises, so the run is marked failed rather than reporting a vacuous completion.

Testing
- `.venv/bin/python -m pytest tests/test_workflows.py`: 325 passed, including 15 new `TestFanOutConcurrency` cases: K≤1 sequential parity, item-order under forced reverse completion (event chain, no sleeps), real parallelism, `max_concurrency` coercion (0 / negative / None / non-int / string), per-thread item isolation, halt-on-failure prefix, and first-exception cancel + re-raise.
- … `test_timestamp_branches` / git extension) that fail identically on `main`.
- `uvx ruff check src/ tests/test_workflows.py`: clean
- `uv run specify --help`

AI Disclosure
Code, tests, and this description were authored with AI assistance (Claude Code), from a fan-out concurrency investigation; everything was verified by running the repo's test suite and ruff locally.
@mnriem, I would appreciate your review when you have a moment. Happy to swap the `save()` atomicity for a narrower lock if you'd prefer a smaller change.
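For context on the `save()` hardening discussed in this description, a minimal sketch of a lock plus temp-file-and-`os.replace` write is shown here. `RunStateSketch`, its JSON layout, and the `worker` helper are assumptions for illustration; the real `RunState` carries more fields and may differ in detail.

```python
import json
import os
import tempfile
import threading
from pathlib import Path


class RunStateSketch:
    """Hypothetical stand-in for RunState; only the save path is modelled."""

    def __init__(self, path):
        self.path = Path(path)
        self.step_results = {}
        self._lock = threading.Lock()

    def record_step_result(self, step_id, result):
        with self._lock:  # mutations take the same lock that save() holds
            self.step_results[step_id] = result

    def save(self):
        with self._lock:
            # Serialize under the lock so the dict cannot change mid-dump.
            payload = json.dumps({"step_results": self.step_results})
            fd, tmp_name = tempfile.mkstemp(dir=str(self.path.parent), suffix=".tmp")
            try:
                with os.fdopen(fd, "w", encoding="utf-8") as handle:
                    handle.write(payload)
                os.replace(tmp_name, self.path)  # atomic swap into place
            except BaseException:
                if os.path.exists(tmp_name):
                    os.unlink(tmp_name)  # do not leave a stray temp file
                raise


def worker(state, worker_id, count):
    for n in range(count):
        state.record_step_result(f"parent:template:{worker_id}-{n}", {"n": n})
        state.save()


with tempfile.TemporaryDirectory() as tmp_dir:
    state = RunStateSketch(Path(tmp_dir) / "state.json")
    threads = [threading.Thread(target=worker, args=(state, i, 25)) for i in range(4)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    state.save()
    loaded = json.loads(state.path.read_text(encoding="utf-8"))
```

A narrower alternative would copy the dict under the lock and write outside it; the version above keeps the write inside the lock so that concurrent savers also cannot reorder their replaces.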