From 16270b72f2be59f17f36e186f7653e41cf45ba91 Mon Sep 17 00:00:00 2001 From: Oskar Otwinowski Date: Tue, 30 Jun 2026 14:41:22 +0200 Subject: [PATCH 1/2] feat(redis-worker): add oldest-message-age queue gauge MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a `redis_worker.queue.oldest_message_age_ms` observable gauge (labeled `worker_name`) and `SimpleQueue.oldestMessageAge()`, reporting the age of the oldest overdue message in each queue. Generic queue-stall signal: 0 while a queue drains healthily, rising only when due work sits undrained (blocked dequeue, dead consumer, backpressure) — even when no items are being processed. --- .changeset/redis-worker-oldest-message-age.md | 5 ++ packages/redis-worker/src/queue.test.ts | 50 +++++++++++++++++++ packages/redis-worker/src/queue.ts | 38 ++++++++++++++ packages/redis-worker/src/worker.ts | 19 +++++++ 4 files changed, 112 insertions(+) create mode 100644 .changeset/redis-worker-oldest-message-age.md diff --git a/.changeset/redis-worker-oldest-message-age.md b/.changeset/redis-worker-oldest-message-age.md new file mode 100644 index 00000000000..dda0e97a93e --- /dev/null +++ b/.changeset/redis-worker-oldest-message-age.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/redis-worker": patch +--- + +Add a `redis_worker.queue.oldest_message_age_ms` observable gauge (labeled `worker_name`) reporting the age of the oldest overdue message in each queue. This is a generic queue-stall signal: it stays at 0 while a queue drains healthily and rises only when due work sits undrained (e.g. a blocked dequeue, a dead consumer, or backpressure), even when no items are being processed. Also exposes `SimpleQueue.oldestMessageAge()`. diff --git a/packages/redis-worker/src/queue.test.ts b/packages/redis-worker/src/queue.test.ts index 6c8b8884072..2507cfac221 100644 --- a/packages/redis-worker/src/queue.test.ts +++ b/packages/redis-worker/src/queue.test.ts @@ -215,6 +215,56 @@ describe("SimpleQueue", () => { } }); + redisTest("oldestMessageAge", { timeout: 20_000 }, async ({ redisContainer }) => { + const queue = new SimpleQueue({ + name: "test-1", + schema: { + test: z.object({ + value: z.number(), + }), + }, + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + + try { + // empty queue → 0 + expect(await queue.oldestMessageAge()).toBe(0); + + // only a future-scheduled item → 0 (not yet overdue) + await queue.enqueue({ + id: "future", + job: "test", + item: { value: 1 }, + availableAt: new Date(Date.now() + 60_000), + visibilityTimeoutMs: 2000, + }); + expect(await queue.oldestMessageAge()).toBe(0); + + // an overdue item → age > 0 + await queue.enqueue({ + id: "overdue", + job: "test", + item: { value: 2 }, + availableAt: new Date(Date.now() - 5_000), + visibilityTimeoutMs: 2000, + }); + const age = await queue.oldestMessageAge(); + expect(age).toBeGreaterThanOrEqual(5_000); + + // once dequeued, the item is invisible (future-scored) → back to 0 + const [first] = await queue.dequeue(); + expect(first?.id).toBe("overdue"); + expect(await queue.oldestMessageAge()).toBe(0); + } finally { + await queue.close(); + } + }); + redisTest("invisibility timeout", { timeout: 20_000 }, async ({ redisContainer }) => { const queue = new SimpleQueue({ name: "test-1", diff --git a/packages/redis-worker/src/queue.ts b/packages/redis-worker/src/queue.ts index e4c593a148d..7ce11c441ae 100644 --- a/packages/redis-worker/src/queue.ts +++ b/packages/redis-worker/src/queue.ts @@ -304,6 +304,44 @@ export class SimpleQueue { } } + /** + * Age (in ms) of the oldest *overdue* message — the oldest item whose scheduled + * time has already passed (score <= now). Returns 0 when the queue is empty or + * only holds future/delayed or in-flight (future-scored) items. + * + * This is the generic stall signal: it stays at 0 while a queue drains healthily + * and rises only when due work sits undrained (poison block, dead consumer, + * backpressure). + */ + async oldestMessageAge(): Promise { + try { + const now = Date.now(); + const result = await this.redis.zrangebyscore( + `queue`, + "-inf", + now, + "WITHSCORES", + "LIMIT", + 0, + 1 + ); + + if (!result || result.length < 2) { + return 0; + } + + const score = Number(result[1]); + return Math.max(0, now - score); + } catch (e) { + this.logger.error(`SimpleQueue ${this.name}.oldestMessageAge(): error getting oldest age`, { + queue: this.name, + error: e, + }); + // Swallow: a transient Redis error must not break observable metric collection. + return 0; + } + } + async getJob(id: string): Promise | null> { const result = await this.redis.getJob(`queue`, `items`, id); diff --git a/packages/redis-worker/src/worker.ts b/packages/redis-worker/src/worker.ts index 92f1451033e..99a6fdd9d54 100644 --- a/packages/redis-worker/src/worker.ts +++ b/packages/redis-worker/src/worker.ts @@ -206,6 +206,17 @@ class Worker { concurrencyLimitPendingObservableGauge.addCallback( this.#updateConcurrencyLimitPendingMetric.bind(this) ); + + const oldestMessageAgeObservableGauge = this.meter.createObservableGauge( + "redis_worker.queue.oldest_message_age_ms", + { + description: "Age of the oldest overdue message in the queue", + unit: "ms", + valueType: ValueType.INT, + } + ); + + oldestMessageAgeObservableGauge.addCallback(this.#updateOldestMessageAgeMetric.bind(this)); } async #updateQueueSizeMetric(observableResult: ObservableResult) { @@ -223,6 +234,14 @@ class Worker { }); } + async #updateOldestMessageAgeMetric(observableResult: ObservableResult) { + const oldestMessageAge = await this.queue.oldestMessageAge(); + + observableResult.observe(oldestMessageAge, { + worker_name: this.options.name, + }); + } + async #updateConcurrencyLimitActiveMetric(observableResult: ObservableResult) { for (const [workerId, limiter] of Object.entries(this.limiters)) { observableResult.observe(limiter.activeCount, { From bc40ac8f2c7ecd7731a209c3d7ce7b1458c2ca11 Mon Sep 17 00:00:00 2001 From: Oskar Otwinowski Date: Tue, 30 Jun 2026 16:57:33 +0200 Subject: [PATCH 2/2] fix(redis-worker): address review on oldest-message-age gauge - Drop the _ms suffix from the metric name (keep unit "ms") so the OTLP to Prometheus exporter doesn't double-suffix it to _ms_milliseconds. - Resolve the oldest-overdue candidate against the items hash (new getOldestDueScore Lua) so an orphaned queue entry can't report a phantom stall for work that can't be dequeued. --- .changeset/redis-worker-oldest-message-age.md | 2 +- packages/redis-worker/src/queue.test.ts | 22 +++++++- packages/redis-worker/src/queue.ts | 53 +++++++++++++++---- packages/redis-worker/src/worker.ts | 2 +- 4 files changed, 64 insertions(+), 15 deletions(-) diff --git a/.changeset/redis-worker-oldest-message-age.md b/.changeset/redis-worker-oldest-message-age.md index dda0e97a93e..f5f86f76e7f 100644 --- a/.changeset/redis-worker-oldest-message-age.md +++ b/.changeset/redis-worker-oldest-message-age.md @@ -2,4 +2,4 @@ "@trigger.dev/redis-worker": patch --- -Add a `redis_worker.queue.oldest_message_age_ms` observable gauge (labeled `worker_name`) reporting the age of the oldest overdue message in each queue. This is a generic queue-stall signal: it stays at 0 while a queue drains healthily and rises only when due work sits undrained (e.g. a blocked dequeue, a dead consumer, or backpressure), even when no items are being processed. Also exposes `SimpleQueue.oldestMessageAge()`. +Add a `redis_worker.queue.oldest_message_age` observable gauge (unit `ms`, labeled `worker_name`) reporting the age of the oldest overdue message in each queue. This is a generic queue-stall signal: it stays at 0 while a queue drains healthily and rises only when due work sits undrained (e.g. a blocked dequeue, a dead consumer, or backpressure), even when no items are being processed. Orphaned queue entries are resolved against the items hash so they don't report a phantom stall. Also exposes `SimpleQueue.oldestMessageAge()`. diff --git a/packages/redis-worker/src/queue.test.ts b/packages/redis-worker/src/queue.test.ts index 2507cfac221..8d72c2ca0b9 100644 --- a/packages/redis-worker/src/queue.test.ts +++ b/packages/redis-worker/src/queue.test.ts @@ -255,11 +255,29 @@ describe("SimpleQueue", () => { }); const age = await queue.oldestMessageAge(); expect(age).toBeGreaterThanOrEqual(5_000); + expect(age).toBeLessThan(60_000); - // once dequeued, the item is invisible (future-scored) → back to 0 - const [first] = await queue.dequeue(); + // an orphaned queue entry (no payload in the items hash), older than the + // real overdue item, must be ignored — it can't be dequeued, so it isn't a + // real stall. Age should still reflect the real overdue item (~5s), not the + // orphan's ~999s. + const redisClient = createRedisClient({ + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }); + await redisClient.zadd(`{queue:test-1:}queue`, Date.now() - 999_000, "orphaned-id"); + const ageWithOrphan = await queue.oldestMessageAge(); + expect(ageWithOrphan).toBeGreaterThanOrEqual(5_000); + expect(ageWithOrphan).toBeLessThan(60_000); + + // once dequeued, the item is invisible (future-scored) → back to 0 (the + // orphan is cleaned by the dequeue scan, the real item goes in-flight) + const [first] = await queue.dequeue(2); expect(first?.id).toBe("overdue"); expect(await queue.oldestMessageAge()).toBe(0); + + await redisClient.quit(); } finally { await queue.close(); } diff --git a/packages/redis-worker/src/queue.ts b/packages/redis-worker/src/queue.ts index 7ce11c441ae..b60c0437dbc 100644 --- a/packages/redis-worker/src/queue.ts +++ b/packages/redis-worker/src/queue.ts @@ -309,6 +309,12 @@ export class SimpleQueue { * time has already passed (score <= now). Returns 0 when the queue is empty or * only holds future/delayed or in-flight (future-scored) items. * + * Resolves the candidate against the `items` hash so orphaned `queue` entries + * (a member whose payload is missing — the same stale state `dequeueItems` + * cleans up) don't report a phantom stall for work that can't be dequeued. The + * Lua scans due items oldest-first and returns the first score whose payload + * still exists. + * * This is the generic stall signal: it stays at 0 while a queue drains healthily * and rises only when due work sits undrained (poison block, dead consumer, * backpressure). @@ -316,21 +322,13 @@ export class SimpleQueue { async oldestMessageAge(): Promise { try { const now = Date.now(); - const result = await this.redis.zrangebyscore( - `queue`, - "-inf", - now, - "WITHSCORES", - "LIMIT", - 0, - 1 - ); + // -1 sentinel = nothing due, or every due entry is orphaned. + const score = Number(await this.redis.getOldestDueScore(`queue`, `items`, now)); - if (!result || result.length < 2) { + if (!Number.isFinite(score) || score < 0) { return 0; } - const score = Number(result[1]); return Math.max(0, now - score); } catch (e) { this.logger.error(`SimpleQueue ${this.name}.oldestMessageAge(): error getting oldest age`, { @@ -522,6 +520,30 @@ export class SimpleQueue { `, }); + this.redis.defineCommand("getOldestDueScore", { + numberOfKeys: 2, + lua: ` + local queue = KEYS[1] + local items = KEYS[2] + local now = tonumber(ARGV[1]) + + -- Oldest-first scan of due items, bounded so a long prefix of orphans can't + -- make this O(n). Orphans are rare (dequeueItems removes them), so in the + -- common case this returns on the first iteration. Read-only: unlike + -- dequeueItems we don't ZREM orphans here — a metric probe must not mutate. + local result = redis.call('ZRANGEBYSCORE', queue, '-inf', now, 'WITHSCORES', 'LIMIT', 0, 100) + + for i = 1, #result, 2 do + local id = result[i] + if redis.call('HEXISTS', items, id) == 1 then + return result[i + 1] + end + end + + return -1 + `, + }); + this.redis.defineCommand("getJob", { numberOfKeys: 2, lua: ` @@ -733,5 +755,14 @@ declare module "@internal/redis" { id: string, callback?: Callback<[string, string, string] | null> ): Result<[string, string, string] | null, Context>; + + getOldestDueScore( + //keys + queue: string, + items: string, + //args + now: number, + callback?: Callback + ): Result; } } diff --git a/packages/redis-worker/src/worker.ts b/packages/redis-worker/src/worker.ts index 99a6fdd9d54..8e0f6528d06 100644 --- a/packages/redis-worker/src/worker.ts +++ b/packages/redis-worker/src/worker.ts @@ -208,7 +208,7 @@ class Worker { ); const oldestMessageAgeObservableGauge = this.meter.createObservableGauge( - "redis_worker.queue.oldest_message_age_ms", + "redis_worker.queue.oldest_message_age", { description: "Age of the oldest overdue message in the queue", unit: "ms",