diff --git a/crates/taurus-core/src/runtime/engine.rs b/crates/taurus-core/src/runtime/engine.rs index b91b5bd..bdaddf6 100644 --- a/crates/taurus-core/src/runtime/engine.rs +++ b/crates/taurus-core/src/runtime/engine.rs @@ -192,8 +192,9 @@ impl ExecutionEngine { respond_emitter: Option<&dyn RespondEmitter>, with_trace: bool, ) -> EngineExecutionReport { - self.execute_graph_with_execution_id_report_async( + self.execute_graph_with_project_id_report_async( execution_id, + flow.project_id, flow.starting_node_id, flow.node_functions, flow.input_value, @@ -373,6 +374,30 @@ impl ExecutionEngine { remote: Option<&dyn RemoteRuntime>, respond_emitter: Option<&dyn RespondEmitter>, with_trace: bool, + ) -> EngineExecutionReport { + self.execute_graph_with_project_id_report_async( + execution_id, + 0, + start_node_id, + node_functions, + flow_input, + remote, + respond_emitter, + with_trace, + ) + .await + } + + async fn execute_graph_with_project_id_report_async( + &self, + execution_id: ExecutionId, + project_id: i64, + start_node_id: i64, + node_functions: Vec, + flow_input: Option, + remote: Option<&dyn RemoteRuntime>, + respond_emitter: Option<&dyn RespondEmitter>, + with_trace: bool, ) -> EngineExecutionReport { if let Some(emitter) = respond_emitter { emitter.emit(execution_id, EmitType::StartingExec, null_value()); @@ -383,7 +408,7 @@ impl ExecutionEngine { None => ValueStore::default(), }; - let compiled = match compile_flow(start_node_id, node_functions) { + let compiled = match compile_flow(project_id, start_node_id, node_functions) { Ok(plan) => plan, Err(err) => { let runtime_error = err.as_runtime_error(); @@ -628,6 +653,7 @@ mod tests { struct StubRemoteRuntime { result: NodeExecutionResult, target_services: Option>>>, + project_ids: Option>>>, } #[async_trait] @@ -643,6 +669,12 @@ mod tests { .expect("target service recorder should not be poisoned") .push(execution.target_service); } + if let Some(project_ids) = &self.project_ids { + project_ids + .lock() + .expect("project id recorder should not be poisoned") + .push(execution.request.project_id); + } Ok(self.result.clone()) } @@ -1240,6 +1272,7 @@ mod tests { result: None, }, target_services: None, + project_ids: None, }; let mut remote_node = node( 1, @@ -1285,6 +1318,7 @@ mod tests { result: Some(node_execution_result::Result::Success(string_value("ok"))), }, target_services: Some(Arc::clone(&target_services)), + project_ids: None, }; let mut remote_node = node( 1, @@ -1306,6 +1340,47 @@ mod tests { ); } + #[test] + fn remote_execution_uses_flow_project_id() { + let engine = ExecutionEngine::new(); + let project_ids = Arc::new(Mutex::new(Vec::new())); + let remote = StubRemoteRuntime { + result: NodeExecutionResult { + started_at: 1, + finished_at: 2, + parameter_results: Vec::new(), + id: Some(node_execution_result::Id::NodeId(99)), + result: Some(node_execution_result::Result::Success(string_value("ok"))), + }, + target_services: None, + project_ids: Some(Arc::clone(&project_ids)), + }; + let mut remote_node = node( + 1, + "remote::project", + vec![literal_param(100, "payload", int_value(20))], + None, + ); + remote_node.definition_source = Some("action.example".to_string()); + let flow = ExecutionFlow { + flow_id: 10, + project_id: 42, + starting_node_id: 1, + node_functions: vec![remote_node], + input_value: None, + }; + + let report = engine.execute_flow_report(flow, Some(&remote), None, false); + + assert_eq!(report.exit_reason, ExitReason::Success); + assert_eq!( + *project_ids + .lock() + .expect("project id recorder should not be poisoned"), + vec![42] + ); + } + #[test] fn remote_execution_rejects_empty_action_definition_source() { let engine = ExecutionEngine::new(); diff --git a/crates/taurus-core/src/runtime/engine/compiler.rs b/crates/taurus-core/src/runtime/engine/compiler.rs index 1f039ce..e291ac1 100644 --- a/crates/taurus-core/src/runtime/engine/compiler.rs +++ b/crates/taurus-core/src/runtime/engine/compiler.rs @@ -108,6 +108,7 @@ impl CompileError { } pub fn compile_flow( + project_id: i64, start_node_id: i64, nodes: Vec, ) -> Result { @@ -207,6 +208,7 @@ pub fn compile_flow( } Ok(CompiledFlow { + project_id, start_idx, nodes: compiled_nodes, node_idx_by_id, diff --git a/crates/taurus-core/src/runtime/engine/executor.rs b/crates/taurus-core/src/runtime/engine/executor.rs index 91902b0..fbd93f1 100644 --- a/crates/taurus-core/src/runtime/engine/executor.rs +++ b/crates/taurus-core/src/runtime/engine/executor.rs @@ -871,7 +871,7 @@ impl<'a> EngineExecutor<'a> { execution_identifier: Uuid::new_v4().to_string(), function_identifier: node.handler_id.clone(), parameters: Some(Struct { fields }), - project_id: 0, + project_id: self.flow.project_id, }) } diff --git a/crates/taurus-core/src/runtime/engine/model.rs b/crates/taurus-core/src/runtime/engine/model.rs index 97eeefa..e361bb3 100644 --- a/crates/taurus-core/src/runtime/engine/model.rs +++ b/crates/taurus-core/src/runtime/engine/model.rs @@ -51,6 +51,7 @@ pub struct CompiledNode { /// Compiled flow plan. #[derive(Debug, Clone)] pub struct CompiledFlow { + pub project_id: i64, pub start_idx: usize, pub nodes: Vec, pub node_idx_by_id: HashMap,