diff --git a/.changeset/redis-worker-oldest-message-age.md b/.changeset/redis-worker-oldest-message-age.md new file mode 100644 index 00000000000..f5f86f76e7f --- /dev/null +++ b/.changeset/redis-worker-oldest-message-age.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/redis-worker": patch +--- + +Add a `redis_worker.queue.oldest_message_age` observable gauge (unit `ms`, labeled `worker_name`) reporting the age of the oldest overdue message in each queue. This is a generic queue-stall signal: it stays at 0 while a queue drains healthily and rises only when due work sits undrained (e.g. a blocked dequeue, a dead consumer, or backpressure), even when no items are being processed. Orphaned queue entries are resolved against the items hash so they don't report a phantom stall. Also exposes `SimpleQueue.oldestMessageAge()`. diff --git a/packages/redis-worker/src/queue.test.ts b/packages/redis-worker/src/queue.test.ts index 6c8b8884072..8d72c2ca0b9 100644 --- a/packages/redis-worker/src/queue.test.ts +++ b/packages/redis-worker/src/queue.test.ts @@ -215,6 +215,74 @@ describe("SimpleQueue", () => { } }); + redisTest("oldestMessageAge", { timeout: 20_000 }, async ({ redisContainer }) => { + const queue = new SimpleQueue({ + name: "test-1", + schema: { + test: z.object({ + value: z.number(), + }), + }, + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + + try { + // empty queue → 0 + expect(await queue.oldestMessageAge()).toBe(0); + + // only a future-scheduled item → 0 (not yet overdue) + await queue.enqueue({ + id: "future", + job: "test", + item: { value: 1 }, + availableAt: new Date(Date.now() + 60_000), + visibilityTimeoutMs: 2000, + }); + expect(await queue.oldestMessageAge()).toBe(0); + + // an overdue item → age > 0 + await queue.enqueue({ + id: "overdue", + job: "test", + item: { value: 2 }, + availableAt: new Date(Date.now() - 5_000), + visibilityTimeoutMs: 2000, + }); + const age = await queue.oldestMessageAge(); + expect(age).toBeGreaterThanOrEqual(5_000); + expect(age).toBeLessThan(60_000); + + // an orphaned queue entry (no payload in the items hash), older than the + // real overdue item, must be ignored — it can't be dequeued, so it isn't a + // real stall. Age should still reflect the real overdue item (~5s), not the + // orphan's ~999s. + const redisClient = createRedisClient({ + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }); + await redisClient.zadd(`{queue:test-1:}queue`, Date.now() - 999_000, "orphaned-id"); + const ageWithOrphan = await queue.oldestMessageAge(); + expect(ageWithOrphan).toBeGreaterThanOrEqual(5_000); + expect(ageWithOrphan).toBeLessThan(60_000); + + // once dequeued, the item is invisible (future-scored) → back to 0 (the + // orphan is cleaned by the dequeue scan, the real item goes in-flight) + const [first] = await queue.dequeue(2); + expect(first?.id).toBe("overdue"); + expect(await queue.oldestMessageAge()).toBe(0); + + await redisClient.quit(); + } finally { + await queue.close(); + } + }); + redisTest("invisibility timeout", { timeout: 20_000 }, async ({ redisContainer }) => { const queue = new SimpleQueue({ name: "test-1", diff --git a/packages/redis-worker/src/queue.ts b/packages/redis-worker/src/queue.ts index e4c593a148d..b60c0437dbc 100644 --- a/packages/redis-worker/src/queue.ts +++ b/packages/redis-worker/src/queue.ts @@ -304,6 +304,42 @@ export class SimpleQueue { } } + /** + * Age (in ms) of the oldest *overdue* message — the oldest item whose scheduled + * time has already passed (score <= now). Returns 0 when the queue is empty or + * only holds future/delayed or in-flight (future-scored) items. + * + * Resolves the candidate against the `items` hash so orphaned `queue` entries + * (a member whose payload is missing — the same stale state `dequeueItems` + * cleans up) don't report a phantom stall for work that can't be dequeued. The + * Lua scans due items oldest-first and returns the first score whose payload + * still exists. + * + * This is the generic stall signal: it stays at 0 while a queue drains healthily + * and rises only when due work sits undrained (poison block, dead consumer, + * backpressure). + */ + async oldestMessageAge(): Promise { + try { + const now = Date.now(); + // -1 sentinel = nothing due, or every due entry is orphaned. + const score = Number(await this.redis.getOldestDueScore(`queue`, `items`, now)); + + if (!Number.isFinite(score) || score < 0) { + return 0; + } + + return Math.max(0, now - score); + } catch (e) { + this.logger.error(`SimpleQueue ${this.name}.oldestMessageAge(): error getting oldest age`, { + queue: this.name, + error: e, + }); + // Swallow: a transient Redis error must not break observable metric collection. + return 0; + } + } + async getJob(id: string): Promise | null> { const result = await this.redis.getJob(`queue`, `items`, id); @@ -484,6 +520,30 @@ export class SimpleQueue { `, }); + this.redis.defineCommand("getOldestDueScore", { + numberOfKeys: 2, + lua: ` + local queue = KEYS[1] + local items = KEYS[2] + local now = tonumber(ARGV[1]) + + -- Oldest-first scan of due items, bounded so a long prefix of orphans can't + -- make this O(n). Orphans are rare (dequeueItems removes them), so in the + -- common case this returns on the first iteration. Read-only: unlike + -- dequeueItems we don't ZREM orphans here — a metric probe must not mutate. + local result = redis.call('ZRANGEBYSCORE', queue, '-inf', now, 'WITHSCORES', 'LIMIT', 0, 100) + + for i = 1, #result, 2 do + local id = result[i] + if redis.call('HEXISTS', items, id) == 1 then + return result[i + 1] + end + end + + return -1 + `, + }); + this.redis.defineCommand("getJob", { numberOfKeys: 2, lua: ` @@ -695,5 +755,14 @@ declare module "@internal/redis" { id: string, callback?: Callback<[string, string, string] | null> ): Result<[string, string, string] | null, Context>; + + getOldestDueScore( + //keys + queue: string, + items: string, + //args + now: number, + callback?: Callback + ): Result; } } diff --git a/packages/redis-worker/src/worker.ts b/packages/redis-worker/src/worker.ts index 92f1451033e..8e0f6528d06 100644 --- a/packages/redis-worker/src/worker.ts +++ b/packages/redis-worker/src/worker.ts @@ -206,6 +206,17 @@ class Worker { concurrencyLimitPendingObservableGauge.addCallback( this.#updateConcurrencyLimitPendingMetric.bind(this) ); + + const oldestMessageAgeObservableGauge = this.meter.createObservableGauge( + "redis_worker.queue.oldest_message_age", + { + description: "Age of the oldest overdue message in the queue", + unit: "ms", + valueType: ValueType.INT, + } + ); + + oldestMessageAgeObservableGauge.addCallback(this.#updateOldestMessageAgeMetric.bind(this)); } async #updateQueueSizeMetric(observableResult: ObservableResult) { @@ -223,6 +234,14 @@ class Worker { }); } + async #updateOldestMessageAgeMetric(observableResult: ObservableResult) { + const oldestMessageAge = await this.queue.oldestMessageAge(); + + observableResult.observe(oldestMessageAge, { + worker_name: this.options.name, + }); + } + async #updateConcurrencyLimitActiveMetric(observableResult: ObservableResult) { for (const [workerId, limiter] of Object.entries(this.limiters)) { observableResult.observe(limiter.activeCount, {