From f24d8ccb8a2db011405dced02319360567b7975b Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Mon, 29 Jun 2026 15:19:49 -0700 Subject: [PATCH 01/15] feat(data-retention): granular PII redaction stages (input + block outputs) --- apps/pii/server.py | 114 +++++-- .../[id]/data-retention/route.ts | 16 + .../components/data-retention-settings.tsx | 289 +++++++++++++----- apps/sim/executor/execution/block-executor.ts | 13 + apps/sim/executor/execution/executor.ts | 1 + apps/sim/executor/execution/types.ts | 15 + apps/sim/executor/types.ts | 3 + .../lib/api/contracts/data-retention.test.ts | 64 +++- apps/sim/lib/api/contracts/primitives.ts | 43 ++- apps/sim/lib/billing/retention.test.ts | 178 ++++++++--- apps/sim/lib/billing/retention.ts | 78 ++++- apps/sim/lib/guardrails/mask-client.ts | 46 +-- apps/sim/lib/guardrails/pii-batching.ts | 41 +++ apps/sim/lib/guardrails/pii-entities.ts | 183 +++++++++++ apps/sim/lib/guardrails/validate_pii.test.ts | 13 + apps/sim/lib/guardrails/validate_pii.ts | 107 ++++++- apps/sim/lib/logs/execution/logger.ts | 2 +- .../lib/logs/execution/pii-redaction.test.ts | 53 +++- apps/sim/lib/logs/execution/pii-redaction.ts | 119 ++++++-- .../lib/workflows/executor/execution-core.ts | 41 +++ packages/db/schema.ts | 25 +- 21 files changed, 1199 insertions(+), 245 deletions(-) create mode 100644 apps/sim/lib/guardrails/pii-batching.ts diff --git a/apps/pii/server.py b/apps/pii/server.py index 3fbd9859e45..4029bac1019 100644 --- a/apps/pii/server.py +++ b/apps/pii/server.py @@ -10,7 +10,13 @@ from typing import Any from fastapi import FastAPI -from presidio_analyzer import AnalyzerEngine, Pattern, PatternRecognizer, RecognizerResult +from presidio_analyzer import ( + AnalyzerEngine, + BatchAnalyzerEngine, + Pattern, + PatternRecognizer, + RecognizerResult, +) from presidio_analyzer.nlp_engine import NlpEngineProvider from presidio_analyzer.predefined_recognizers import ( AuAbnRecognizer, @@ -133,6 +139,7 @@ def build_analyzer() -> AnalyzerEngine: analyzer = build_analyzer() +batch_analyzer = BatchAnalyzerEngine(analyzer_engine=analyzer) anonymizer = AnonymizerEngine() # Propagates to uvicorn's root handler, so timing lands in the container log stream. @@ -149,6 +156,13 @@ class AnalyzeRequest(BaseModel): return_decision_process: bool = False +class AnalyzeBatchRequest(BaseModel): + texts: list[str] + language: str = "en" + entities: list[str] | None = None + score_threshold: float | None = None + + class AnonymizeRequest(BaseModel): text: str analyzer_results: list[dict[str, Any]] = [] @@ -156,6 +170,51 @@ class AnonymizeRequest(BaseModel): operators: dict[str, dict[str, Any]] | None = None +class AnonymizeBatchItem(BaseModel): + text: str + analyzer_results: list[dict[str, Any]] = [] + + +class AnonymizeBatchRequest(BaseModel): + items: list[AnonymizeBatchItem] = [] + anonymizers: dict[str, dict[str, Any]] | None = None + operators: dict[str, dict[str, Any]] | None = None + + +def build_operators( + raw_operators: dict[str, dict[str, Any]] | None, +) -> dict[str, OperatorConfig] | None: + if not raw_operators: + return None + operators: dict[str, OperatorConfig] = {} + for entity, raw_cfg in raw_operators.items(): + op_cfg = dict(raw_cfg) + op_type = op_cfg.pop("type", "replace") + operators[entity] = OperatorConfig(op_type, op_cfg) + return operators + + +def run_anonymize( + text: str, + raw_results: list[dict[str, Any]], + operators: dict[str, OperatorConfig] | None, +): + analyzer_results = [ + RecognizerResult( + entity_type=r["entity_type"], + start=r["start"], + end=r["end"], + score=r.get("score", 1.0), + ) + for r in raw_results + ] + return anonymizer.anonymize( + text=text, + analyzer_results=analyzer_results, + operators=operators, + ) + + @app.get("/health") def health() -> dict[str, str]: return {"status": "ok"} @@ -186,35 +245,28 @@ def analyze(req: AnalyzeRequest) -> list[dict[str, Any]]: return [r.to_dict() for r in results] +@app.post("/analyze_batch") +def analyze_batch(req: AnalyzeBatchRequest) -> list[list[dict[str, Any]]]: + """Analyze many texts in one pass (spaCy nlp.pipe), returning one span list + per input in request order — the batched counterpart to /analyze.""" + results = batch_analyzer.analyze_iterator( + texts=req.texts, + language=req.language, + entities=req.entities or None, + score_threshold=req.score_threshold, + ) + return [[r.to_dict() for r in per_text] for per_text in results] + + @app.post("/anonymize") def anonymize(req: AnonymizeRequest) -> dict[str, Any]: started = time.perf_counter() - analyzer_results = [ - RecognizerResult( - entity_type=r["entity_type"], - start=r["start"], - end=r["end"], - score=r.get("score", 1.0), - ) - for r in req.analyzer_results - ] - raw_operators = req.anonymizers or req.operators - operators = None - if raw_operators: - operators = {} - for entity, raw_cfg in raw_operators.items(): - op_cfg = dict(raw_cfg) - op_type = op_cfg.pop("type", "replace") - operators[entity] = OperatorConfig(op_type, op_cfg) - result = anonymizer.anonymize( - text=req.text, - analyzer_results=analyzer_results, - operators=operators, - ) + operators = build_operators(req.anonymizers or req.operators) + result = run_anonymize(req.text, req.analyzer_results, operators) logger.info( "anonymize chars=%d spans=%d duration_ms=%.1f", len(req.text), - len(analyzer_results), + len(req.analyzer_results), (time.perf_counter() - started) * 1000, ) return { @@ -230,3 +282,17 @@ def anonymize(req: AnonymizeRequest) -> dict[str, Any]: for item in result.items ], } + + +@app.post("/anonymize_batch") +def anonymize_batch(req: AnonymizeBatchRequest) -> dict[str, list[str]]: + """Mask many texts in one pass, returning masked text per item in request + order — the batched counterpart to /anonymize. Anonymization is pure string + work (no NLP), so callers should send only items with detected spans.""" + operators = build_operators(req.anonymizers or req.operators) + return { + "texts": [ + run_anonymize(item.text, item.analyzer_results, operators).text + for item in req.items + ] + } diff --git a/apps/sim/app/api/organizations/[id]/data-retention/route.ts b/apps/sim/app/api/organizations/[id]/data-retention/route.ts index 17213f854c3..830939eda73 100644 --- a/apps/sim/app/api/organizations/[id]/data-retention/route.ts +++ b/apps/sim/app/api/organizations/[id]/data-retention/route.ts @@ -42,6 +42,22 @@ function normalizeConfigured( rules: settings.piiRedaction.rules.map((rule) => ({ ...rule, language: coercePiiLanguage(rule.language), + stages: rule.stages + ? { + input: { + ...rule.stages.input, + language: coercePiiLanguage(rule.stages.input.language), + }, + blockOutputs: { + ...rule.stages.blockOutputs, + language: coercePiiLanguage(rule.stages.blockOutputs.language), + }, + logs: { + ...rule.stages.logs, + language: coercePiiLanguage(rule.stages.logs.language), + }, + } + : undefined, })), } : null, diff --git a/apps/sim/ee/data-retention/components/data-retention-settings.tsx b/apps/sim/ee/data-retention/components/data-retention-settings.tsx index af4daade496..cdf647c6bda 100644 --- a/apps/sim/ee/data-retention/components/data-retention-settings.tsx +++ b/apps/sim/ee/data-retention/components/data-retention-settings.tsx @@ -1,14 +1,16 @@ 'use client' -import { useEffect, useRef, useState } from 'react' +import { type ReactNode, useEffect, useRef, useState } from 'react' import { Checkbox, Chip, + ChipConfirmModal, ChipDropdown, ChipInput, ChipSelect, ChipSwitch, ChipTag, + Info, Search, toast, } from '@sim/emcn' @@ -22,11 +24,18 @@ import type { RetentionOverride } from '@/lib/api/contracts/primitives' import { useSession } from '@/lib/auth/auth-client' import { isBillingEnabled } from '@/lib/core/config/env-flags' import { - DEFAULT_PII_LANGUAGE, - PII_ENTITY_GROUPS, + emptyPiiStages, + getEntityGroupsForLanguage, + isEntitySupportedForLanguage, + normalizeRuleStages, PII_LANGUAGES, + PII_STAGE_META, + PII_STAGES, + type PIIEntityType, type PIILanguage, - SUPPORTED_PII_ENTITIES, + type PiiStageKey, + type PiiStagePolicy, + type PiiStages, } from '@/lib/guardrails/pii-entities' import { getUserRole } from '@/lib/workspaces/organization/utils' import { UnsavedChangesModal } from '@/app/workspace/[workspaceId]/components/credential-detail' @@ -44,8 +53,6 @@ import { useWorkspacesQuery } from '@/hooks/queries/workspace' const logger = createLogger('DataRetentionSettings') -const ENTITY_LABELS = SUPPORTED_PII_ENTITIES as Record - /** Sentinel `RetentionSelect` value meaning "inherit the org-level value". */ const INHERIT = 'inherit' @@ -66,8 +73,7 @@ const DAY_OPTIONS = [ interface PiiOverride { id: string workspaceId: string - entityTypes: string[] - language: PIILanguage + stages: PiiStages } /** @@ -84,8 +90,7 @@ interface PolicyDraft { softDeleteDays: string taskCleanupDays: string piiOverride: boolean - piiEntityTypes: string[] - piiLanguage: PIILanguage + piiStages: PiiStages } interface EditingPolicy { @@ -132,6 +137,19 @@ function buildRetentionOverride(workspaceId: string, draft: PolicyDraft): Retent return hasField ? override : null } +/** Stable serialization of a stage set for dirty-detection. */ +function serializeStages(stages: PiiStages): Array<[PiiStageKey, boolean, string[], PIILanguage]> { + return PII_STAGES.map((key) => { + const policy = stages[key] + return [key, policy.enabled, [...policy.entityTypes].sort(), policy.language] as [ + PiiStageKey, + boolean, + string[], + PIILanguage, + ] + }) +} + function normalizePolicyDraft(draft: PolicyDraft): string { return JSON.stringify({ isOrgDefault: draft.isOrgDefault, @@ -140,16 +158,43 @@ function normalizePolicyDraft(draft: PolicyDraft): string { softDeleteDays: draft.softDeleteDays, taskCleanupDays: draft.taskCleanupDays, piiOverride: draft.piiOverride, - piiEntityTypes: draft.piiOverride ? [...draft.piiEntityTypes].sort() : [], - piiLanguage: draft.piiLanguage, + piiStages: draft.piiOverride ? serializeStages(draft.piiStages) : [], }) } -function entitySummary(entityTypes: string[]): string { - if (entityTypes.length === 0) return 'Not redacted' - const labels = entityTypes.map((t) => ENTITY_LABELS[t] ?? t) - if (labels.length <= 3) return labels.join(', ') - return `${labels.slice(0, 3).join(', ')} +${labels.length - 3} more` +/** A stage is "on" iff it has at least one entity type selected. */ +function stageHasContent(policy: PiiStagePolicy): boolean { + return policy.entityTypes.length > 0 +} + +function anyStageHasContent(stages: PiiStages): boolean { + return PII_STAGES.some((key) => stageHasContent(stages[key])) +} + +/** Persist-time guarantee that `enabled` mirrors "has entity types" for every stage. */ +function withSyncedEnabled(stages: PiiStages): PiiStages { + return PII_STAGES.reduce((acc, key) => { + acc[key] = { ...stages[key], enabled: stages[key].entityTypes.length > 0 } + return acc + }, {} as PiiStages) +} + +/** Prune entity selections that the chosen language has no recognizer for. */ +function pruneEntitiesForLanguage(entityTypes: string[], language: PIILanguage): string[] { + return entityTypes.filter((t) => isEntitySupportedForLanguage(t as PIIEntityType, language)) +} + +/** Row-summary fragment, e.g. "Input 3 · Outputs off · Logs 5". */ +function stageSummary(stages: PiiStages): string { + const short: Record = { + input: 'Input', + blockOutputs: 'Outputs', + logs: 'Logs', + } + return PII_STAGES.map((key) => { + const policy = stages[key] + return `${short[key]} ${stageHasContent(policy) ? policy.entityTypes.length : 'off'}` + }).join(' · ') } /** Row-summary label for a retention field driven by stored hours. */ @@ -187,22 +232,35 @@ function RetentionSelect({ value, onChange, allowInherit = false }: RetentionSel } interface EntityCheckboxGridProps { + groups: ReadonlyArray<{ + label: string + entities: ReadonlyArray<{ value: PIIEntityType; label: string }> + }> selected: string[] onChange: (entityTypes: string[]) => void + /** Optional control rendered directly beneath the search row (e.g. language). */ + belowSearch?: ReactNode } -function EntityCheckboxGrid({ selected, onChange }: EntityCheckboxGridProps) { +function EntityCheckboxGrid({ + groups: sourceGroups, + selected, + onChange, + belowSearch, +}: EntityCheckboxGridProps) { const [search, setSearch] = useState('') const query = search.trim().toLowerCase() - const groups = PII_ENTITY_GROUPS.map((group) => ({ - label: group.label, - entities: query - ? group.entities.filter( - (e) => e.label.toLowerCase().includes(query) || e.value.toLowerCase().includes(query) - ) - : group.entities, - })).filter((group) => group.entities.length > 0) + const groups = sourceGroups + .map((group) => ({ + label: group.label, + entities: query + ? group.entities.filter( + (e) => e.label.toLowerCase().includes(query) || e.value.toLowerCase().includes(query) + ) + : group.entities, + })) + .filter((group) => group.entities.length > 0) const visibleValues: string[] = groups.flatMap((g) => g.entities.map((e) => e.value)) const allVisibleSelected = @@ -234,6 +292,7 @@ function EntityCheckboxGrid({ selected, onChange }: EntityCheckboxGridProps) { {allVisibleSelected ? 'Deselect all' : 'Select all'} + {belowSearch}
{groups.map((group) => (
@@ -280,6 +339,58 @@ function PiiLanguageSelect({ value, onChange }: PiiLanguageSelectProps) { ) } +interface PiiStagePanelProps { + description: string + value: PiiStagePolicy + onChange: (next: PiiStagePolicy) => void +} + +/** + * The config body for the currently-selected redaction stage (tab panel). The + * stage is "on" purely by virtue of having entity types selected — `enabled` is + * kept in sync with that, so there is no separate toggle. + */ +function PiiStagePanel({ description, value, onChange }: PiiStagePanelProps) { + const groups = getEntityGroupsForLanguage(value.language) + + function update(entityTypes: string[], language = value.language) { + onChange({ ...value, language, entityTypes, enabled: entityTypes.length > 0 }) + } + + return ( +
+ {description} + +
+
+ Entity types + + Loose numeric recognizers (US Social Security Number, US bank account number) and Date + or time match aggressively and frequently over-redact. Enable these only where false + positives are acceptable. + +
+ update(entityTypes)} + belowSearch={ +
+ Language + + update(pruneEntitiesForLanguage(value.entityTypes, language), language) + } + /> +
+ } + /> +
+
+ ) +} + interface PolicyDetailProps { draft: PolicyDraft isNew: boolean @@ -311,6 +422,13 @@ function PolicyDetail({ }: PolicyDetailProps) { const isOrg = draft.isOrgDefault const showPiiGrid = isOrg || draft.piiOverride + const [activeStage, setActiveStage] = useState( + () => + PII_STAGE_META.find((s) => stageHasContent(draft.piiStages[s.key]))?.key ?? + PII_STAGE_META[0].key + ) + const [showRemoveConfirm, setShowRemoveConfirm] = useState(false) + const activeStageMeta = PII_STAGE_META.find((s) => s.key === activeStage) ?? PII_STAGE_META[0] const title = isOrg ? 'Organization defaults' : isNew @@ -335,7 +453,11 @@ function PolicyDetail({ saveDisabled={!isOrg && draft.workspaceIds.length === 0} /> {canRemove && ( - + setShowRemoveConfirm(true)} + disabled={isSaving} + > Remove override )} @@ -359,10 +481,11 @@ function PolicyDetail({ onChange({ ...draft, workspaceIds })} options={workspaceOptions} - placeholder='Select workspaces' className='flex-shrink-0' />
@@ -417,19 +540,32 @@ function PolicyDetail({ />
)} + {!isOrg && draft.piiOverride && ( + + Overriding replaces all three redaction stages for this workspace. + + )} {showPiiGrid && ( <> - onChange({ ...draft, piiEntityTypes })} + ({ + value: stage.key, + label: stage.label, + }))} + /> + + onChange({ + ...draft, + piiStages: { ...draft.piiStages, [activeStage]: next }, + }) + } /> -
- Language - onChange({ ...draft, piiLanguage })} - /> -
)} @@ -437,6 +573,29 @@ function PolicyDetail({ )} + + ) } @@ -480,13 +639,7 @@ export function DataRetentionSettings() { const rules = data.configured.piiRedaction?.rules ?? [] const defaultRule = rules.find((r) => r.workspaceId === null) setDefaultPii( - defaultRule - ? { - id: defaultRule.id, - entityTypes: defaultRule.entityTypes, - language: defaultRule.language ?? DEFAULT_PII_LANGUAGE, - } - : null + defaultRule ? { id: defaultRule.id, stages: normalizeRuleStages(defaultRule) } : null ) setPiiOverrides( rules @@ -494,8 +647,7 @@ export function DataRetentionSettings() { .map((r) => ({ id: r.id, workspaceId: r.workspaceId as string, - entityTypes: r.entityTypes, - language: r.language ?? DEFAULT_PII_LANGUAGE, + stages: normalizeRuleStages(r), })) ) setOverrides(data.configured.retentionOverrides ?? []) @@ -530,7 +682,9 @@ export function DataRetentionSettings() { ] if (piiEnabled) { parts.push( - defaultPii?.entityTypes.length ? `PII: ${entitySummary(defaultPii.entityTypes)}` : 'No PII' + defaultPii && anyStageHasContent(defaultPii.stages) + ? `PII: ${stageSummary(defaultPii.stages)}` + : 'No PII' ) } return parts.join(' · ') @@ -544,7 +698,7 @@ export function DataRetentionSettings() { `Soft-delete ${retentionLabel(ov?.softDeleteRetentionHours)}`, `Task ${retentionLabel(ov?.taskCleanupHours)}`, ] - if (piiEnabled) parts.push(pii ? `PII: ${entitySummary(pii.entityTypes)}` : 'PII inherited') + if (piiEnabled) parts.push(pii ? `PII: ${stageSummary(pii.stages)}` : 'PII inherited') return parts.join(' · ') } @@ -569,23 +723,17 @@ export function DataRetentionSettings() { retentionOverrides: next.overrides, } if (piiEnabled) { - const rules: { - id: string - entityTypes: string[] - workspaceId: string | null - language: PIILanguage - }[] = next.piiOverrides.map((p) => ({ - id: p.id, - entityTypes: p.entityTypes, - workspaceId: p.workspaceId, - language: p.language, - })) + const rules: { id: string; workspaceId: string | null; stages: PiiStages }[] = + next.piiOverrides.map((p) => ({ + id: p.id, + workspaceId: p.workspaceId, + stages: withSyncedEnabled(p.stages), + })) if (next.defaultPii) { rules.unshift({ id: next.defaultPii.id, - entityTypes: next.defaultPii.entityTypes, workspaceId: null, - language: next.defaultPii.language, + stages: withSyncedEnabled(next.defaultPii.stages), }) } settings.piiRedaction = { rules } @@ -613,8 +761,7 @@ export function DataRetentionSettings() { softDeleteDays, taskCleanupDays, piiOverride: true, - piiEntityTypes: defaultPii?.entityTypes ?? [], - piiLanguage: defaultPii?.language ?? DEFAULT_PII_LANGUAGE, + piiStages: defaultPii?.stages ?? emptyPiiStages(), } setEditing({ draft, original: draft, isNew: false }) } @@ -628,8 +775,7 @@ export function DataRetentionSettings() { softDeleteDays: INHERIT, taskCleanupDays: INHERIT, piiOverride: false, - piiEntityTypes: [], - piiLanguage: DEFAULT_PII_LANGUAGE, + piiStages: emptyPiiStages(), } setEditing({ draft, original: draft, isNew: true }) } @@ -644,8 +790,7 @@ export function DataRetentionSettings() { softDeleteDays: hoursToOverrideValue(ov?.softDeleteRetentionHours), taskCleanupDays: hoursToOverrideValue(ov?.taskCleanupHours), piiOverride: Boolean(pii), - piiEntityTypes: pii?.entityTypes ?? [], - piiLanguage: pii?.language ?? DEFAULT_PII_LANGUAGE, + piiStages: pii?.stages ?? emptyPiiStages(), } setEditing({ draft, original: draft, isNew: false }) } @@ -668,11 +813,10 @@ export function DataRetentionSettings() { logDays: draft.logDays, softDeleteDays: draft.softDeleteDays, taskCleanupDays: draft.taskCleanupDays, - defaultPii: draft.piiEntityTypes.length + defaultPii: anyStageHasContent(draft.piiStages) ? { id: defaultPii?.id ?? generateId(), - entityTypes: draft.piiEntityTypes, - language: draft.piiLanguage, + stages: draft.piiStages, } : null, }) @@ -694,8 +838,7 @@ export function DataRetentionSettings() { nextPiiOverrides.push({ id: existing?.id ?? generateId(), workspaceId, - entityTypes: draft.piiEntityTypes, - language: draft.piiLanguage, + stages: draft.piiStages, }) } } diff --git a/apps/sim/executor/execution/block-executor.ts b/apps/sim/executor/execution/block-executor.ts index 0fdd6be1530..0b54d8d76d3 100644 --- a/apps/sim/executor/execution/block-executor.ts +++ b/apps/sim/executor/execution/block-executor.ts @@ -4,6 +4,7 @@ import { redactApiKeys } from '@/lib/core/security/redaction' import { normalizeStringArray } from '@/lib/core/utils/arrays' import { getBaseUrl } from '@/lib/core/utils/urls' import { compactExecutionPayload } from '@/lib/execution/payloads/serializer' +import { redactObjectStrings } from '@/lib/logs/execution/pii-redaction' import { containsUserFileWithMetadata, hydrateUserFilesWithBase64, @@ -219,6 +220,18 @@ export class BlockExecutor { })) as NormalizedBlockOutput } + if (ctx.piiBlockOutputRedaction?.enabled) { + // In-flight redaction: mask before compaction (so offloaded large values + // are seen) and before the log/state split below, so both the downstream + // state copy and the persisted log copy are masked. `onFailure: 'throw'` + // aborts the run rather than feeding corrupted/leaked data downstream. + normalizedOutput = await redactObjectStrings(normalizedOutput, { + entityTypes: ctx.piiBlockOutputRedaction.entityTypes, + language: ctx.piiBlockOutputRedaction.language, + onFailure: 'throw', + }) + } + normalizedOutput = (await compactExecutionPayload(normalizedOutput, { workspaceId: ctx.workspaceId, workflowId: ctx.workflowId, diff --git a/apps/sim/executor/execution/executor.ts b/apps/sim/executor/execution/executor.ts index b88d959a4d3..a551a05d57b 100644 --- a/apps/sim/executor/execution/executor.ts +++ b/apps/sim/executor/execution/executor.ts @@ -421,6 +421,7 @@ export class DAGExecutor { userId: this.contextExtensions.userId, isDeployedContext: this.contextExtensions.isDeployedContext, enforceCredentialAccess: this.contextExtensions.enforceCredentialAccess, + piiBlockOutputRedaction: this.contextExtensions.piiBlockOutputRedaction, blockStates: state.getBlockStates(), blockLogs: overrides?.runFromBlockContext ? [] : (snapshotState?.blockLogs ?? []), metadata: { diff --git a/apps/sim/executor/execution/types.ts b/apps/sim/executor/execution/types.ts index 004a0bb8a1c..eaafa303ed0 100644 --- a/apps/sim/executor/execution/types.ts +++ b/apps/sim/executor/execution/types.ts @@ -148,6 +148,15 @@ export interface ExecutionCallbacks { ) => Promise } +/** In-flight block-output redaction policy (the resolved `blockOutputs` stage). */ +export interface PiiBlockOutputRedaction { + enabled: boolean + /** Presidio entity types to mask. Empty = redact all detected PII. */ + entityTypes: string[] + /** Language whose Presidio recognizers apply. */ + language: string +} + export interface ContextExtensions { workspaceId?: string executionId?: string @@ -180,6 +189,12 @@ export interface ContextExtensions { abortSignal?: AbortSignal includeFileBase64?: boolean base64MaxBytes?: number + /** + * When enabled, every block output is masked in-flight before downstream blocks + * consume it. Resolved from the org/workspace PII redaction policy's + * `blockOutputs` stage. Serializable, so it crosses into the trigger.dev worker. + */ + piiBlockOutputRedaction?: PiiBlockOutputRedaction onStream?: (streamingExecution: StreamingExecution) => Promise onBlockStart?: ( blockId: string, diff --git a/apps/sim/executor/types.ts b/apps/sim/executor/types.ts index 3e9ef6a6e40..d9120567aff 100644 --- a/apps/sim/executor/types.ts +++ b/apps/sim/executor/types.ts @@ -5,6 +5,7 @@ import type { ChildWorkflowContext, IterationContext, ParentIteration, + PiiBlockOutputRedaction, SerializableExecutionState, } from '@/executor/execution/types' import type { RunFromBlockContext } from '@/executor/utils/run-from-block' @@ -306,6 +307,8 @@ export interface ExecutionContext { isDeployedContext?: boolean enforceCredentialAccess?: boolean copilotToolExecution?: boolean + /** In-flight block-output PII redaction policy (resolved `blockOutputs` stage). */ + piiBlockOutputRedaction?: PiiBlockOutputRedaction permissionConfig?: PermissionGroupConfig | null permissionConfigLoaded?: boolean diff --git a/apps/sim/lib/api/contracts/data-retention.test.ts b/apps/sim/lib/api/contracts/data-retention.test.ts index 59e18b25b99..3efd542be49 100644 --- a/apps/sim/lib/api/contracts/data-retention.test.ts +++ b/apps/sim/lib/api/contracts/data-retention.test.ts @@ -3,7 +3,11 @@ */ import { describe, expect, it } from 'vitest' import { updateOrganizationDataRetentionBodySchema } from '@/lib/api/contracts/organization' -import { retentionOverridesSchema } from '@/lib/api/contracts/primitives' +import { + piiRedactionRuleSchema, + piiRedactionSettingsSchema, + retentionOverridesSchema, +} from '@/lib/api/contracts/primitives' describe('retentionOverridesSchema', () => { it('accepts an override that overrides one field and inherits the rest', () => { @@ -67,3 +71,61 @@ describe('updateOrganizationDataRetentionBodySchema', () => { expect(result.success).toBe(true) }) }) + +describe('piiRedactionRuleSchema', () => { + const stage = (enabled: boolean, entityTypes: string[], language?: string) => ({ + enabled, + entityTypes, + ...(language ? { language } : {}), + }) + + it('accepts a legacy flat rule (entityTypes only)', () => { + const result = piiRedactionRuleSchema.safeParse({ + id: 'r-1', + workspaceId: null, + entityTypes: ['EMAIL_ADDRESS'], + }) + expect(result.success).toBe(true) + }) + + it('accepts a per-stage rule', () => { + const result = piiRedactionRuleSchema.safeParse({ + id: 'r-1', + workspaceId: 'ws-1', + stages: { + input: stage(true, ['PERSON'], 'es'), + blockOutputs: stage(false, []), + logs: stage(true, ['US_SSN']), + }, + }) + expect(result.success).toBe(true) + }) + + it('rejects a rule with neither stages nor entityTypes', () => { + const result = piiRedactionRuleSchema.safeParse({ id: 'r-1', workspaceId: null }) + expect(result.success).toBe(false) + }) + + it('rejects an unsupported stage language', () => { + const result = piiRedactionRuleSchema.safeParse({ + id: 'r-1', + workspaceId: null, + stages: { + input: stage(true, ['PERSON'], 'de'), + blockOutputs: stage(false, []), + logs: stage(false, []), + }, + }) + expect(result.success).toBe(false) + }) + + it('enforces one rule per scope (uniqueness refine still applies)', () => { + const result = piiRedactionSettingsSchema.safeParse({ + rules: [ + { id: 'r-1', workspaceId: 'ws-1', entityTypes: ['PERSON'] }, + { id: 'r-2', workspaceId: 'ws-1', entityTypes: ['US_SSN'] }, + ], + }) + expect(result.success).toBe(false) + }) +}) diff --git a/apps/sim/lib/api/contracts/primitives.ts b/apps/sim/lib/api/contracts/primitives.ts index 7e73175abc3..f45571518a6 100644 --- a/apps/sim/lib/api/contracts/primitives.ts +++ b/apps/sim/lib/api/contracts/primitives.ts @@ -113,18 +113,47 @@ export const userFileSchema = z }) .passthrough() -/** A single PII redaction rule targeting one scope (all workspaces, or one). */ -export const piiRedactionRuleSchema = z.object({ - id: z.string().min(1), - name: z.string().max(100).optional(), - /** Presidio entity types to mask. Empty = redact nothing for this scope. */ +/** Per-stage redaction policy: which entity types to mask, in which language. */ +export const piiStagePolicySchema = z.object({ + enabled: z.boolean(), + /** Presidio entity types to mask. Empty (or disabled) = redact nothing. */ entityTypes: z.array(z.string().min(1, 'Entity type cannot be empty')).max(100), - /** null = all workspaces; otherwise the single targeted workspace. */ - workspaceId: z.string().min(1).nullable(), /** Language whose Presidio recognizers apply; defaults to English. */ language: z.enum(PII_LANGUAGE_CODES).optional(), }) +export type PiiStagePolicy = z.output + +/** The three redaction stages, each independently configured. */ +export const piiStagesSchema = z.object({ + input: piiStagePolicySchema, + blockOutputs: piiStagePolicySchema, + logs: piiStagePolicySchema, +}) + +export type PiiStages = z.output + +/** + * A single PII redaction rule targeting one scope (all workspaces, or one). + * New rules carry per-stage `stages`; legacy rows carry only the flat + * `entityTypes`/`language` (resolved as logs-only). At least one must be present. + */ +export const piiRedactionRuleSchema = z + .object({ + id: z.string().min(1), + name: z.string().max(100).optional(), + /** null = all workspaces; otherwise the single targeted workspace. */ + workspaceId: z.string().min(1).nullable(), + /** Per-stage policy (input / blockOutputs / logs). */ + stages: piiStagesSchema.optional(), + /** Legacy flat policy (pre-stages). Retained for back-compat parse + migration. */ + entityTypes: z.array(z.string().min(1, 'Entity type cannot be empty')).max(100).optional(), + language: z.enum(PII_LANGUAGE_CODES).optional(), + }) + .refine((rule) => rule.stages !== undefined || rule.entityTypes !== undefined, { + message: 'A PII redaction rule must define either stages or entityTypes.', + }) + export type PiiRedactionRule = z.output /** diff --git a/apps/sim/lib/billing/retention.test.ts b/apps/sim/lib/billing/retention.test.ts index 60f5a7f83c9..df7a056e691 100644 --- a/apps/sim/lib/billing/retention.test.ts +++ b/apps/sim/lib/billing/retention.test.ts @@ -4,6 +4,7 @@ import type { DataRetentionSettings, PiiRedactionRule } from '@sim/db/schema' import { describe, expect, it } from 'vitest' import { + DEFAULT_PII_REDACTION, resolveEffectivePiiRedaction, resolveEffectiveRetentionHours, } from '@/lib/billing/retention' @@ -12,6 +13,8 @@ function settings(rules: PiiRedactionRule[]): DataRetentionSettings { return { piiRedaction: { rules } } } +const DISABLED = { enabled: false, entityTypes: [], language: 'en' } + describe('resolveEffectivePiiRedaction', () => { const allRule: PiiRedactionRule = { id: 'r-all', @@ -19,71 +22,150 @@ describe('resolveEffectivePiiRedaction', () => { workspaceId: null, } - it('applies the all-workspaces rule when the workspace has no specific rule', () => { - const result = resolveEffectivePiiRedaction({ - orgSettings: settings([allRule]), - workspaceId: 'ws-1', + describe('legacy flat rules (back-compat)', () => { + it('resolves an all-workspaces flat rule to logs-only', () => { + const result = resolveEffectivePiiRedaction({ + orgSettings: settings([allRule]), + workspaceId: 'ws-1', + }) + expect(result).toEqual({ + input: DISABLED, + blockOutputs: DISABLED, + logs: { enabled: true, entityTypes: ['EMAIL_ADDRESS', 'PHONE_NUMBER'], language: 'en' }, + }) }) - expect(result).toEqual({ - enabled: true, - entityTypes: ['EMAIL_ADDRESS', 'PHONE_NUMBER'], - language: 'en', + + it('lets a workspace-specific flat rule override the all rule (logs-only)', () => { + const result = resolveEffectivePiiRedaction({ + orgSettings: settings([ + allRule, + { id: 'r-1', entityTypes: ['US_SSN'], workspaceId: 'ws-1' }, + ]), + workspaceId: 'ws-1', + }) + expect(result).toEqual({ + input: DISABLED, + blockOutputs: DISABLED, + logs: { enabled: true, entityTypes: ['US_SSN'], language: 'en' }, + }) }) - }) - it('lets a workspace-specific rule override the all rule', () => { - const result = resolveEffectivePiiRedaction({ - orgSettings: settings([allRule, { id: 'r-1', entityTypes: ['US_SSN'], workspaceId: 'ws-1' }]), - workspaceId: 'ws-1', + it('carries the flat rule language through (defaults to en)', () => { + const result = resolveEffectivePiiRedaction({ + orgSettings: settings([ + { id: 'r-es', entityTypes: ['ES_NIF'], workspaceId: 'ws-1', language: 'es' }, + ]), + workspaceId: 'ws-1', + }) + expect(result.logs).toEqual({ enabled: true, entityTypes: ['ES_NIF'], language: 'es' }) }) - expect(result).toEqual({ enabled: true, entityTypes: ['US_SSN'], language: 'en' }) - }) - it('carries the rule language through (defaults to en)', () => { - const result = resolveEffectivePiiRedaction({ - orgSettings: settings([ - { id: 'r-es', entityTypes: ['ES_NIF'], workspaceId: 'ws-1', language: 'es' }, - ]), - workspaceId: 'ws-1', + it('falls back to en when a stored language is unsupported/stale', () => { + const result = resolveEffectivePiiRedaction({ + orgSettings: settings([ + { id: 'r-de', entityTypes: ['EMAIL_ADDRESS'], workspaceId: 'ws-1', language: 'de' }, + ]), + workspaceId: 'ws-1', + }) + expect(result.logs).toEqual({ enabled: true, entityTypes: ['EMAIL_ADDRESS'], language: 'en' }) }) - expect(result).toEqual({ enabled: true, entityTypes: ['ES_NIF'], language: 'es' }) - }) - it('falls back to en when a stored language is unsupported/stale', () => { - const result = resolveEffectivePiiRedaction({ - orgSettings: settings([ - { id: 'r-de', entityTypes: ['EMAIL_ADDRESS'], workspaceId: 'ws-1', language: 'de' }, - ]), - workspaceId: 'ws-1', + it('exempts a workspace when its specific flat rule has no entity types', () => { + const result = resolveEffectivePiiRedaction({ + orgSettings: settings([allRule, { id: 'r-1', entityTypes: [], workspaceId: 'ws-1' }]), + workspaceId: 'ws-1', + }) + expect(result).toEqual(DEFAULT_PII_REDACTION) }) - expect(result).toEqual({ enabled: true, entityTypes: ['EMAIL_ADDRESS'], language: 'en' }) }) - it('exempts a workspace when its specific rule has no entity types', () => { - const result = resolveEffectivePiiRedaction({ - orgSettings: settings([allRule, { id: 'r-1', entityTypes: [], workspaceId: 'ws-1' }]), - workspaceId: 'ws-1', + describe('per-stage rules', () => { + const stage = (enabled: boolean, entityTypes: string[], language?: string) => ({ + enabled, + entityTypes, + ...(language ? { language } : {}), }) - expect(result).toEqual({ enabled: false, entityTypes: [], language: 'en' }) - }) - it('is disabled when no rule matches and there is no all rule', () => { - const result = resolveEffectivePiiRedaction({ - orgSettings: settings([{ id: 'r-1', entityTypes: ['US_SSN'], workspaceId: 'ws-2' }]), - workspaceId: 'ws-1', + it('resolves each stage independently', () => { + const result = resolveEffectivePiiRedaction({ + orgSettings: settings([ + { + id: 'r-1', + workspaceId: 'ws-1', + stages: { + input: stage(true, ['PERSON'], 'es'), + blockOutputs: stage(true, ['EMAIL_ADDRESS']), + logs: stage(true, ['US_SSN', 'PHONE_NUMBER']), + }, + }, + ]), + workspaceId: 'ws-1', + }) + expect(result).toEqual({ + input: { enabled: true, entityTypes: ['PERSON'], language: 'es' }, + blockOutputs: { enabled: true, entityTypes: ['EMAIL_ADDRESS'], language: 'en' }, + logs: { enabled: true, entityTypes: ['US_SSN', 'PHONE_NUMBER'], language: 'en' }, + }) }) - expect(result).toEqual({ enabled: false, entityTypes: [], language: 'en' }) + + it('disables a stage that is enabled but has no entity types', () => { + const result = resolveEffectivePiiRedaction({ + orgSettings: settings([ + { + id: 'r-1', + workspaceId: 'ws-1', + stages: { + input: stage(true, []), + blockOutputs: stage(false, ['PERSON']), + logs: stage(true, ['PERSON']), + }, + }, + ]), + workspaceId: 'ws-1', + }) + expect(result.input).toEqual(DISABLED) + expect(result.blockOutputs).toEqual(DISABLED) + expect(result.logs).toEqual({ enabled: true, entityTypes: ['PERSON'], language: 'en' }) + }) + + it('selects the whole workspace rule over the all rule (no per-stage merge)', () => { + const result = resolveEffectivePiiRedaction({ + orgSettings: settings([ + allRule, + { + id: 'r-ws', + workspaceId: 'ws-1', + stages: { + input: stage(true, ['PERSON']), + blockOutputs: stage(false, []), + logs: stage(false, []), + }, + }, + ]), + workspaceId: 'ws-1', + }) + expect(result.input).toEqual({ enabled: true, entityTypes: ['PERSON'], language: 'en' }) + // The all rule's logs entity types are NOT unioned in. + expect(result.logs).toEqual(DISABLED) + }) + }) + + it('is the default when no rule matches and there is no all rule', () => { + expect( + resolveEffectivePiiRedaction({ + orgSettings: settings([{ id: 'r-1', entityTypes: ['US_SSN'], workspaceId: 'ws-2' }]), + workspaceId: 'ws-1', + }) + ).toEqual(DEFAULT_PII_REDACTION) }) - it('is disabled when there are no rules', () => { + it('is the default when there are no rules', () => { expect( resolveEffectivePiiRedaction({ orgSettings: settings([]), workspaceId: 'ws-1' }) - ).toEqual({ enabled: false, entityTypes: [], language: 'en' }) - expect(resolveEffectivePiiRedaction({ orgSettings: null, workspaceId: 'ws-1' })).toEqual({ - enabled: false, - entityTypes: [], - language: 'en', - }) + ).toEqual(DEFAULT_PII_REDACTION) + expect(resolveEffectivePiiRedaction({ orgSettings: null, workspaceId: 'ws-1' })).toEqual( + DEFAULT_PII_REDACTION + ) }) }) diff --git a/apps/sim/lib/billing/retention.ts b/apps/sim/lib/billing/retention.ts index f22b38be705..994088461d5 100644 --- a/apps/sim/lib/billing/retention.ts +++ b/apps/sim/lib/billing/retention.ts @@ -1,7 +1,8 @@ -import type { DataRetentionSettings } from '@sim/db/schema' +import type { DataRetentionSettings, PiiStagePolicy } from '@sim/db/schema' import { coercePiiLanguage, DEFAULT_PII_LANGUAGE } from '@/lib/guardrails/pii-entities' -export interface EffectivePiiRedaction { +/** Resolved policy for one redaction stage. */ +export interface EffectivePiiStage { enabled: boolean /** Presidio entity types to mask. Empty = redact all detected PII. */ entityTypes: string[] @@ -9,18 +10,53 @@ export interface EffectivePiiRedaction { language: string } -export const DEFAULT_PII_REDACTION: EffectivePiiRedaction = { +/** + * Effective PII redaction, resolved per stage. `input`/`blockOutputs` are + * execution-altering (mask the data the workflow computes on); `logs` is the + * observability-only persist-time stage. + */ +export interface EffectivePiiRedaction { + input: EffectivePiiStage + blockOutputs: EffectivePiiStage + logs: EffectivePiiStage +} + +const DISABLED_STAGE: EffectivePiiStage = { enabled: false, entityTypes: [], language: DEFAULT_PII_LANGUAGE, } +export const DEFAULT_PII_REDACTION: EffectivePiiRedaction = { + input: DISABLED_STAGE, + blockOutputs: DISABLED_STAGE, + logs: DISABLED_STAGE, +} + +function sanitizeEntityTypes(value: unknown): string[] { + return Array.isArray(value) ? value.filter((t): t is string => typeof t === 'string') : [] +} + +/** A stage redacts nothing unless it is enabled AND has at least one entity type. */ +function toEffectiveStage(policy: PiiStagePolicy | undefined): EffectivePiiStage { + const types = sanitizeEntityTypes(policy?.entityTypes) + if (!policy?.enabled || types.length === 0) return DISABLED_STAGE + return { + enabled: true, + entityTypes: types, + language: coercePiiLanguage(policy.language) ?? DEFAULT_PII_LANGUAGE, + } +} + /** - * Resolve the effective PII redaction policy for a workspace from the org-level - * rules list, most-specific-wins (never unioned): the workspace's own rule takes - * precedence over the all-workspaces rule (`workspaceId: null`). A resolved rule - * with no entity types redacts nothing — so a workspace-specific empty rule - * exempts that workspace, overriding the all rule. Defensive about the + * Resolve the effective per-stage PII redaction policy for a workspace from the + * org-level rules list, most-specific-wins (never unioned): the workspace's own + * rule takes precedence over the all-workspaces rule (`workspaceId: null`). Rule + * selection is whole-rule; the selected rule is then expanded into three stages. + * + * Back-compat: a legacy rule with no `stages` is treated exactly as it was before + * — logs-only, masking its flat `entityTypes` (input/blockOutputs disabled). A + * resolved stage with no entity types redacts nothing. Defensive about the * loosely-typed JSON column. */ export function resolveEffectivePiiRedaction(params: { @@ -33,13 +69,27 @@ export function resolveEffectivePiiRedaction(params: { const rule = rules.find((r) => r?.workspaceId === params.workspaceId) ?? rules.find((r) => r?.workspaceId == null) + if (!rule) return DEFAULT_PII_REDACTION + + if (!rule.stages) { + const types = sanitizeEntityTypes(rule.entityTypes) + if (types.length === 0) return DEFAULT_PII_REDACTION + return { + input: DISABLED_STAGE, + blockOutputs: DISABLED_STAGE, + logs: { + enabled: true, + entityTypes: types, + language: coercePiiLanguage(rule.language) ?? DEFAULT_PII_LANGUAGE, + }, + } + } - const types = Array.isArray(rule?.entityTypes) - ? rule.entityTypes.filter((t): t is string => typeof t === 'string') - : [] - if (types.length === 0) return DEFAULT_PII_REDACTION - const language = coercePiiLanguage(rule?.language) ?? DEFAULT_PII_LANGUAGE - return { enabled: true, entityTypes: types, language } + return { + input: toEffectiveStage(rule.stages.input), + blockOutputs: toEffectiveStage(rule.stages.blockOutputs), + logs: toEffectiveStage(rule.stages.logs), + } } export type RetentionHoursKey = diff --git a/apps/sim/lib/guardrails/mask-client.ts b/apps/sim/lib/guardrails/mask-client.ts index 3fb818a3c72..24b7f5bfa07 100644 --- a/apps/sim/lib/guardrails/mask-client.ts +++ b/apps/sim/lib/guardrails/mask-client.ts @@ -1,15 +1,8 @@ import type { GuardrailsMaskBatchResult } from '@/lib/api/contracts' import { generateInternalToken } from '@/lib/auth/internal' import { getInternalApiBaseUrl } from '@/lib/core/utils/urls' +import { chunkIndicesByBudget } from '@/lib/guardrails/pii-batching' -/** - * Per-request limits. A chunk is flushed when it hits either bound, keeping each - * request small enough for one short Presidio pass under a tight timeout and far - * below the contract's 100k-entry cap — so large executions split across - * requests instead of failing validation. - */ -const REQUEST_MAX_BYTES = 256 * 1024 -const REQUEST_MAX_COUNT = 2_000 /** Bounds one mask-batch request; an unreachable/stuck Presidio sidecar aborts so the caller scrubs. */ const REQUEST_TIMEOUT_MS = 45_000 @@ -19,8 +12,9 @@ const REQUEST_TIMEOUT_MS = 45_000 * The Presidio sidecars run only in the app task, but the log-redaction persist * path also runs inside the trigger.dev runtime — so redaction always routes * through HTTP, the same way the guardrails tool does. - * Strings are grouped into byte/count-budgeted chunks; order is preserved, so - * the returned array matches `texts` length. + * Strings are grouped into byte/count-budgeted chunks (keeping each request far + * under the 10MB Next body limit); order is preserved, so the returned array + * matches `texts` length. * * Rejects on any non-2xx, timeout, or shape mismatch so the caller can apply * its own fail-safe (scrubbing rather than leaking). @@ -33,34 +27,18 @@ export async function maskPIIBatchViaHttp( if (texts.length === 0) return [] const url = `${getInternalApiBaseUrl()}/api/guardrails/mask-batch` + const masked = new Array(texts.length) - const masked: string[] = [] - let batch: string[] = [] - let batchBytes = 0 - - const flush = async () => { - if (batch.length === 0) return - const out = await postChunk(url, batch, entityTypes, language) - if (out.length !== batch.length) { + for (const indices of chunkIndicesByBudget(texts)) { + const chunk = indices.map((i) => texts[i]) + const out = await postChunk(url, chunk, entityTypes, language) + if (out.length !== chunk.length) { throw new Error('PII mask-batch returned an unexpected result') } - for (const item of out) masked.push(item) - batch = [] - batchBytes = 0 - } - - for (const text of texts) { - const bytes = Buffer.byteLength(text, 'utf8') - if ( - batch.length > 0 && - (batch.length >= REQUEST_MAX_COUNT || batchBytes + bytes > REQUEST_MAX_BYTES) - ) { - await flush() - } - batch.push(text) - batchBytes += bytes + indices.forEach((originalIndex, k) => { + masked[originalIndex] = out[k] + }) } - await flush() return masked } diff --git a/apps/sim/lib/guardrails/pii-batching.ts b/apps/sim/lib/guardrails/pii-batching.ts new file mode 100644 index 00000000000..2eccdc2039c --- /dev/null +++ b/apps/sim/lib/guardrails/pii-batching.ts @@ -0,0 +1,41 @@ +/** + * Per-request bounds shared by both Presidio hops: the app→route HTTP call + * (`mask-client`) and the route→sidecar call (`validate_pii`). Keeping a single + * source of truth ensures every request stays far under the 10MB Next body limit + * and small enough for one short spaCy NER pass under the sidecar timeout. + */ + +/** Max UTF-8 bytes of text per Presidio request. ~40× under the 10MB Next limit. */ +export const PII_REQUEST_MAX_BYTES = 256 * 1024 +/** Max strings per request; caps per-item overhead and stays well under the contract's 100k-entry cap. */ +export const PII_REQUEST_MAX_COUNT = 2_000 + +/** + * Group `texts` into chunks of original indices, flushing a chunk when adding the + * next string would exceed {@link PII_REQUEST_MAX_BYTES} or {@link PII_REQUEST_MAX_COUNT}. + * A single string larger than the byte budget still gets its own chunk — strings + * are never dropped, since an unredacted leaf would persist PII. Order is preserved + * across and within chunks. + */ +export function chunkIndicesByBudget(texts: string[]): number[][] { + const chunks: number[][] = [] + let current: number[] = [] + let bytes = 0 + + for (let i = 0; i < texts.length; i++) { + const size = Buffer.byteLength(texts[i], 'utf8') + if ( + current.length > 0 && + (current.length >= PII_REQUEST_MAX_COUNT || bytes + size > PII_REQUEST_MAX_BYTES) + ) { + chunks.push(current) + current = [] + bytes = 0 + } + current.push(i) + bytes += size + } + if (current.length > 0) chunks.push(current) + + return chunks +} diff --git a/apps/sim/lib/guardrails/pii-entities.ts b/apps/sim/lib/guardrails/pii-entities.ts index c26e7dc0b91..9fd57eee8a8 100644 --- a/apps/sim/lib/guardrails/pii-entities.ts +++ b/apps/sim/lib/guardrails/pii-entities.ts @@ -156,3 +156,186 @@ export function coercePiiLanguage(value: string | undefined): PIILanguage | unde ? (value as PIILanguage) : undefined } + +/** + * Entity types every served language recognizes: Presidio's global pattern + * recognizers, the spaCy NER entities (PERSON/LOCATION/NRP), and the native VIN + * recognizer (registered under every language in `apps/pii/server.py`). + */ +const GLOBAL_PII_ENTITIES: readonly PIIEntityType[] = [ + 'PERSON', + 'LOCATION', + 'NRP', + 'CREDIT_CARD', + 'CRYPTO', + 'DATE_TIME', + 'EMAIL_ADDRESS', + 'IBAN_CODE', + 'IP_ADDRESS', + 'PHONE_NUMBER', + 'URL', + 'MEDICAL_LICENSE', + 'VIN', +] + +/** + * Entity types each language recognizes, mirroring the recognizer registration in + * `apps/pii/server.py`: globals + NER + VIN everywhere, plus the locale-specific + * id recognizers under the language they're registered for (US/UK/AU/IN/SG ids + * are English; es/it/pl/fi carry only their own national ids). Keep in sync with + * the image — a stale entry only no-ops (redaction fails safe), it never leaks. + * `/supportedentities` is the authoritative source if this ever needs to go live. + */ +export const PII_ENTITIES_BY_LANGUAGE: Record> = { + en: new Set([ + ...GLOBAL_PII_ENTITIES, + 'US_SSN', + 'US_PASSPORT', + 'US_DRIVER_LICENSE', + 'US_BANK_NUMBER', + 'US_ITIN', + 'UK_NHS', + 'UK_NINO', + 'AU_ABN', + 'AU_ACN', + 'AU_TFN', + 'AU_MEDICARE', + 'IN_PAN', + 'IN_AADHAAR', + 'IN_VEHICLE_REGISTRATION', + 'IN_VOTER', + 'IN_PASSPORT', + 'SG_NRIC_FIN', + 'SG_UEN', + ]), + es: new Set([...GLOBAL_PII_ENTITIES, 'ES_NIF', 'ES_NIE']), + it: new Set([ + ...GLOBAL_PII_ENTITIES, + 'IT_FISCAL_CODE', + 'IT_DRIVER_LICENSE', + 'IT_VAT_CODE', + 'IT_PASSPORT', + 'IT_IDENTITY_CARD', + ]), + pl: new Set([...GLOBAL_PII_ENTITIES, 'PL_PESEL']), + fi: new Set([...GLOBAL_PII_ENTITIES, 'FI_PERSONAL_IDENTITY_CODE']), +} + +/** True when the entity has a recognizer for the given language. */ +export function isEntitySupportedForLanguage( + entity: PIIEntityType, + language: PIILanguage +): boolean { + return PII_ENTITIES_BY_LANGUAGE[language].has(entity) +} + +/** {@link PII_ENTITY_GROUPS} filtered to entities the language recognizes (empty groups dropped). */ +export function getEntityGroupsForLanguage(language: PIILanguage) { + return PII_ENTITY_GROUPS.map((group) => ({ + label: group.label, + entities: group.entities.filter((e) => isEntitySupportedForLanguage(e.value, language)), + })).filter((group) => group.entities.length > 0) +} + +/** The PII redaction stages, in execution order. */ +export const PII_STAGES = ['input', 'blockOutputs', 'logs'] as const +export type PiiStageKey = (typeof PII_STAGES)[number] + +/** Per-stage redaction policy. `enabled: false` makes the stage a no-op. */ +export interface PiiStagePolicy { + enabled: boolean + entityTypes: string[] + language: PIILanguage +} + +export type PiiStages = Record + +/** + * Stage catalog driving the settings UI, in display order (Logs first — the + * safe, observability-only default). The execution-altering caveat for the + * input/blockOutputs stages is folded into their descriptions. + */ +export const PII_STAGE_META: ReadonlyArray<{ + key: PiiStageKey + label: string + description: string +}> = [ + { + key: 'logs', + label: 'Logs', + description: 'Redact workflow logs when they are persisted.', + }, + { + key: 'input', + label: 'Workflow input', + description: + 'Redact the workflow input before execution. Data is redacted during runtime and may affect workflow output.', + }, + { + key: 'blockOutputs', + label: 'Block outputs', + description: + 'Mask every block output before the next block reads it. Data is redacted during runtime and may affect workflow output and execution performance.', + }, +] + +/** Recognizers that over-redact (loose, no checksum); surfaced as UI guidance. */ +export const RISKY_PII_ENTITIES: ReadonlySet = new Set([ + 'US_SSN', + 'US_BANK_NUMBER', + 'DATE_TIME', +]) + +/** A fully-disabled stage policy for new drafts. */ +export function emptyStagePolicy(): PiiStagePolicy { + return { enabled: false, entityTypes: [], language: DEFAULT_PII_LANGUAGE } +} + +/** A fully-disabled stage set for new drafts. */ +export function emptyPiiStages(): PiiStages { + return { + input: emptyStagePolicy(), + blockOutputs: emptyStagePolicy(), + logs: emptyStagePolicy(), + } +} + +/** + * Hydrate a stored rule into the per-stage shape. A legacy flat rule (no + * `stages`) becomes `logs` enabled with its entity types, the two new stages + * disabled — exactly its pre-stages behavior. + */ +export function normalizeRuleStages(rule: { + stages?: Partial | undefined>> + entityTypes?: string[] + language?: string +}): PiiStages { + const sanitize = (policy: Partial | undefined): PiiStagePolicy => ({ + enabled: Boolean(policy?.enabled), + entityTypes: Array.isArray(policy?.entityTypes) + ? policy.entityTypes.filter((t): t is string => typeof t === 'string') + : [], + language: coercePiiLanguage(policy?.language) ?? DEFAULT_PII_LANGUAGE, + }) + + if (rule.stages) { + return { + input: sanitize(rule.stages.input), + blockOutputs: sanitize(rule.stages.blockOutputs), + logs: sanitize(rule.stages.logs), + } + } + + const entityTypes = Array.isArray(rule.entityTypes) + ? rule.entityTypes.filter((t): t is string => typeof t === 'string') + : [] + return { + input: emptyStagePolicy(), + blockOutputs: emptyStagePolicy(), + logs: { + enabled: entityTypes.length > 0, + entityTypes, + language: coercePiiLanguage(rule.language) ?? DEFAULT_PII_LANGUAGE, + }, + } +} diff --git a/apps/sim/lib/guardrails/validate_pii.test.ts b/apps/sim/lib/guardrails/validate_pii.test.ts index 0ba1c585bc0..c4869fda322 100644 --- a/apps/sim/lib/guardrails/validate_pii.test.ts +++ b/apps/sim/lib/guardrails/validate_pii.test.ts @@ -35,6 +35,19 @@ describe('validate_pii (Presidio sidecar)', () => { analyzeBodies = [] fetchMock = vi.fn(async (url: string, init: { body: string }) => { const body = JSON.parse(init.body) + if (url.includes('/analyze_batch')) { + for (const text of body.texts as string[]) { + analyzeBodies.push({ text, language: body.language, entities: body.entities }) + } + const spans = (body.texts as string[]).map((t) => emailSpans(t, body.entities)) + return new Response(JSON.stringify(spans), { status: 200 }) + } + if (url.includes('/anonymize_batch')) { + const texts = (body.items as Array<{ text: string; analyzer_results: Span[] }>).map((i) => + applyReplace(i.text, i.analyzer_results) + ) + return new Response(JSON.stringify({ texts }), { status: 200 }) + } if (url.includes('/analyze')) { analyzeBodies.push({ text: body.text, language: body.language, entities: body.entities }) return new Response(JSON.stringify(emailSpans(body.text, body.entities)), { status: 200 }) diff --git a/apps/sim/lib/guardrails/validate_pii.ts b/apps/sim/lib/guardrails/validate_pii.ts index a24c8f880e1..2ed9ac9a50e 100644 --- a/apps/sim/lib/guardrails/validate_pii.ts +++ b/apps/sim/lib/guardrails/validate_pii.ts @@ -2,14 +2,20 @@ import { createLogger } from '@sim/logger' import { getErrorMessage } from '@sim/utils/errors' import { env } from '@/lib/core/config/env' import { mapWithConcurrency } from '@/lib/core/utils/concurrency' +import { chunkIndicesByBudget } from '@/lib/guardrails/pii-batching' const logger = createLogger('PIIValidator') /** Just above the analyzer's spaCy NER budget so a stuck sidecar aborts gracefully. */ const REQUEST_TIMEOUT_MS = 45_000 -/** Concurrent per-string sidecar calls within one batch; the warm model handles parallelism. */ -const MASK_CONCURRENCY = 8 +/** + * Concurrent chunk requests in flight. Each chunk is itself a batched sidecar call + * (spaCy `nlp.pipe` over many strings), so a small concurrency keeps the single-model + * sidecar from holding too many parallel docs in memory while still overlapping + * HTTP/JSON with the next chunk's NER. + */ +const CHUNK_CONCURRENCY = 4 /** Single Presidio sidecar serving both /analyze and /anonymize (VIN is native there). */ const PII_URL = env.PII_URL || 'http://localhost:5001' @@ -69,6 +75,61 @@ async function analyze( return (await response.json()) as AnalyzerSpan[] } +/** + * Detect PII spans for many texts in a single analyzer pass (spaCy `nlp.pipe`), + * the batched counterpart to {@link analyze}. Returns one span array per input, + * in order. An empty `entityTypes` ⇒ detect all. Throws on transport/HTTP failure. + */ +async function analyzeBatch( + texts: string[], + entityTypes: string[], + language: string +): Promise { + const entities = entityTypes.length > 0 ? entityTypes : undefined + + // boundary-raw-fetch: internal call to the Presidio analyzer sidecar over localhost + const response = await fetch(`${PII_URL}/analyze_batch`, { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ texts, language, ...(entities ? { entities } : {}) }), + signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS), + }) + if (!response.ok) { + const detail = await response.text().catch(() => '') + throw new Error(`Presidio analyze failed (${response.status}): ${detail.slice(0, 200)}`) + } + return (await response.json()) as AnalyzerSpan[][] +} + +interface AnonymizeBatchItem { + text: string + analyzer_results: AnalyzerSpan[] +} + +/** + * Mask many texts in a single anonymizer pass, the batched counterpart to + * {@link anonymize}. Each item carries its own detected spans; callers must omit + * items with no spans (those texts pass through unchanged). Returns masked text + * per item, in order. Throws on failure. + */ +async function anonymizeBatch(items: AnonymizeBatchItem[]): Promise { + if (items.length === 0) return [] + + // boundary-raw-fetch: internal call to the Presidio anonymizer sidecar over localhost + const response = await fetch(`${PII_URL}/anonymize_batch`, { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ items }), + signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS), + }) + if (!response.ok) { + const detail = await response.text().catch(() => '') + throw new Error(`Presidio anonymize failed (${response.status}): ${detail.slice(0, 200)}`) + } + const data = (await response.json()) as { texts: string[] } + return data.texts +} + /** * Mask spans via the Presidio anonymizer sidecar. Omitting `anonymizers` uses the * default `replace` operator, which yields ``. Throws on failure. @@ -156,12 +217,14 @@ export async function validatePII(input: PIIValidationInput): Promise { if (texts.length === 0) return [] - return mapWithConcurrency(texts, MASK_CONCURRENCY, async (text) => { - if (!text) return text - const spans = await analyze(text, entityTypes, language) - return anonymize(text, spans) + const result = new Array(texts.length) + + await mapWithConcurrency(chunkIndicesByBudget(texts), CHUNK_CONCURRENCY, async (indices) => { + const chunkTexts = indices.map((i) => texts[i]) + const spansPerText = await analyzeBatch(chunkTexts, entityTypes, language) + + const toAnonymize: AnonymizeBatchItem[] = [] + const anonymizePositions: number[] = [] + indices.forEach((originalIndex, pos) => { + const spans = spansPerText[pos] ?? [] + if (spans.length === 0) { + result[originalIndex] = chunkTexts[pos] + return + } + toAnonymize.push({ text: chunkTexts[pos], analyzer_results: spans }) + anonymizePositions.push(pos) + }) + + const masked = await anonymizeBatch(toAnonymize) + anonymizePositions.forEach((pos, k) => { + result[indices[pos]] = masked[k] + }) }) + + return result } export { type PIIEntityType, SUPPORTED_PII_ENTITIES } from '@/lib/guardrails/pii-entities' diff --git a/apps/sim/lib/logs/execution/logger.ts b/apps/sim/lib/logs/execution/logger.ts index a78225d42f5..435d0ce70d8 100644 --- a/apps/sim/lib/logs/execution/logger.ts +++ b/apps/sim/lib/logs/execution/logger.ts @@ -637,7 +637,7 @@ export class ExecutionLogger implements IExecutionLoggerService { // `isWorkspaceOnEnterprisePlan` here: it returns false on transient lookup // errors, which would silently skip masking and leak PII (fail-open). When // rules are present we always redact (fail-safe; over-redaction at worst). - const config = resolveEffectivePiiRedaction({ orgSettings: row.orgSettings, workspaceId }) + const config = resolveEffectivePiiRedaction({ orgSettings: row.orgSettings, workspaceId }).logs if (!config.enabled) return payload return redactPIIFromExecution(payload, { diff --git a/apps/sim/lib/logs/execution/pii-redaction.test.ts b/apps/sim/lib/logs/execution/pii-redaction.test.ts index 5a2da7a5996..a6a940b6526 100644 --- a/apps/sim/lib/logs/execution/pii-redaction.test.ts +++ b/apps/sim/lib/logs/execution/pii-redaction.test.ts @@ -11,7 +11,12 @@ vi.mock('@/lib/guardrails/mask-client', () => ({ maskPIIBatchViaHttp: mockMaskPIIBatch, })) -import { REDACTION_FAILED_MARKER, redactPIIFromExecution } from '@/lib/logs/execution/pii-redaction' +import { + PiiRedactionError, + REDACTION_FAILED_MARKER, + redactObjectStrings, + redactPIIFromExecution, +} from '@/lib/logs/execution/pii-redaction' describe('redactPIIFromExecution', () => { beforeEach(() => { @@ -118,3 +123,49 @@ describe('redactPIIFromExecution', () => { expect(mockMaskPIIBatch).not.toHaveBeenCalled() }) }) + +describe('redactObjectStrings', () => { + beforeEach(() => { + vi.clearAllMocks() + mockMaskPIIBatch.mockImplementation(async (texts: string[]) => texts.map((t) => `MASKED(${t})`)) + }) + + it('masks every string leaf and preserves structure', async () => { + const value = { name: 'bob', nested: { email: 'a@b.com' }, list: ['x', 1, true] } + const result = await redactObjectStrings(value, { entityTypes: ['PERSON'] }) + expect(result).toEqual({ + name: 'MASKED(bob)', + nested: { email: 'MASKED(a@b.com)' }, + list: ['MASKED(x)', 1, true], + }) + expect(mockMaskPIIBatch).toHaveBeenCalledTimes(1) + }) + + it('leaves non-string and empty values untouched', async () => { + const value = { count: 5, flag: false, empty: '', nullish: null } + const result = await redactObjectStrings(value, { entityTypes: [] }) + expect(result).toEqual(value) + expect(mockMaskPIIBatch).not.toHaveBeenCalled() + }) + + it('throws PiiRedactionError on masking failure when onFailure is throw', async () => { + mockMaskPIIBatch.mockRejectedValueOnce(new Error('presidio down')) + await expect( + redactObjectStrings({ text: 'a@b.com' }, { entityTypes: [], onFailure: 'throw' }) + ).rejects.toBeInstanceOf(PiiRedactionError) + }) + + it('throws when the payload exceeds the ceiling and onFailure is throw', async () => { + const big = 'x'.repeat(17 * 1024 * 1024) + await expect( + redactObjectStrings({ big }, { entityTypes: [], onFailure: 'throw' }) + ).rejects.toBeInstanceOf(PiiRedactionError) + expect(mockMaskPIIBatch).not.toHaveBeenCalled() + }) + + it('scrubs (does not throw) by default on failure', async () => { + mockMaskPIIBatch.mockRejectedValueOnce(new Error('presidio down')) + const result = await redactObjectStrings({ text: 'a@b.com' }, { entityTypes: [] }) + expect(result).toEqual({ text: REDACTION_FAILED_MARKER }) + }) +}) diff --git a/apps/sim/lib/logs/execution/pii-redaction.ts b/apps/sim/lib/logs/execution/pii-redaction.ts index 5b17694090a..504d4764572 100644 --- a/apps/sim/lib/logs/execution/pii-redaction.ts +++ b/apps/sim/lib/logs/execution/pii-redaction.ts @@ -15,10 +15,30 @@ export const REDACTION_FAILED_MARKER = '[REDACTION_FAILED]' */ const PII_MAX_TOTAL_BYTES = 16 * 1024 * 1024 +/** + * How to handle a masking failure (Presidio error or over-ceiling payload): + * - `'scrub'` (default): replace eligible strings with {@link REDACTION_FAILED_MARKER}. + * Safe for the log stage — execution already succeeded. + * - `'throw'`: throw {@link PiiRedactionError}. Used for the execution-altering + * stages (input/block outputs) where a marker would corrupt computed data and + * fail-open would leak — so the run aborts instead. + */ +export type PiiRedactionFailureMode = 'scrub' | 'throw' + export interface PiiRedactionOptions { /** Presidio entity types to mask. Empty = redact all detected PII. */ entityTypes: string[] language?: string + /** Failure handling. Defaults to `'scrub'`. */ + onFailure?: PiiRedactionFailureMode +} + +/** Thrown when in-flight redaction (`onFailure: 'throw'`) cannot mask safely. */ +export class PiiRedactionError extends Error { + constructor(message: string) { + super(message) + this.name = 'PiiRedactionError' + } } export interface RedactablePayload { @@ -115,6 +135,74 @@ function transformUnit( return transformStrings(value, handle) } +/** + * Mask a batch of collected strings via Presidio. On a hard failure or when the + * batch exceeds the ceiling, either scrub to {@link REDACTION_FAILED_MARKER} or + * throw {@link PiiRedactionError}, per `options.onFailure`. Returns masked values + * aligned 1:1 with `collected`. + */ +async function maskCollected( + collected: string[], + totalBytes: number, + options: PiiRedactionOptions +): Promise<{ masked: string[]; scrubbed: boolean }> { + const onFailure = options.onFailure ?? 'scrub' + const language = options.language ?? 'en' + + const fail = (reason: string): { masked: string[]; scrubbed: boolean } => { + if (onFailure === 'throw') throw new PiiRedactionError(reason) + return { masked: collected.map(() => REDACTION_FAILED_MARKER), scrubbed: true } + } + + if (totalBytes > PII_MAX_TOTAL_BYTES) { + logger.warn('Payload exceeds PII redaction ceiling', { + totalBytes, + ceiling: PII_MAX_TOTAL_BYTES, + onFailure, + }) + return fail( + `PII redaction skipped: payload ${totalBytes}B exceeds ${PII_MAX_TOTAL_BYTES}B ceiling` + ) + } + + try { + // Presidio runs only in the app container; the persist + execution paths also + // run in the trigger.dev runtime, so masking always goes over HTTP to the app. + const masked = await maskPIIBatchViaHttp(collected, options.entityTypes, language) + return { masked, scrubbed: false } + } catch (error) { + logger.error('PII masking failed', { + error: getErrorMessage(error), + stringCount: collected.length, + onFailure, + }) + return fail(`PII redaction failed: ${getErrorMessage(error)}`) + } +} + +/** + * Mask every eligible string leaf of an arbitrary object in place-preserving + * fashion: collect all leaves in one deterministic pass, mask in a single batched + * Presidio call, then rebuild from the masked slice (the identical traversal + * order makes the two passes line up). Used for the execution-altering input and + * block-output stages, so it defaults callers toward `onFailure: 'throw'`. + */ +export async function redactObjectStrings(value: T, options: PiiRedactionOptions): Promise { + const collected: string[] = [] + let totalBytes = 0 + transformStrings(value, (s) => { + collected.push(s) + totalBytes += Buffer.byteLength(s, 'utf8') + return s + }) + + if (collected.length === 0) return value + + const { masked } = await maskCollected(collected, totalBytes, options) + let index = 0 + return transformStrings(value, () => masked[index++]) as T +} + /** * Mask PII across an execution's `traceSpans` / `finalOutput` / `workflowInput`. * @@ -123,15 +211,14 @@ function transformUnit( * with payload size, not block count. Each unit is then rebuilt independently * from the masked slice, preserving the JSON structure (Presidio never sees the * envelope). On a hard masking failure or when the payload exceeds the ceiling, - * eligible strings are replaced with {@link REDACTION_FAILED_MARKER} rather than - * left unredacted — PII is never persisted on the failure path. + * eligible strings are replaced with {@link REDACTION_FAILED_MARKER} (the default + * `onFailure: 'scrub'`) rather than left unredacted — PII is never persisted on + * the failure path. */ export async function redactPIIFromExecution( payload: RedactablePayload, options: PiiRedactionOptions ): Promise { - const { entityTypes } = options - const language = options.language ?? 'en' const startedAt = performance.now() const units = REDACTABLE_KEYS.filter((key) => payload[key] !== undefined).map((key) => ({ @@ -151,29 +238,7 @@ export async function redactPIIFromExecution( if (collected.length === 0) return payload - let masked: string[] - let scrubbed = false - if (totalBytes > PII_MAX_TOTAL_BYTES) { - logger.warn('Execution exceeds PII redaction ceiling; scrubbing text', { - totalBytes, - ceiling: PII_MAX_TOTAL_BYTES, - }) - masked = collected.map(() => REDACTION_FAILED_MARKER) - scrubbed = true - } else { - try { - // Presidio runs only in the app container; the persist path also runs in - // the trigger.dev runtime, so masking always goes over HTTP to the app. - masked = await maskPIIBatchViaHttp(collected, entityTypes, language) - } catch (error) { - logger.error('PII masking failed; scrubbing text to avoid leaking PII', { - error: getErrorMessage(error), - stringCount: collected.length, - }) - masked = collected.map(() => REDACTION_FAILED_MARKER) - scrubbed = true - } - } + const { masked, scrubbed } = await maskCollected(collected, totalBytes, options) let index = 0 const result: RedactablePayload = { ...payload } diff --git a/apps/sim/lib/workflows/executor/execution-core.ts b/apps/sim/lib/workflows/executor/execution-core.ts index 068e4bf9e31..a8aaeb1453f 100644 --- a/apps/sim/lib/workflows/executor/execution-core.ts +++ b/apps/sim/lib/workflows/executor/execution-core.ts @@ -3,17 +3,27 @@ * This is the SINGLE source of truth for workflow execution */ +import { db } from '@sim/db' +import { organization, workspace } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { getErrorMessage } from '@sim/utils/errors' import { filterUndefined, isPlainRecord, isRecordLike } from '@sim/utils/object' import { mergeSubblockStateWithValues } from '@sim/workflow-persistence/subblocks' +import { eq } from 'drizzle-orm' import type { Edge } from 'reactflow' import { z } from 'zod' +import { + DEFAULT_PII_REDACTION, + type EffectivePiiRedaction, + resolveEffectivePiiRedaction, +} from '@/lib/billing/retention' +import { isFeatureEnabled } from '@/lib/core/config/feature-flags' import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils' import { clearExecutionCancellation } from '@/lib/execution/cancellation' import { warmLargeValueRefs } from '@/lib/execution/payloads/hydration' import { parseLargeExecutionValue } from '@/lib/execution/payloads/large-execution-value' import type { LoggingSession } from '@/lib/logs/execution/logging-session' +import { redactObjectStrings } from '@/lib/logs/execution/pii-redaction' import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' import { loadDeployedWorkflowState, @@ -635,6 +645,36 @@ export async function executeWorkflowCore( metadata.resumeFromSnapshot === true || Boolean(runFromBlock?.sourceSnapshot && !runFromBlock.sourceExecutionId) + // Resolve the org/workspace PII redaction policy once; serves both the input + // stage (below) and the block-outputs stage (threaded into the executor). + // Same fail-safe stance as the logs stage: presence of a rule implies + // entitlement, so we never re-check the plan (which would fail open on a + // transient lookup error and leak PII). + let piiRedaction: EffectivePiiRedaction = DEFAULT_PII_REDACTION + if (await isFeatureEnabled('pii-redaction')) { + const [row] = await db + .select({ orgSettings: organization.dataRetentionSettings }) + .from(workspace) + .leftJoin(organization, eq(organization.id, workspace.organizationId)) + .where(eq(workspace.id, providedWorkspaceId)) + .limit(1) + piiRedaction = resolveEffectivePiiRedaction({ + orgSettings: row?.orgSettings, + workspaceId: providedWorkspaceId, + }) + } + + if (piiRedaction.input.enabled) { + // Redact the input before the workflow sees it. `onFailure: 'throw'` aborts + // the run (handled by the surrounding catch) rather than feeding a scrub + // marker into execution or leaking unredacted input. + processedInput = await redactObjectStrings(processedInput, { + entityTypes: piiRedaction.input.entityTypes, + language: piiRedaction.input.language, + onFailure: 'throw', + }) + } + const contextExtensions: ContextExtensions = { stream: !!onStream, selectedOutputs, @@ -647,6 +687,7 @@ export async function executeWorkflowCore( userId, isDeployedContext: !metadata.isClientSession, enforceCredentialAccess: metadata.enforceCredentialAccess ?? false, + piiBlockOutputRedaction: piiRedaction.blockOutputs, onBlockStart: wrappedOnBlockStart, onBlockComplete: wrappedOnBlockComplete, onStream, diff --git a/packages/db/schema.ts b/packages/db/schema.ts index 383c1a3120a..876a60d3e05 100644 --- a/packages/db/schema.ts +++ b/packages/db/schema.ts @@ -1076,21 +1076,40 @@ export const chat = pgTable( } ) +/** Per-stage PII redaction policy stored on a {@link PiiRedactionRule}. */ +export interface PiiStagePolicy { + enabled: boolean + /** Presidio entity types to mask. Empty (or disabled) = redact nothing. */ + entityTypes: string[] + /** Language whose Presidio recognizers apply (e.g. 'en', 'es'); defaults to English. */ + language?: string +} + /** * A single PII redaction rule. Lives in the org-level * {@link DataRetentionSettings.piiRedaction} rules list. Each rule targets one * scope — all workspaces (`workspaceId: null`) or a single workspace — and * `workspaceId` is unique across rules. Resolution is most-specific-wins: a * workspace's own rule overrides the all-workspaces rule (never unioned). + * + * New rules carry per-stage {@link stages} (input / blockOutputs / logs); legacy + * rows carry only the flat `entityTypes`/`language`, resolved as a logs-only + * rule. At least one of the two is present. */ export interface PiiRedactionRule { id: string name?: string - /** Presidio entity types to mask. Empty = redact nothing for this scope. */ - entityTypes: string[] /** `null` = all workspaces; otherwise the single targeted workspace. */ workspaceId: string | null - /** Language whose Presidio recognizers apply (e.g. 'en', 'es'); defaults to English. */ + /** Per-stage policy (input redaction, block-output redaction, log redaction). */ + stages?: { + input: PiiStagePolicy + blockOutputs: PiiStagePolicy + logs: PiiStagePolicy + } + /** Legacy flat policy (pre-stages). Presidio entity types masked at log persist. */ + entityTypes?: string[] + /** Legacy flat language (pre-stages). */ language?: string } From 36f2a3d6dd441179b02a61a2b9a56b26da7b5b8b Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Mon, 29 Jun 2026 17:37:48 -0700 Subject: [PATCH 02/15] fix(data-retention): propagate block-output redaction into child workflows --- apps/sim/executor/handlers/workflow/workflow-handler.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/apps/sim/executor/handlers/workflow/workflow-handler.ts b/apps/sim/executor/handlers/workflow/workflow-handler.ts index a96d2a611aa..2a8d3d73c31 100644 --- a/apps/sim/executor/handlers/workflow/workflow-handler.ts +++ b/apps/sim/executor/handlers/workflow/workflow-handler.ts @@ -179,6 +179,9 @@ export class WorkflowBlockHandler implements BlockHandler { userId: ctx.userId, executionId: ctx.executionId, abortSignal: ctx.abortSignal, + // Propagate in-flight block-output redaction into child workflows so + // nested blocks mask outputs too (recurses: each child forwards it). + piiBlockOutputRedaction: ctx.piiBlockOutputRedaction, callChain: childCallChain, ...(shouldPropagateCallbacks && { onBlockStart: ctx.onBlockStart, From bb3a84b7965f424c71328c6c1de55ba0adfc4138 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Mon, 29 Jun 2026 17:45:38 -0700 Subject: [PATCH 03/15] fix(data-retention): close block-output redaction gaps on streaming + resume --- apps/sim/executor/execution/block-executor.ts | 6 ++++- .../lib/workflows/executor/execution-core.ts | 24 +++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/apps/sim/executor/execution/block-executor.ts b/apps/sim/executor/execution/block-executor.ts index 0b54d8d76d3..91856cfd744 100644 --- a/apps/sim/executor/execution/block-executor.ts +++ b/apps/sim/executor/execution/block-executor.ts @@ -186,7 +186,11 @@ export class BlockExecutor { if (isStreamingExecution) { const streamingExec = output as StreamingExecution - if (ctx.onStream) { + // Streaming forwards raw chunks to the client before output redaction can + // run, which would leak PII. When block-output redaction is enabled we + // buffer instead of streaming — the masked final output still reaches the + // client through the block-complete callback below. + if (ctx.onStream && !ctx.piiBlockOutputRedaction?.enabled) { await this.handleStreamingExecution( ctx, node, diff --git a/apps/sim/lib/workflows/executor/execution-core.ts b/apps/sim/lib/workflows/executor/execution-core.ts index a8aaeb1453f..b85c75ce111 100644 --- a/apps/sim/lib/workflows/executor/execution-core.ts +++ b/apps/sim/lib/workflows/executor/execution-core.ts @@ -675,6 +675,30 @@ export async function executeWorkflowCore( }) } + if (piiRedaction.blockOutputs.enabled) { + // Resume / run-from-block restore prior block outputs into state. If those + // predate the blockOutputs stage being enabled, re-mask them so downstream + // blocks can't read unredacted PII from restored snapshot state. Masking is + // idempotent, so outputs already masked in the original run are unaffected. + const blockOutputOpts = { + entityTypes: piiRedaction.blockOutputs.entityTypes, + language: piiRedaction.blockOutputs.language, + onFailure: 'throw' as const, + } + if (snapshot.state?.blockStates) { + snapshot.state.blockStates = await redactObjectStrings( + snapshot.state.blockStates, + blockOutputOpts + ) + } + if (runFromBlock?.sourceSnapshot?.blockStates) { + runFromBlock.sourceSnapshot.blockStates = await redactObjectStrings( + runFromBlock.sourceSnapshot.blockStates, + blockOutputOpts + ) + } + } + const contextExtensions: ContextExtensions = { stream: !!onStream, selectedOutputs, From 0b81fed264212695c11823bb5a3a768f47e9093e Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Mon, 29 Jun 2026 17:56:18 -0700 Subject: [PATCH 04/15] fix(data-retention): drain+mask streamed output, resolve PII policy unconditionally (no fail-open) --- apps/sim/executor/execution/block-executor.ts | 119 ++++++++++++------ apps/sim/lib/logs/execution/logger.ts | 14 +-- .../lib/workflows/executor/execution-core.ts | 39 +++--- 3 files changed, 101 insertions(+), 71 deletions(-) diff --git a/apps/sim/executor/execution/block-executor.ts b/apps/sim/executor/execution/block-executor.ts index 91856cfd744..d8c4821df20 100644 --- a/apps/sim/executor/execution/block-executor.ts +++ b/apps/sim/executor/execution/block-executor.ts @@ -186,18 +186,20 @@ export class BlockExecutor { if (isStreamingExecution) { const streamingExec = output as StreamingExecution - // Streaming forwards raw chunks to the client before output redaction can - // run, which would leak PII. When block-output redaction is enabled we - // buffer instead of streaming — the masked final output still reaches the - // client through the block-complete callback below. - if (ctx.onStream && !ctx.piiBlockOutputRedaction?.enabled) { + // The stream must still be drained to populate `execution.output`, but + // forwarding raw chunks to the client (or persisting them to memory) + // before redaction would leak PII. When block-output redaction is on we + // drain in buffer-only mode (no `onStream`, content masked before it's + // stored); the masked final output reaches the client via block-complete. + if (ctx.onStream) { await this.handleStreamingExecution( ctx, node, block, streamingExec, resolvedInputs, - normalizeStringArray(ctx.selectedOutputs) + normalizeStringArray(ctx.selectedOutputs), + !ctx.piiBlockOutputRedaction?.enabled ) } @@ -734,7 +736,8 @@ export class BlockExecutor { block: SerializedBlock, streamingExec: StreamingExecution, resolvedInputs: Record, - selectedOutputs: string[] + selectedOutputs: string[], + forwardToClient = true ): Promise { const blockId = node.id @@ -749,50 +752,73 @@ export class BlockExecutor { let drainError: unknown let sourceFullyDrained = false - const clientSource = new ReadableStream({ - async pull(controller) { + if (forwardToClient) { + const clientSource = new ReadableStream({ + async pull(controller) { + try { + const { done, value } = await sourceReader.read() + if (done) { + const tail = decoder.decode() + if (tail) accumulated.push(tail) + sourceFullyDrained = true + controller.close() + return + } + accumulated.push(decoder.decode(value, { stream: true })) + controller.enqueue(value) + } catch (error) { + drainError = error + controller.error(error) + } + }, + async cancel(reason) { + try { + await sourceReader.cancel(reason) + } catch {} + }, + }) + + const processedClientStream = streamingResponseFormatProcessor.processStream( + clientSource, + blockId, + selectedOutputs, + responseFormat + ) + + try { + await ctx.onStream?.({ + stream: processedClientStream, + execution: streamingExec.execution, + }) + } catch (error) { + this.execLogger.error('Error in onStream callback', { blockId, error }) + await processedClientStream.cancel().catch(() => {}) + } finally { try { + sourceReader.releaseLock() + } catch {} + } + } else { + // Buffer-only drain: consume the source so `execution.output` is complete, + // but never forward raw chunks to the client (block-output redaction is on). + try { + while (true) { const { done, value } = await sourceReader.read() if (done) { const tail = decoder.decode() if (tail) accumulated.push(tail) sourceFullyDrained = true - controller.close() - return + break } accumulated.push(decoder.decode(value, { stream: true })) - controller.enqueue(value) - } catch (error) { - drainError = error - controller.error(error) } - }, - async cancel(reason) { + } catch (error) { + drainError = error + } finally { try { - await sourceReader.cancel(reason) + sourceReader.releaseLock() } catch {} - }, - }) - - const processedClientStream = streamingResponseFormatProcessor.processStream( - clientSource, - blockId, - selectedOutputs, - responseFormat - ) - - try { - await ctx.onStream?.({ - stream: processedClientStream, - execution: streamingExec.execution, - }) - } catch (error) { - this.execLogger.error('Error in onStream callback', { blockId, error }) - await processedClientStream.cancel().catch(() => {}) - } finally { - try { - sourceReader.releaseLock() - } catch {} + } } if (drainError) { @@ -814,11 +840,22 @@ export class BlockExecutor { return } - const fullContent = accumulated.join('') + let fullContent = accumulated.join('') if (!fullContent) { return } + if (!forwardToClient && ctx.piiBlockOutputRedaction?.enabled) { + // Mask before the content is written to `execution.output` or persisted to + // memory via `onFullContent`, so the streamed agent response can't leak PII + // through either path. The block-output redaction below is then idempotent. + fullContent = await redactObjectStrings(fullContent, { + entityTypes: ctx.piiBlockOutputRedaction.entityTypes, + language: ctx.piiBlockOutputRedaction.language, + onFailure: 'throw', + }) + } + const executionOutput = streamingExec.execution?.output if (executionOutput && typeof executionOutput === 'object') { let parsedForFormat = false diff --git a/apps/sim/lib/logs/execution/logger.ts b/apps/sim/lib/logs/execution/logger.ts index 435d0ce70d8..cea06aa53cc 100644 --- a/apps/sim/lib/logs/execution/logger.ts +++ b/apps/sim/lib/logs/execution/logger.ts @@ -29,7 +29,6 @@ import { import { resolveEffectivePiiRedaction } from '@/lib/billing/retention' import { checkAndBillOverageThreshold } from '@/lib/billing/threshold-billing' import { isBillingEnabled } from '@/lib/core/config/env-flags' -import { isFeatureEnabled } from '@/lib/core/config/feature-flags' import { redactApiKeys } from '@/lib/core/security/redaction' import { filterForDisplay } from '@/lib/core/utils/display-filters' import { @@ -622,8 +621,6 @@ export class ExecutionLogger implements IExecutionLoggerService { ): Promise { if (!workspaceId) return payload - if (!(await isFeatureEnabled('pii-redaction'))) return payload - const [row] = await db .select({ orgSettings: organization.dataRetentionSettings }) .from(workspace) @@ -632,11 +629,12 @@ export class ExecutionLogger implements IExecutionLoggerService { .limit(1) if (!row) return payload - // Rules are only writable by enterprise orgs (route-gated), so an enabled - // rule already implies entitlement. We deliberately do NOT re-check - // `isWorkspaceOnEnterprisePlan` here: it returns false on transient lookup - // errors, which would silently skip masking and leak PII (fail-open). When - // rules are present we always redact (fail-safe; over-redaction at worst). + // Resolve from stored rules UNCONDITIONALLY — deliberately NOT gated on the + // `pii-redaction` feature flag or the enterprise-plan check. Rules are only + // writable by entitled orgs (route-gated), so their presence is the source of + // truth; re-checking the flag/plan here returns false on a transient read and + // would silently skip masking, leaking PII (fail-open). Absence of rules + // yields the disabled default, so non-PII orgs incur only the lookup. const config = resolveEffectivePiiRedaction({ orgSettings: row.orgSettings, workspaceId }).logs if (!config.enabled) return payload diff --git a/apps/sim/lib/workflows/executor/execution-core.ts b/apps/sim/lib/workflows/executor/execution-core.ts index b85c75ce111..a09b2914a00 100644 --- a/apps/sim/lib/workflows/executor/execution-core.ts +++ b/apps/sim/lib/workflows/executor/execution-core.ts @@ -12,12 +12,7 @@ import { mergeSubblockStateWithValues } from '@sim/workflow-persistence/subblock import { eq } from 'drizzle-orm' import type { Edge } from 'reactflow' import { z } from 'zod' -import { - DEFAULT_PII_REDACTION, - type EffectivePiiRedaction, - resolveEffectivePiiRedaction, -} from '@/lib/billing/retention' -import { isFeatureEnabled } from '@/lib/core/config/feature-flags' +import { type EffectivePiiRedaction, resolveEffectivePiiRedaction } from '@/lib/billing/retention' import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils' import { clearExecutionCancellation } from '@/lib/execution/cancellation' import { warmLargeValueRefs } from '@/lib/execution/payloads/hydration' @@ -647,22 +642,22 @@ export async function executeWorkflowCore( // Resolve the org/workspace PII redaction policy once; serves both the input // stage (below) and the block-outputs stage (threaded into the executor). - // Same fail-safe stance as the logs stage: presence of a rule implies - // entitlement, so we never re-check the plan (which would fail open on a - // transient lookup error and leak PII). - let piiRedaction: EffectivePiiRedaction = DEFAULT_PII_REDACTION - if (await isFeatureEnabled('pii-redaction')) { - const [row] = await db - .select({ orgSettings: organization.dataRetentionSettings }) - .from(workspace) - .leftJoin(organization, eq(organization.id, workspace.organizationId)) - .where(eq(workspace.id, providedWorkspaceId)) - .limit(1) - piiRedaction = resolveEffectivePiiRedaction({ - orgSettings: row?.orgSettings, - workspaceId: providedWorkspaceId, - }) - } + // Resolved from stored rules UNCONDITIONALLY — deliberately NOT gated on the + // `pii-redaction` feature flag. The flag gates configuration (the settings + // route); a transient/false flag read at execution time would skip masking + // and leak PII (fail-open). Stored rules are only writable by entitled orgs, + // so their presence is the source of truth; absence yields the disabled + // default (one indexed lookup, no masking cost for non-PII orgs). + const [row] = await db + .select({ orgSettings: organization.dataRetentionSettings }) + .from(workspace) + .leftJoin(organization, eq(organization.id, workspace.organizationId)) + .where(eq(workspace.id, providedWorkspaceId)) + .limit(1) + const piiRedaction: EffectivePiiRedaction = resolveEffectivePiiRedaction({ + orgSettings: row?.orgSettings, + workspaceId: providedWorkspaceId, + }) if (piiRedaction.input.enabled) { // Redact the input before the workflow sees it. `onFailure: 'throw'` aborts From 2d7598794672cf41525a5db702cf2a0cee7f928d Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Mon, 29 Jun 2026 18:05:39 -0700 Subject: [PATCH 05/15] test(testing): support leftJoin().where().limit() in shared db mock --- packages/testing/src/mocks/database.mock.ts | 22 +++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/packages/testing/src/mocks/database.mock.ts b/packages/testing/src/mocks/database.mock.ts index bb34193d505..17a039b5d63 100644 --- a/packages/testing/src/mocks/database.mock.ts +++ b/packages/testing/src/mocks/database.mock.ts @@ -258,17 +258,19 @@ export const dbChainMock = { * Creates a mock database connection. */ export function createMockDb() { + // A `where(...)` result that is both awaitable (resolves to `[]`) and exposes + // `.limit`/`.orderBy`, so `select().from()[.leftJoin()].where()[.limit()]` + // works whether or not a terminal is chained. + const whereResult = () => { + const thenable: any = Promise.resolve([]) + thenable.limit = vi.fn(() => Promise.resolve([])) + thenable.orderBy = vi.fn(() => Promise.resolve([])) + return thenable + } const fromBuilder = () => ({ - where: vi.fn(() => ({ - limit: vi.fn(() => Promise.resolve([])), - orderBy: vi.fn(() => Promise.resolve([])), - })), - leftJoin: vi.fn(() => ({ - where: vi.fn(() => Promise.resolve([])), - })), - innerJoin: vi.fn(() => ({ - where: vi.fn(() => Promise.resolve([])), - })), + where: vi.fn(whereResult), + leftJoin: vi.fn(() => ({ where: vi.fn(whereResult) })), + innerJoin: vi.fn(() => ({ where: vi.fn(whereResult) })), }) return { From eb6b25af8d5b89192d2f9f73432b5d6271c19f8c Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Mon, 29 Jun 2026 18:08:48 -0700 Subject: [PATCH 06/15] fix(data-retention): mask agent/Pi memory writes under block-output redaction --- apps/sim/executor/handlers/agent/memory.ts | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/apps/sim/executor/handlers/agent/memory.ts b/apps/sim/executor/handlers/agent/memory.ts index a4de617b0ab..3405a91a196 100644 --- a/apps/sim/executor/handlers/agent/memory.ts +++ b/apps/sim/executor/handlers/agent/memory.ts @@ -3,6 +3,7 @@ import { memory } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { generateId } from '@sim/utils/id' import { and, eq, sql } from 'drizzle-orm' +import { redactObjectStrings } from '@/lib/logs/execution/pii-redaction' import { getAccurateTokenCount } from '@/lib/tokenization/estimators' import { MEMORY } from '@/executor/constants' import type { AgentInputs, Message } from '@/executor/handlers/agent/types' @@ -58,6 +59,21 @@ export class Memory { const workspaceId = this.requireWorkspaceId(ctx) this.validateConversationId(inputs.conversationId) + + // Handlers persist their response to memory before the executor redacts the + // block output, so mask here too when the block-output stage is enabled — + // otherwise raw PII would be stored in memory and read back on later runs. + if (ctx.piiBlockOutputRedaction?.enabled && message.content) { + message = { + ...message, + content: await redactObjectStrings(message.content, { + entityTypes: ctx.piiBlockOutputRedaction.entityTypes, + language: ctx.piiBlockOutputRedaction.language, + onFailure: 'throw', + }), + } + } + this.validateContent(message.content) const key = inputs.conversationId! From d55b557031fb873fcf2f4682aad74a8362685558 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Tue, 30 Jun 2026 10:05:56 -0700 Subject: [PATCH 07/15] fix(data-retention): guard partial PII stages in GET normalize --- apps/sim/app/api/organizations/[id]/data-retention/route.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/sim/app/api/organizations/[id]/data-retention/route.ts b/apps/sim/app/api/organizations/[id]/data-retention/route.ts index 830939eda73..2cc93065650 100644 --- a/apps/sim/app/api/organizations/[id]/data-retention/route.ts +++ b/apps/sim/app/api/organizations/[id]/data-retention/route.ts @@ -46,15 +46,15 @@ function normalizeConfigured( ? { input: { ...rule.stages.input, - language: coercePiiLanguage(rule.stages.input.language), + language: coercePiiLanguage(rule.stages.input?.language), }, blockOutputs: { ...rule.stages.blockOutputs, - language: coercePiiLanguage(rule.stages.blockOutputs.language), + language: coercePiiLanguage(rule.stages.blockOutputs?.language), }, logs: { ...rule.stages.logs, - language: coercePiiLanguage(rule.stages.logs.language), + language: coercePiiLanguage(rule.stages.logs?.language), }, } : undefined, From 83ffe4d0850487649bd0461d34cd2056833209e7 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Tue, 30 Jun 2026 10:13:50 -0700 Subject: [PATCH 08/15] fix(data-retention): mask seeded memory messages under block-output redaction --- apps/sim/executor/handlers/agent/memory.ts | 38 ++++++++++++++-------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/apps/sim/executor/handlers/agent/memory.ts b/apps/sim/executor/handlers/agent/memory.ts index 3405a91a196..9dfa2908bf6 100644 --- a/apps/sim/executor/handlers/agent/memory.ts +++ b/apps/sim/executor/handlers/agent/memory.ts @@ -60,19 +60,7 @@ export class Memory { const workspaceId = this.requireWorkspaceId(ctx) this.validateConversationId(inputs.conversationId) - // Handlers persist their response to memory before the executor redacts the - // block output, so mask here too when the block-output stage is enabled — - // otherwise raw PII would be stored in memory and read back on later runs. - if (ctx.piiBlockOutputRedaction?.enabled && message.content) { - message = { - ...message, - content: await redactObjectStrings(message.content, { - entityTypes: ctx.piiBlockOutputRedaction.entityTypes, - language: ctx.piiBlockOutputRedaction.language, - onFailure: 'throw', - }), - } - } + message = await this.maskContentForStorage(ctx, message) this.validateContent(message.content) @@ -118,6 +106,10 @@ export class Memory { messagesToStore = this.applyTokenWindow(conversationMessages, maxTokens, inputs.model) } + messagesToStore = await Promise.all( + messagesToStore.map((message) => this.maskContentForStorage(ctx, message)) + ) + await this.seedMemoryRecord(workspaceId, key, messagesToStore) logger.debug('Seeded memory', { @@ -127,6 +119,26 @@ export class Memory { }) } + /** + * Handlers persist messages to memory before the executor redacts block + * output, so mask content here too when the block-output stage is enabled — + * otherwise raw PII is stored in the memory table and read back on later runs. + * `onFailure: 'throw'` aborts rather than persisting unredacted content. + */ + private async maskContentForStorage(ctx: ExecutionContext, message: Message): Promise { + if (!ctx.piiBlockOutputRedaction?.enabled || !message.content) { + return message + } + return { + ...message, + content: await redactObjectStrings(message.content, { + entityTypes: ctx.piiBlockOutputRedaction.entityTypes, + language: ctx.piiBlockOutputRedaction.language, + onFailure: 'throw', + }), + } + } + private requireWorkspaceId(ctx: ExecutionContext): string { if (!ctx.workspaceId) { throw new Error('workspaceId is required for memory operations') From a911af81d296392121224a75c2b7c3b2664a375f Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Tue, 30 Jun 2026 10:23:19 -0700 Subject: [PATCH 09/15] fix(guardrails): fail closed on misaligned Presidio batch responses --- apps/sim/lib/guardrails/validate_pii.ts | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/apps/sim/lib/guardrails/validate_pii.ts b/apps/sim/lib/guardrails/validate_pii.ts index 2ed9ac9a50e..72670f2d169 100644 --- a/apps/sim/lib/guardrails/validate_pii.ts +++ b/apps/sim/lib/guardrails/validate_pii.ts @@ -239,6 +239,15 @@ export async function maskPIIBatch( const chunkTexts = indices.map((i) => texts[i]) const spansPerText = await analyzeBatch(chunkTexts, entityTypes, language) + // A short/misaligned batch response would silently leave the unmatched + // strings unmasked (fail-open). Throw so the caller applies its fail-safe + // (scrub for logs, abort for in-flight stages) instead of leaking PII. + if (spansPerText.length !== chunkTexts.length) { + throw new Error( + `Presidio analyze_batch returned ${spansPerText.length} result(s) for ${chunkTexts.length} input(s)` + ) + } + const toAnonymize: AnonymizeBatchItem[] = [] const anonymizePositions: number[] = [] indices.forEach((originalIndex, pos) => { @@ -252,6 +261,11 @@ export async function maskPIIBatch( }) const masked = await anonymizeBatch(toAnonymize) + if (masked.length !== toAnonymize.length) { + throw new Error( + `Presidio anonymize_batch returned ${masked.length} result(s) for ${toAnonymize.length} input(s)` + ) + } anonymizePositions.forEach((pos, k) => { result[indices[pos]] = masked[k] }) From 31f2e3fdc53eb3758fb06c9f2a4b92e2fe1a6716 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Tue, 30 Jun 2026 10:34:40 -0700 Subject: [PATCH 10/15] fix(data-retention): enabled stage with no entity types redacts all (no fail-open) --- apps/sim/lib/billing/retention.test.ts | 6 ++++-- apps/sim/lib/billing/retention.ts | 20 +++++++++++++------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/apps/sim/lib/billing/retention.test.ts b/apps/sim/lib/billing/retention.test.ts index df7a056e691..361b1245701 100644 --- a/apps/sim/lib/billing/retention.test.ts +++ b/apps/sim/lib/billing/retention.test.ts @@ -108,7 +108,7 @@ describe('resolveEffectivePiiRedaction', () => { }) }) - it('disables a stage that is enabled but has no entity types', () => { + it('keeps an enabled stage active (redact all) when it has no entity types, and disables a disabled stage', () => { const result = resolveEffectivePiiRedaction({ orgSettings: settings([ { @@ -123,7 +123,9 @@ describe('resolveEffectivePiiRedaction', () => { ]), workspaceId: 'ws-1', }) - expect(result.input).toEqual(DISABLED) + // enabled + empty entityTypes = redact ALL detected PII (not disabled). + expect(result.input).toEqual({ enabled: true, entityTypes: [], language: 'en' }) + // disabled stage stays off regardless of its entity types. expect(result.blockOutputs).toEqual(DISABLED) expect(result.logs).toEqual({ enabled: true, entityTypes: ['PERSON'], language: 'en' }) }) diff --git a/apps/sim/lib/billing/retention.ts b/apps/sim/lib/billing/retention.ts index 994088461d5..66916b2edc1 100644 --- a/apps/sim/lib/billing/retention.ts +++ b/apps/sim/lib/billing/retention.ts @@ -37,13 +37,18 @@ function sanitizeEntityTypes(value: unknown): string[] { return Array.isArray(value) ? value.filter((t): t is string => typeof t === 'string') : [] } -/** A stage redacts nothing unless it is enabled AND has at least one entity type. */ +/** + * Expand a stored stage policy into its effective form. A disabled stage redacts + * nothing. An ENABLED stage with no entity types redacts ALL detected PII — the + * masking layer omits `entities` from the Presidio request — so it stays active; + * treating enabled-but-empty as disabled would let an explicit "redact all" save + * silently skip masking (fail-open). + */ function toEffectiveStage(policy: PiiStagePolicy | undefined): EffectivePiiStage { - const types = sanitizeEntityTypes(policy?.entityTypes) - if (!policy?.enabled || types.length === 0) return DISABLED_STAGE + if (!policy?.enabled) return DISABLED_STAGE return { enabled: true, - entityTypes: types, + entityTypes: sanitizeEntityTypes(policy.entityTypes), language: coercePiiLanguage(policy.language) ?? DEFAULT_PII_LANGUAGE, } } @@ -55,9 +60,10 @@ function toEffectiveStage(policy: PiiStagePolicy | undefined): EffectivePiiStage * selection is whole-rule; the selected rule is then expanded into three stages. * * Back-compat: a legacy rule with no `stages` is treated exactly as it was before - * — logs-only, masking its flat `entityTypes` (input/blockOutputs disabled). A - * resolved stage with no entity types redacts nothing. Defensive about the - * loosely-typed JSON column. + * — logs-only, masking its flat `entityTypes` (input/blockOutputs disabled); an + * empty flat `entityTypes` redacts nothing (the workspace-exemption shape). For + * per-stage rules an enabled stage with no entity types redacts ALL detected PII + * (see {@link toEffectiveStage}). Defensive about the loosely-typed JSON column. */ export function resolveEffectivePiiRedaction(params: { orgSettings: DataRetentionSettings | null | undefined From 437d2bb8e26a1a7529aca6dabcbd31e81f8dd384 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Tue, 30 Jun 2026 10:47:25 -0700 Subject: [PATCH 11/15] fix(data-retention): reject enabled stage with no entity types; empty = off everywhere --- .../lib/api/contracts/data-retention.test.ts | 26 +++++++++++++++++++ apps/sim/lib/api/contracts/primitives.ts | 26 +++++++++++++------ apps/sim/lib/billing/retention.test.ts | 6 ++--- apps/sim/lib/billing/retention.ts | 22 ++++++++-------- 4 files changed, 57 insertions(+), 23 deletions(-) diff --git a/apps/sim/lib/api/contracts/data-retention.test.ts b/apps/sim/lib/api/contracts/data-retention.test.ts index 3efd542be49..10fdcb9e720 100644 --- a/apps/sim/lib/api/contracts/data-retention.test.ts +++ b/apps/sim/lib/api/contracts/data-retention.test.ts @@ -106,6 +106,32 @@ describe('piiRedactionRuleSchema', () => { expect(result.success).toBe(false) }) + it('rejects an enabled stage with no entity types (redact-all is not expressible)', () => { + const result = piiRedactionRuleSchema.safeParse({ + id: 'r-1', + workspaceId: null, + stages: { + input: stage(true, []), + blockOutputs: stage(false, []), + logs: stage(false, []), + }, + }) + expect(result.success).toBe(false) + }) + + it('accepts a disabled stage with no entity types (off)', () => { + const result = piiRedactionRuleSchema.safeParse({ + id: 'r-1', + workspaceId: null, + stages: { + input: stage(false, []), + blockOutputs: stage(false, []), + logs: stage(true, ['PERSON']), + }, + }) + expect(result.success).toBe(true) + }) + it('rejects an unsupported stage language', () => { const result = piiRedactionRuleSchema.safeParse({ id: 'r-1', diff --git a/apps/sim/lib/api/contracts/primitives.ts b/apps/sim/lib/api/contracts/primitives.ts index f45571518a6..6acf45a4238 100644 --- a/apps/sim/lib/api/contracts/primitives.ts +++ b/apps/sim/lib/api/contracts/primitives.ts @@ -113,14 +113,24 @@ export const userFileSchema = z }) .passthrough() -/** Per-stage redaction policy: which entity types to mask, in which language. */ -export const piiStagePolicySchema = z.object({ - enabled: z.boolean(), - /** Presidio entity types to mask. Empty (or disabled) = redact nothing. */ - entityTypes: z.array(z.string().min(1, 'Entity type cannot be empty')).max(100), - /** Language whose Presidio recognizers apply; defaults to English. */ - language: z.enum(PII_LANGUAGE_CODES).optional(), -}) +/** + * Per-stage redaction policy: which entity types to mask, in which language. An + * enabled stage must name at least one entity type — "redact all" is not an + * expressible policy, so `enabled: true` with an empty list (which would resolve + * to off and silently skip masking) is rejected at the boundary. + */ +export const piiStagePolicySchema = z + .object({ + enabled: z.boolean(), + /** Presidio entity types to mask. Disabled stages may be empty. */ + entityTypes: z.array(z.string().min(1, 'Entity type cannot be empty')).max(100), + /** Language whose Presidio recognizers apply; defaults to English. */ + language: z.enum(PII_LANGUAGE_CODES).optional(), + }) + .refine((stage) => !stage.enabled || stage.entityTypes.length > 0, { + message: 'An enabled redaction stage must select at least one entity type.', + path: ['entityTypes'], + }) export type PiiStagePolicy = z.output diff --git a/apps/sim/lib/billing/retention.test.ts b/apps/sim/lib/billing/retention.test.ts index 361b1245701..f14e343fdfc 100644 --- a/apps/sim/lib/billing/retention.test.ts +++ b/apps/sim/lib/billing/retention.test.ts @@ -108,7 +108,7 @@ describe('resolveEffectivePiiRedaction', () => { }) }) - it('keeps an enabled stage active (redact all) when it has no entity types, and disables a disabled stage', () => { + it('disables a stage that is enabled but has no entity types (empty = off)', () => { const result = resolveEffectivePiiRedaction({ orgSettings: settings([ { @@ -123,9 +123,7 @@ describe('resolveEffectivePiiRedaction', () => { ]), workspaceId: 'ws-1', }) - // enabled + empty entityTypes = redact ALL detected PII (not disabled). - expect(result.input).toEqual({ enabled: true, entityTypes: [], language: 'en' }) - // disabled stage stays off regardless of its entity types. + expect(result.input).toEqual(DISABLED) expect(result.blockOutputs).toEqual(DISABLED) expect(result.logs).toEqual({ enabled: true, entityTypes: ['PERSON'], language: 'en' }) }) diff --git a/apps/sim/lib/billing/retention.ts b/apps/sim/lib/billing/retention.ts index 66916b2edc1..afc8e0c0c76 100644 --- a/apps/sim/lib/billing/retention.ts +++ b/apps/sim/lib/billing/retention.ts @@ -38,17 +38,18 @@ function sanitizeEntityTypes(value: unknown): string[] { } /** - * Expand a stored stage policy into its effective form. A disabled stage redacts - * nothing. An ENABLED stage with no entity types redacts ALL detected PII — the - * masking layer omits `entities` from the Presidio request — so it stays active; - * treating enabled-but-empty as disabled would let an explicit "redact all" save - * silently skip masking (fail-open). + * Expand a stored stage policy into its effective form. A stage redacts nothing + * unless it is enabled AND names at least one entity type. "Redact all" is not an + * expressible policy (the checkbox UI has no such control, and the contract + * rejects enabled-with-no-types), so an empty entity list always means "off" — + * consistent across the UI, the contract, and the masking layer. */ function toEffectiveStage(policy: PiiStagePolicy | undefined): EffectivePiiStage { - if (!policy?.enabled) return DISABLED_STAGE + const types = sanitizeEntityTypes(policy?.entityTypes) + if (!policy?.enabled || types.length === 0) return DISABLED_STAGE return { enabled: true, - entityTypes: sanitizeEntityTypes(policy.entityTypes), + entityTypes: types, language: coercePiiLanguage(policy.language) ?? DEFAULT_PII_LANGUAGE, } } @@ -60,10 +61,9 @@ function toEffectiveStage(policy: PiiStagePolicy | undefined): EffectivePiiStage * selection is whole-rule; the selected rule is then expanded into three stages. * * Back-compat: a legacy rule with no `stages` is treated exactly as it was before - * — logs-only, masking its flat `entityTypes` (input/blockOutputs disabled); an - * empty flat `entityTypes` redacts nothing (the workspace-exemption shape). For - * per-stage rules an enabled stage with no entity types redacts ALL detected PII - * (see {@link toEffectiveStage}). Defensive about the loosely-typed JSON column. + * — logs-only, masking its flat `entityTypes` (input/blockOutputs disabled). A + * resolved stage with no entity types redacts nothing (an empty list is the + * workspace-exemption / off shape). Defensive about the loosely-typed JSON column. */ export function resolveEffectivePiiRedaction(params: { orgSettings: DataRetentionSettings | null | undefined From 78b2c56bb5ed0d758868c8bbe547ced813873720 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Tue, 30 Jun 2026 10:49:02 -0700 Subject: [PATCH 12/15] docs(data-retention): note resume remask covers inline values only --- apps/sim/lib/workflows/executor/execution-core.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/apps/sim/lib/workflows/executor/execution-core.ts b/apps/sim/lib/workflows/executor/execution-core.ts index a09b2914a00..1f6e4b26989 100644 --- a/apps/sim/lib/workflows/executor/execution-core.ts +++ b/apps/sim/lib/workflows/executor/execution-core.ts @@ -675,6 +675,11 @@ export async function executeWorkflowCore( // predate the blockOutputs stage being enabled, re-mask them so downstream // blocks can't read unredacted PII from restored snapshot state. Masking is // idempotent, so outputs already masked in the original run are unaffected. + // Limitation: this walks inline strings only — values offloaded to + // large-value storage are still refs here and are not re-masked. In the + // normal flow that is safe (a run with the stage on masks before offload); + // the gap is the narrow case of a run that offloaded a large value while + // the stage was OFF and is resumed after the stage is turned ON. const blockOutputOpts = { entityTypes: piiRedaction.blockOutputs.entityTypes, language: piiRedaction.blockOutputs.language, From 8f86d775af32b315ffa11a11192af594c9767b46 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Tue, 30 Jun 2026 12:21:28 -0700 Subject: [PATCH 13/15] fix(data-retention): scrub offloaded large-value refs from logs when block-output redaction is off --- apps/sim/lib/logs/execution/logger.ts | 17 ++++++-- .../lib/logs/execution/pii-redaction.test.ts | 28 +++++++++++++ apps/sim/lib/logs/execution/pii-redaction.ts | 40 +++++++++++++++++++ 3 files changed, 82 insertions(+), 3 deletions(-) diff --git a/apps/sim/lib/logs/execution/logger.ts b/apps/sim/lib/logs/execution/logger.ts index 21c5bb7351d..b17dae2406c 100644 --- a/apps/sim/lib/logs/execution/logger.ts +++ b/apps/sim/lib/logs/execution/logger.ts @@ -35,7 +35,11 @@ import { collectLargeValueReferenceKeys, replaceLargeValueReferenceKeysWithClient, } from '@/lib/execution/payloads/large-value-metadata' -import { type RedactablePayload, redactPIIFromExecution } from '@/lib/logs/execution/pii-redaction' +import { + type RedactablePayload, + redactPIIFromExecution, + scrubLargeValueRefs, +} from '@/lib/logs/execution/pii-redaction' import { clearProgressMarkers, type ExecutionProgressMarkers, @@ -635,10 +639,17 @@ export class ExecutionLogger implements IExecutionLoggerService { // truth; re-checking the flag/plan here returns false on a transient read and // would silently skip masking, leaking PII (fail-open). Absence of rules // yields the disabled default, so non-PII orgs incur only the lookup. - const config = resolveEffectivePiiRedaction({ orgSettings: row.orgSettings, workspaceId }).logs + const resolved = resolveEffectivePiiRedaction({ orgSettings: row.orgSettings, workspaceId }) + const config = resolved.logs if (!config.enabled) return payload - return redactPIIFromExecution(payload, { + // The string redactor can't reach values already offloaded to large-value + // storage (>8MB refs). Those are masked before offload only when the + // block-output stage is on; when it's off, scrub the refs so the log never + // references unredacted bytes. + const scrubbed = resolved.blockOutputs.enabled ? payload : scrubLargeValueRefs(payload) + + return redactPIIFromExecution(scrubbed, { entityTypes: config.entityTypes, language: config.language, }) diff --git a/apps/sim/lib/logs/execution/pii-redaction.test.ts b/apps/sim/lib/logs/execution/pii-redaction.test.ts index a6a940b6526..65e163c7564 100644 --- a/apps/sim/lib/logs/execution/pii-redaction.test.ts +++ b/apps/sim/lib/logs/execution/pii-redaction.test.ts @@ -16,6 +16,7 @@ import { REDACTION_FAILED_MARKER, redactObjectStrings, redactPIIFromExecution, + scrubLargeValueRefs, } from '@/lib/logs/execution/pii-redaction' describe('redactPIIFromExecution', () => { @@ -169,3 +170,30 @@ describe('redactObjectStrings', () => { expect(result).toEqual({ text: REDACTION_FAILED_MARKER }) }) }) + +describe('scrubLargeValueRefs', () => { + const ref = { + __simLargeValueRef: true, + version: 1, + id: 'lv_abcdef123456', + kind: 'object', + size: 9_000_000, + } + + it('replaces large-value refs with the marker, preserving surrounding structure', () => { + const result = scrubLargeValueRefs({ + traceSpans: [{ blockId: 'b1', status: 'success', output: { big: ref, small: 'hi' } }], + finalOutput: ref, + }) + const span = (result.traceSpans as any[])[0] + expect(span.blockId).toBe('b1') + expect(span.output.big).toBe(REDACTION_FAILED_MARKER) + expect(span.output.small).toBe('hi') + expect(result.finalOutput).toBe(REDACTION_FAILED_MARKER) + }) + + it('leaves payloads without refs untouched', () => { + const payload = { finalOutput: { answer: 'world', count: 5 } } + expect(scrubLargeValueRefs(payload)).toEqual(payload) + }) +}) diff --git a/apps/sim/lib/logs/execution/pii-redaction.ts b/apps/sim/lib/logs/execution/pii-redaction.ts index 504d4764572..5fbc90a4589 100644 --- a/apps/sim/lib/logs/execution/pii-redaction.ts +++ b/apps/sim/lib/logs/execution/pii-redaction.ts @@ -1,5 +1,7 @@ import { createLogger } from '@sim/logger' import { getErrorMessage } from '@sim/utils/errors' +import { isLargeArrayManifest } from '@/lib/execution/payloads/large-array-manifest-metadata' +import { isLargeValueRef } from '@/lib/execution/payloads/large-value-ref' import { maskPIIBatchViaHttp } from '@/lib/guardrails/mask-client' const logger = createLogger('PiiRedaction') @@ -135,6 +137,44 @@ function transformUnit( return transformStrings(value, handle) } +/** Replace every offloaded large-value ref / array manifest with `handle()`. */ +function transformRefs(value: unknown, handle: () => unknown): unknown { + if (isLargeValueRef(value) || isLargeArrayManifest(value)) { + return handle() + } + if (Array.isArray(value)) { + return value.map((item) => transformRefs(item, handle)) + } + if (value !== null && typeof value === 'object') { + const out: Record = {} + for (const [key, v] of Object.entries(value)) { + out[key] = transformRefs(v, handle) + } + return out + } + return value +} + +/** + * Replace offloaded large-value references with {@link REDACTION_FAILED_MARKER}. + * + * The string redactor only masks inline content; a value already offloaded to + * large-value storage (>8MB) is an opaque ref it can't reach. Block outputs are + * masked BEFORE offload only when the block-output stage is on — so when that + * stage is off, the refs in a persisted log point to unredacted bytes. Scrubbing + * them keeps raw PII out of the log (the rare huge field loses its content rather + * than leaking; consistent with the over-ceiling scrub behavior). + */ +export function scrubLargeValueRefs(payload: RedactablePayload): RedactablePayload { + const result: RedactablePayload = { ...payload } + for (const key of REDACTABLE_KEYS) { + if (payload[key] !== undefined) { + result[key] = transformRefs(payload[key], () => REDACTION_FAILED_MARKER) + } + } + return result +} + /** * Mask a batch of collected strings via Presidio. On a hard failure or when the * batch exceeds the ceiling, either scrub to {@link REDACTION_FAILED_MARKER} or From 6e9587ad12cae08ac5b3ed309047e01a8d57e004 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Tue, 30 Jun 2026 12:34:20 -0700 Subject: [PATCH 14/15] fix(data-retention): hydrate, mask, and re-store large-value refs in logs (preserve redacted content) --- apps/sim/lib/logs/execution/logger.ts | 78 ++++++----- .../logs/execution/pii-large-values.test.ts | 97 ++++++++++++++ .../lib/logs/execution/pii-large-values.ts | 121 ++++++++++++++++++ .../lib/logs/execution/pii-redaction.test.ts | 42 +++--- apps/sim/lib/logs/execution/pii-redaction.ts | 44 +------ 5 files changed, 292 insertions(+), 90 deletions(-) create mode 100644 apps/sim/lib/logs/execution/pii-large-values.test.ts create mode 100644 apps/sim/lib/logs/execution/pii-large-values.ts diff --git a/apps/sim/lib/logs/execution/logger.ts b/apps/sim/lib/logs/execution/logger.ts index b17dae2406c..9b9f8dc85a1 100644 --- a/apps/sim/lib/logs/execution/logger.ts +++ b/apps/sim/lib/logs/execution/logger.ts @@ -35,11 +35,8 @@ import { collectLargeValueReferenceKeys, replaceLargeValueReferenceKeysWithClient, } from '@/lib/execution/payloads/large-value-metadata' -import { - type RedactablePayload, - redactPIIFromExecution, - scrubLargeValueRefs, -} from '@/lib/logs/execution/pii-redaction' +import { redactLargeValueRefs } from '@/lib/logs/execution/pii-large-values' +import { type RedactablePayload, redactPIIFromExecution } from '@/lib/logs/execution/pii-redaction' import { clearProgressMarkers, type ExecutionProgressMarkers, @@ -621,7 +618,8 @@ export class ExecutionLogger implements IExecutionLoggerService { */ private async applyPiiRedaction( workspaceId: string | null, - payload: RedactablePayload + payload: RedactablePayload, + storeContext: { workflowId?: string | null; executionId: string; userId?: string | null } ): Promise { if (!workspaceId) return payload @@ -645,11 +643,23 @@ export class ExecutionLogger implements IExecutionLoggerService { // The string redactor can't reach values already offloaded to large-value // storage (>8MB refs). Those are masked before offload only when the - // block-output stage is on; when it's off, scrub the refs so the log never - // references unredacted bytes. - const scrubbed = resolved.blockOutputs.enabled ? payload : scrubLargeValueRefs(payload) + // block-output stage is on; when it's off, hydrate → mask → re-store the refs + // so the log keeps redacted content (falling back to a marker only if a ref + // can't be materialized/re-stored). + const working = resolved.blockOutputs.enabled + ? payload + : await redactLargeValueRefs(payload, { + entityTypes: config.entityTypes, + language: config.language, + store: { + workspaceId, + workflowId: storeContext.workflowId ?? undefined, + executionId: storeContext.executionId, + userId: storeContext.userId ?? undefined, + }, + }) - return redactPIIFromExecution(scrubbed, { + return redactPIIFromExecution(working, { entityTypes: config.entityTypes, language: config.language, }) @@ -793,25 +803,35 @@ export class ExecutionLogger implements IExecutionLoggerService { const redactedWorkflowInput = filteredWorkflowInput !== undefined ? redactApiKeys(filteredWorkflowInput) : undefined - const pii = await this.applyPiiRedaction(existingLog?.workspaceId ?? null, { - traceSpans: redactedTraceSpans, - finalOutput: redactedFinalOutput, - ...(redactedWorkflowInput !== undefined ? { workflowInput: redactedWorkflowInput } : {}), - ...(builtExecutionData.error !== undefined ? { error: builtExecutionData.error } : {}), - ...(builtExecutionData.completionFailure !== undefined - ? { completionFailure: builtExecutionData.completionFailure } - : {}), - ...(builtExecutionData.trigger !== undefined ? { trigger: builtExecutionData.trigger } : {}), - ...(builtExecutionData.executionState !== undefined - ? { executionState: builtExecutionData.executionState } - : {}), - ...(builtExecutionData.environment !== undefined - ? { environment: builtExecutionData.environment } - : {}), - ...(builtExecutionData.correlation !== undefined - ? { correlation: builtExecutionData.correlation } - : {}), - }) + const pii = await this.applyPiiRedaction( + existingLog?.workspaceId ?? null, + { + traceSpans: redactedTraceSpans, + finalOutput: redactedFinalOutput, + ...(redactedWorkflowInput !== undefined ? { workflowInput: redactedWorkflowInput } : {}), + ...(builtExecutionData.error !== undefined ? { error: builtExecutionData.error } : {}), + ...(builtExecutionData.completionFailure !== undefined + ? { completionFailure: builtExecutionData.completionFailure } + : {}), + ...(builtExecutionData.trigger !== undefined + ? { trigger: builtExecutionData.trigger } + : {}), + ...(builtExecutionData.executionState !== undefined + ? { executionState: builtExecutionData.executionState } + : {}), + ...(builtExecutionData.environment !== undefined + ? { environment: builtExecutionData.environment } + : {}), + ...(builtExecutionData.correlation !== undefined + ? { correlation: builtExecutionData.correlation } + : {}), + }, + { + workflowId: existingLog?.workflowId ?? null, + executionId, + userId: billingUserId, + } + ) const rawDurationMs = isResume && existingLog?.startedAt diff --git a/apps/sim/lib/logs/execution/pii-large-values.test.ts b/apps/sim/lib/logs/execution/pii-large-values.test.ts new file mode 100644 index 00000000000..da83f0258de --- /dev/null +++ b/apps/sim/lib/logs/execution/pii-large-values.test.ts @@ -0,0 +1,97 @@ +/** + * @vitest-environment node + */ +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const { mockMaterializeRef, mockCompact, mockMaterializeManifest, mockMaskBatch } = vi.hoisted( + () => ({ + mockMaterializeRef: vi.fn(), + mockCompact: vi.fn(), + mockMaterializeManifest: vi.fn(), + mockMaskBatch: vi.fn(), + }) +) + +vi.mock('@/lib/execution/payloads/store', () => ({ + materializeLargeValueRef: mockMaterializeRef, +})) +vi.mock('@/lib/execution/payloads/serializer', () => ({ + compactExecutionPayload: mockCompact, +})) +vi.mock('@/lib/execution/payloads/large-array-manifest', () => ({ + materializeLargeArrayManifest: mockMaterializeManifest, +})) +vi.mock('@/lib/execution/payloads/large-array-manifest-metadata', () => ({ + isLargeArrayManifest: () => false, +})) +vi.mock('@/lib/guardrails/mask-client', () => ({ + maskPIIBatchViaHttp: mockMaskBatch, +})) + +import { redactLargeValueRefs } from '@/lib/logs/execution/pii-large-values' + +const REF = { + __simLargeValueRef: true, + version: 1, + id: 'lv_abcdef123456', + kind: 'object', + size: 9_000_000, +} as const + +const STORE = { workspaceId: 'ws-1', workflowId: 'wf-1', executionId: 'ex-1', userId: 'u-1' } + +describe('redactLargeValueRefs', () => { + beforeEach(() => { + vi.clearAllMocks() + mockMaskBatch.mockImplementation(async (texts: string[]) => texts.map((t) => `MASKED(${t})`)) + // compact echoes its input so we can assert the masked content is what's re-stored. + mockCompact.mockImplementation(async (value: unknown) => value) + }) + + it('hydrates, masks, and re-stores a large-value ref (content preserved, PII masked)', async () => { + mockMaterializeRef.mockResolvedValue({ note: 'contact bob', id: 42 }) + + const result = await redactLargeValueRefs( + { finalOutput: REF }, + { entityTypes: ['PERSON'], language: 'en', store: STORE } + ) + + expect(mockMaterializeRef).toHaveBeenCalledWith( + REF, + expect.objectContaining({ executionId: 'ex-1', trackReference: false }) + ) + expect(result.finalOutput).toEqual({ note: 'MASKED(contact bob)', id: 42 }) + expect(mockCompact).toHaveBeenCalledTimes(1) + }) + + it('falls back to the marker when a ref cannot be materialized', async () => { + mockMaterializeRef.mockResolvedValue(undefined) + const result = await redactLargeValueRefs( + { finalOutput: REF }, + { entityTypes: [], language: 'en', store: STORE } + ) + expect(result.finalOutput).toBe('[REDACTION_FAILED]') + expect(mockCompact).not.toHaveBeenCalled() + }) + + it('falls back to the marker when re-store throws (never leaks)', async () => { + mockMaterializeRef.mockResolvedValue({ note: 'secret@x.com' }) + mockCompact.mockRejectedValueOnce(new Error('s3 down')) + const result = await redactLargeValueRefs( + { finalOutput: REF }, + { entityTypes: [], language: 'en', store: STORE } + ) + expect(result.finalOutput).toBe('[REDACTION_FAILED]') + }) + + it('leaves payloads without refs untouched', async () => { + const payload = { finalOutput: { answer: 'world', count: 5 } } + const result = await redactLargeValueRefs(payload, { + entityTypes: [], + language: 'en', + store: STORE, + }) + expect(result).toEqual(payload) + expect(mockMaterializeRef).not.toHaveBeenCalled() + }) +}) diff --git a/apps/sim/lib/logs/execution/pii-large-values.ts b/apps/sim/lib/logs/execution/pii-large-values.ts new file mode 100644 index 00000000000..10ed9bca154 --- /dev/null +++ b/apps/sim/lib/logs/execution/pii-large-values.ts @@ -0,0 +1,121 @@ +import { createLogger } from '@sim/logger' +import { getErrorMessage } from '@sim/utils/errors' +import type { LargeArrayManifest } from '@/lib/execution/payloads/large-array-manifest' +import { materializeLargeArrayManifest } from '@/lib/execution/payloads/large-array-manifest' +import { isLargeArrayManifest } from '@/lib/execution/payloads/large-array-manifest-metadata' +import { isLargeValueRef, type LargeValueRef } from '@/lib/execution/payloads/large-value-ref' +import { compactExecutionPayload } from '@/lib/execution/payloads/serializer' +import type { LargeValueStoreContext } from '@/lib/execution/payloads/store' +import { materializeLargeValueRef } from '@/lib/execution/payloads/store' +import { + REDACTION_FAILED_MARKER, + type RedactablePayload, + redactObjectStrings, +} from '@/lib/logs/execution/pii-redaction' + +const logger = createLogger('PiiLargeValues') + +export interface RedactLargeValueRefsOptions { + /** Presidio entity types to mask. Empty = redact all detected PII. */ + entityTypes: string[] + language: string + /** Storage scope for materializing and re-storing the masked values. */ + store: LargeValueStoreContext +} + +/** + * Hydrate, mask, and re-store offloaded large values inside a log payload. + * + * The string redactor can't reach a value already offloaded to large-value + * storage (a >8MB ref) — its bytes live in object storage, not inline. Block + * outputs are masked BEFORE offload only when the block-output stage is on; when + * it's off, the refs in a persisted log point to unredacted bytes. This walks the + * payload and, for each ref / array manifest, materializes it, masks its content, + * and re-stores a fresh masked ref — so the log keeps the redacted content rather + * than losing the whole field. Any failure (materialization unavailable, missing + * storage scope, re-store error) falls back to {@link REDACTION_FAILED_MARKER} so + * raw PII is never left behind. + */ +export async function redactLargeValueRefs( + payload: RedactablePayload, + options: RedactLargeValueRefsOptions +): Promise { + const result: RedactablePayload = { ...payload } + for (const key of Object.keys(payload) as (keyof RedactablePayload)[]) { + if (payload[key] !== undefined) { + result[key] = await redactNode(payload[key], options) + } + } + return result +} + +async function redactNode(node: unknown, options: RedactLargeValueRefsOptions): Promise { + if (isLargeValueRef(node)) { + return redactRef(node, options) + } + if (isLargeArrayManifest(node)) { + return redactManifest(node, options) + } + if (Array.isArray(node)) { + return Promise.all(node.map((item) => redactNode(item, options))) + } + if (node !== null && typeof node === 'object') { + const entries = await Promise.all( + Object.entries(node).map( + async ([key, value]) => [key, await redactNode(value, options)] as const + ) + ) + return Object.fromEntries(entries) + } + return node +} + +/** + * Mask a materialized large value and re-offload it: handle any nested refs + * first, then mask inline strings, then re-store. `redactObjectStrings` skips + * refs, so the nested re-stored refs are left intact while their siblings mask. + */ +async function maskAndReStore( + value: unknown, + options: RedactLargeValueRefsOptions +): Promise { + const nested = await redactNode(value, options) + const masked = await redactObjectStrings(nested, { + entityTypes: options.entityTypes, + language: options.language, + onFailure: 'scrub', + }) + return compactExecutionPayload(masked, { ...options.store, requireDurable: true }) +} + +async function redactRef( + ref: LargeValueRef, + options: RedactLargeValueRefsOptions +): Promise { + try { + const materialized = await materializeLargeValueRef(ref, { + ...options.store, + trackReference: false, + }) + if (materialized === undefined) return REDACTION_FAILED_MARKER + return await maskAndReStore(materialized, options) + } catch (error) { + logger.error('Failed to redact large value ref; scrubbing', { error: getErrorMessage(error) }) + return REDACTION_FAILED_MARKER + } +} + +async function redactManifest( + manifest: LargeArrayManifest, + options: RedactLargeValueRefsOptions +): Promise { + try { + const materialized = await materializeLargeArrayManifest(manifest, { ...options.store }) + return await maskAndReStore(materialized, options) + } catch (error) { + logger.error('Failed to redact large array manifest; scrubbing', { + error: getErrorMessage(error), + }) + return REDACTION_FAILED_MARKER + } +} diff --git a/apps/sim/lib/logs/execution/pii-redaction.test.ts b/apps/sim/lib/logs/execution/pii-redaction.test.ts index 65e163c7564..6a3bfb26402 100644 --- a/apps/sim/lib/logs/execution/pii-redaction.test.ts +++ b/apps/sim/lib/logs/execution/pii-redaction.test.ts @@ -16,7 +16,6 @@ import { REDACTION_FAILED_MARKER, redactObjectStrings, redactPIIFromExecution, - scrubLargeValueRefs, } from '@/lib/logs/execution/pii-redaction' describe('redactPIIFromExecution', () => { @@ -171,29 +170,26 @@ describe('redactObjectStrings', () => { }) }) -describe('scrubLargeValueRefs', () => { - const ref = { - __simLargeValueRef: true, - version: 1, - id: 'lv_abcdef123456', - kind: 'object', - size: 9_000_000, - } - - it('replaces large-value refs with the marker, preserving surrounding structure', () => { - const result = scrubLargeValueRefs({ - traceSpans: [{ blockId: 'b1', status: 'success', output: { big: ref, small: 'hi' } }], - finalOutput: ref, - }) - const span = (result.traceSpans as any[])[0] - expect(span.blockId).toBe('b1') - expect(span.output.big).toBe(REDACTION_FAILED_MARKER) - expect(span.output.small).toBe('hi') - expect(result.finalOutput).toBe(REDACTION_FAILED_MARKER) +describe('transformStrings (via redactObjectStrings) leaves large-value refs intact', () => { + beforeEach(() => { + vi.clearAllMocks() + mockMaskPIIBatch.mockImplementation(async (texts: string[]) => texts.map((t) => `MASKED(${t})`)) }) - it('leaves payloads without refs untouched', () => { - const payload = { finalOutput: { answer: 'world', count: 5 } } - expect(scrubLargeValueRefs(payload)).toEqual(payload) + it('does not recurse into / corrupt a large-value ref while masking siblings', async () => { + const ref = { + __simLargeValueRef: true, + version: 1, + id: 'lv_abcdef123456', + kind: 'object', + size: 9_000_000, + } + const result = (await redactObjectStrings( + { name: 'bob', big: ref }, + { entityTypes: ['PERSON'] } + )) as any + expect(result.name).toBe('MASKED(bob)') + // The ref is left byte-for-byte intact (its key/id are not masked). + expect(result.big).toEqual(ref) }) }) diff --git a/apps/sim/lib/logs/execution/pii-redaction.ts b/apps/sim/lib/logs/execution/pii-redaction.ts index 5fbc90a4589..b0b08ef6230 100644 --- a/apps/sim/lib/logs/execution/pii-redaction.ts +++ b/apps/sim/lib/logs/execution/pii-redaction.ts @@ -93,6 +93,12 @@ function transformStrings(value: unknown, handle: (s: string) => string): unknow if (typeof value === 'string') { return isEligibleString(value) ? handle(value) : value } + // Treat offloaded large-value refs as opaque: masking their internal `key`/`id` + // strings would corrupt the reference. Their content is handled separately + // (hydrate → mask → re-store) before this runs. + if (isLargeValueRef(value) || isLargeArrayManifest(value)) { + return value + } if (Array.isArray(value)) { return value.map((item) => transformStrings(item, handle)) } @@ -137,44 +143,6 @@ function transformUnit( return transformStrings(value, handle) } -/** Replace every offloaded large-value ref / array manifest with `handle()`. */ -function transformRefs(value: unknown, handle: () => unknown): unknown { - if (isLargeValueRef(value) || isLargeArrayManifest(value)) { - return handle() - } - if (Array.isArray(value)) { - return value.map((item) => transformRefs(item, handle)) - } - if (value !== null && typeof value === 'object') { - const out: Record = {} - for (const [key, v] of Object.entries(value)) { - out[key] = transformRefs(v, handle) - } - return out - } - return value -} - -/** - * Replace offloaded large-value references with {@link REDACTION_FAILED_MARKER}. - * - * The string redactor only masks inline content; a value already offloaded to - * large-value storage (>8MB) is an opaque ref it can't reach. Block outputs are - * masked BEFORE offload only when the block-output stage is on — so when that - * stage is off, the refs in a persisted log point to unredacted bytes. Scrubbing - * them keeps raw PII out of the log (the rare huge field loses its content rather - * than leaking; consistent with the over-ceiling scrub behavior). - */ -export function scrubLargeValueRefs(payload: RedactablePayload): RedactablePayload { - const result: RedactablePayload = { ...payload } - for (const key of REDACTABLE_KEYS) { - if (payload[key] !== undefined) { - result[key] = transformRefs(payload[key], () => REDACTION_FAILED_MARKER) - } - } - return result -} - /** * Mask a batch of collected strings via Presidio. On a hard failure or when the * batch exceeds the ceiling, either scrub to {@link REDACTION_FAILED_MARKER} or From f0c71ccd4909f1f64168c48c4a88a002c070e396 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Tue, 30 Jun 2026 12:46:14 -0700 Subject: [PATCH 15/15] fix(data-retention): always apply logs policy to large-value refs when logs stage is on --- apps/sim/lib/logs/execution/logger.ts | 35 +++++++++++++-------------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/apps/sim/lib/logs/execution/logger.ts b/apps/sim/lib/logs/execution/logger.ts index 9b9f8dc85a1..000d4a8cafb 100644 --- a/apps/sim/lib/logs/execution/logger.ts +++ b/apps/sim/lib/logs/execution/logger.ts @@ -637,27 +637,26 @@ export class ExecutionLogger implements IExecutionLoggerService { // truth; re-checking the flag/plan here returns false on a transient read and // would silently skip masking, leaking PII (fail-open). Absence of rules // yields the disabled default, so non-PII orgs incur only the lookup. - const resolved = resolveEffectivePiiRedaction({ orgSettings: row.orgSettings, workspaceId }) - const config = resolved.logs + const config = resolveEffectivePiiRedaction({ orgSettings: row.orgSettings, workspaceId }).logs if (!config.enabled) return payload // The string redactor can't reach values already offloaded to large-value - // storage (>8MB refs). Those are masked before offload only when the - // block-output stage is on; when it's off, hydrate → mask → re-store the refs - // so the log keeps redacted content (falling back to a marker only if a ref - // can't be materialized/re-stored). - const working = resolved.blockOutputs.enabled - ? payload - : await redactLargeValueRefs(payload, { - entityTypes: config.entityTypes, - language: config.language, - store: { - workspaceId, - workflowId: storeContext.workflowId ?? undefined, - executionId: storeContext.executionId, - userId: storeContext.userId ?? undefined, - }, - }) + // storage (>8MB refs). Always hydrate → mask → re-store them under the LOGS + // policy, even if the block-output stage already masked before offload: that + // used the block-output entity set, which can differ from the logs set, so + // the log's large values must get the logs policy applied like inline content + // does. Masking is idempotent, so already-masked spans are unaffected; a ref + // that can't be materialized/re-stored falls back to a marker. + const working = await redactLargeValueRefs(payload, { + entityTypes: config.entityTypes, + language: config.language, + store: { + workspaceId, + workflowId: storeContext.workflowId ?? undefined, + executionId: storeContext.executionId, + userId: storeContext.userId ?? undefined, + }, + }) return redactPIIFromExecution(working, { entityTypes: config.entityTypes,