From 774d69884a51fec000de44a95eb25da33d2c343b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fredrik=20Adel=C3=B6w?= Date: Tue, 19 May 2026 22:12:59 +0200 Subject: [PATCH 1/8] Wrap stitch queue claim in a transaction to hold row locks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The SELECT FOR UPDATE SKIP LOCKED and the subsequent UPDATE of next_stitch_at now run inside a single transaction. Previously the row locks were released after the SELECT auto-committed, allowing another worker to claim the same rows before the UPDATE ran. Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Fredrik Adelöw --- .changeset/stitch-claim-transaction.md | 5 ++ .../stitcher/getDeferredStitchableEntities.ts | 73 ++++++++++--------- 2 files changed, 45 insertions(+), 33 deletions(-) create mode 100644 .changeset/stitch-claim-transaction.md diff --git a/.changeset/stitch-claim-transaction.md b/.changeset/stitch-claim-transaction.md new file mode 100644 index 0000000000..54b3cb75c5 --- /dev/null +++ b/.changeset/stitch-claim-transaction.md @@ -0,0 +1,5 @@ +--- +'@backstage/plugin-catalog-backend': patch +--- + +Fixed a race condition in the stitch queue claim logic where the `SELECT FOR UPDATE SKIP LOCKED` row locks were released before the `next_stitch_at` bump, allowing multiple workers to claim the same entity. Both statements now run inside a single transaction. 
diff --git a/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts b/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts index 2d12ee4f6a..0ceee1fb53 100644 --- a/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts +++ b/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts @@ -51,43 +51,50 @@ export async function getDeferredStitchableEntities(options: { }> > { const { knex, batchSize, stitchTimeout } = options; - - let itemsQuery = knex('stitch_queue').select( - 'entity_ref', - 'next_stitch_at', - 'stitch_ticket', + const useLocking = ['mysql', 'mysql2', 'pg'].includes( + knex.client.config.client, ); - // This avoids duplication of work because of race conditions and is - // also fast because locked rows are ignored rather than blocking. - // It's only available in MySQL and PostgreSQL - if (['mysql', 'mysql2', 'pg'].includes(knex.client.config.client)) { - itemsQuery = itemsQuery.forUpdate().skipLocked(); - } - - const items = await itemsQuery - .where('next_stitch_at', '<=', knex.fn.now()) - .orderBy('next_stitch_at', 'asc') - .limit(batchSize); - - if (!items.length) { - return []; - } - - await knex('stitch_queue') - .whereIn( + // The SELECT FOR UPDATE SKIP LOCKED + UPDATE must run inside a single + // transaction so that the row locks held by FOR UPDATE persist until + // next_stitch_at has been bumped. Without the transaction the locks + // are released after the SELECT auto-commits, and another worker can + // claim the same rows before the UPDATE runs. 
+ return knex.transaction(async tx => { + let itemsQuery = tx('stitch_queue').select( 'entity_ref', - items.map(i => i.entity_ref), - ) - .update({ - next_stitch_at: nowPlus(knex, stitchTimeout), - }); + 'next_stitch_at', + 'stitch_ticket', + ); - return items.map(i => ({ - entityRef: i.entity_ref, - stitchTicket: i.stitch_ticket, - stitchRequestedAt: timestampToDateTime(i.next_stitch_at), - })); + if (useLocking) { + itemsQuery = itemsQuery.forUpdate().skipLocked(); + } + + const items = await itemsQuery + .where('next_stitch_at', '<=', tx.fn.now()) + .orderBy('next_stitch_at', 'asc') + .limit(batchSize); + + if (!items.length) { + return []; + } + + await tx('stitch_queue') + .whereIn( + 'entity_ref', + items.map(i => i.entity_ref), + ) + .update({ + next_stitch_at: nowPlus(tx, stitchTimeout), + }); + + return items.map(i => ({ + entityRef: i.entity_ref, + stitchTicket: i.stitch_ticket, + stitchRequestedAt: timestampToDateTime(i.next_stitch_at), + })); + }); } function nowPlus(knex: Knex, duration: HumanDuration): Knex.Raw { From d42f1ab7651ca4d373f411c1a97016170e01dc2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fredrik=20Adel=C3=B6w?= Date: Tue, 19 May 2026 22:21:27 +0200 Subject: [PATCH 2/8] Skip wrapping in transaction when caller already provides one MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Fredrik Adelöw --- .../operations/stitcher/getDeferredStitchableEntities.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts b/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts index 0ceee1fb53..20c2d863eb 100644 --- a/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts +++ b/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts @@ -60,7 
+60,7 @@ export async function getDeferredStitchableEntities(options: { // next_stitch_at has been bumped. Without the transaction the locks // are released after the SELECT auto-commits, and another worker can // claim the same rows before the UPDATE runs. - return knex.transaction(async tx => { + const run = async (tx: Knex | Knex.Transaction) => { let itemsQuery = tx('stitch_queue').select( 'entity_ref', 'next_stitch_at', @@ -94,7 +94,12 @@ export async function getDeferredStitchableEntities(options: { stitchTicket: i.stitch_ticket, stitchRequestedAt: timestampToDateTime(i.next_stitch_at), })); - }); + }; + + if (knex.isTransaction) { + return run(knex); + } + return knex.transaction(run); } function nowPlus(knex: Knex, duration: HumanDuration): Knex.Raw { From 32548ad82cbc7d99f7e8ea9d4fab3195beb8e88e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fredrik=20Adel=C3=B6w?= Date: Tue, 19 May 2026 22:33:18 +0200 Subject: [PATCH 3/8] Use .modify() for conditional FOR UPDATE SKIP LOCKED MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Fredrik Adelöw --- .../stitcher/getDeferredStitchableEntities.ts | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts b/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts index 20c2d863eb..7535d6e5ed 100644 --- a/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts +++ b/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts @@ -61,20 +61,16 @@ export async function getDeferredStitchableEntities(options: { // are released after the SELECT auto-commits, and another worker can // claim the same rows before the UPDATE runs. 
const run = async (tx: Knex | Knex.Transaction) => { - let itemsQuery = tx('stitch_queue').select( - 'entity_ref', - 'next_stitch_at', - 'stitch_ticket', - ); - - if (useLocking) { - itemsQuery = itemsQuery.forUpdate().skipLocked(); - } - - const items = await itemsQuery + const items = await tx('stitch_queue') + .select('entity_ref', 'next_stitch_at', 'stitch_ticket') .where('next_stitch_at', '<=', tx.fn.now()) .orderBy('next_stitch_at', 'asc') - .limit(batchSize); + .limit(batchSize) + .modify(qb => { + if (useLocking) { + qb.forUpdate().skipLocked(); + } + }); if (!items.length) { return []; From 6713e9fe7234359df469e3e32e17fc8837afe27e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fredrik=20Adel=C3=B6w?= Date: Wed, 20 May 2026 00:09:16 +0200 Subject: [PATCH 4/8] Apply same transaction fix to getProcessableEntities MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Same FOR UPDATE SKIP LOCKED race: the processing loop's claim query had the SELECT and UPDATE as separate auto-committed statements. The existing caller already wraps in a transaction, but this makes the function self-protecting if called without one. 
Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Fredrik Adelöw --- .../src/database/DefaultProcessingDatabase.ts | 90 +++++++++++-------- 1 file changed, 53 insertions(+), 37 deletions(-) diff --git a/plugins/catalog-backend/src/database/DefaultProcessingDatabase.ts b/plugins/catalog-backend/src/database/DefaultProcessingDatabase.ts index 264c11055d..540e7fea26 100644 --- a/plugins/catalog-backend/src/database/DefaultProcessingDatabase.ts +++ b/plugins/catalog-backend/src/database/DefaultProcessingDatabase.ts @@ -207,49 +207,65 @@ export class DefaultProcessingDatabase implements ProcessingDatabase { request: { processBatchSize: number }, ): Promise { const knex = maybeTx as Knex.Transaction | Knex; - - let itemsQuery = knex('refresh_state').select([ - 'entity_id', - 'entity_ref', - 'unprocessed_entity', - 'result_hash', - 'cache', - 'errors', - 'location_key', - 'next_update_at', - ]); - - // This avoids duplication of work because of race conditions and is - // also fast because locked rows are ignored rather than blocking. 
- // It's only available in MySQL and PostgreSQL - if (['mysql', 'mysql2', 'pg'].includes(knex.client.config.client)) { - itemsQuery = itemsQuery.forUpdate().skipLocked(); - } - - const items = await itemsQuery - .where('next_update_at', '<=', knex.fn.now()) - .limit(request.processBatchSize) - .orderBy('next_update_at', 'asc'); + const useLocking = ['mysql', 'mysql2', 'pg'].includes( + knex.client.config.client, + ); const interval = this.options.refreshInterval(); - const nextUpdateAt = (refreshInterval: number) => { - if (knex.client.config.client.includes('sqlite3')) { - return knex.raw(`datetime('now', ?)`, [`${refreshInterval} seconds`]); - } else if (knex.client.config.client.includes('mysql')) { - return knex.raw(`now() + interval ${refreshInterval} second`); + const nextUpdateAt = ( + tx: Knex | Knex.Transaction, + refreshInterval: number, + ) => { + if (tx.client.config.client.includes('sqlite3')) { + return tx.raw(`datetime('now', ?)`, [`${refreshInterval} seconds`]); + } else if (tx.client.config.client.includes('mysql')) { + return tx.raw(`now() + interval ${refreshInterval} second`); } - return knex.raw(`now() + interval '${refreshInterval} seconds'`); + return tx.raw(`now() + interval '${refreshInterval} seconds'`); }; - await knex('refresh_state') - .whereIn( - 'entity_ref', - items.map(i => i.entity_ref), - ) - .update({ - next_update_at: nextUpdateAt(interval), - }); + // The SELECT FOR UPDATE SKIP LOCKED + UPDATE must run inside a + // single transaction so that the row locks persist until + // next_update_at has been bumped. 
+ const run = async (tx: Knex | Knex.Transaction) => { + const items = await tx('refresh_state') + .select([ + 'entity_id', + 'entity_ref', + 'unprocessed_entity', + 'result_hash', + 'cache', + 'errors', + 'location_key', + 'next_update_at', + ]) + .where('next_update_at', '<=', tx.fn.now()) + .limit(request.processBatchSize) + .orderBy('next_update_at', 'asc') + .modify(qb => { + if (useLocking) { + qb.forUpdate().skipLocked(); + } + }); + + if (items.length > 0) { + await tx('refresh_state') + .whereIn( + 'entity_ref', + items.map(i => i.entity_ref), + ) + .update({ + next_update_at: nextUpdateAt(tx, interval), + }); + } + + return items; + }; + + const items = knex.isTransaction + ? await run(knex) + : await knex.transaction(run); return { items: items.map( From afc872aac6bac648863c2f0e8ee762c8d99e595c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fredrik=20Adel=C3=B6w?= Date: Wed, 20 May 2026 08:06:37 +0200 Subject: [PATCH 5/8] Fix implicit any types on transaction callback parameters MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Fredrik Adelöw --- .../catalog-backend/src/database/DefaultProcessingDatabase.ts | 4 ++-- .../operations/stitcher/getDeferredStitchableEntities.ts | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/plugins/catalog-backend/src/database/DefaultProcessingDatabase.ts b/plugins/catalog-backend/src/database/DefaultProcessingDatabase.ts index 540e7fea26..1c793eb0fc 100644 --- a/plugins/catalog-backend/src/database/DefaultProcessingDatabase.ts +++ b/plugins/catalog-backend/src/database/DefaultProcessingDatabase.ts @@ -229,7 +229,7 @@ export class DefaultProcessingDatabase implements ProcessingDatabase { // single transaction so that the row locks persist until // next_update_at has been bumped. 
const run = async (tx: Knex | Knex.Transaction) => { - const items = await tx('refresh_state') + const items: DbRefreshStateRow[] = await tx('refresh_state') .select([ 'entity_id', 'entity_ref', @@ -250,7 +250,7 @@ export class DefaultProcessingDatabase implements ProcessingDatabase { }); if (items.length > 0) { - await tx('refresh_state') + await tx('refresh_state') .whereIn( 'entity_ref', items.map(i => i.entity_ref), diff --git a/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts b/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts index 7535d6e5ed..4ed41e9308 100644 --- a/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts +++ b/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts @@ -61,7 +61,7 @@ export async function getDeferredStitchableEntities(options: { // are released after the SELECT auto-commits, and another worker can // claim the same rows before the UPDATE runs. const run = async (tx: Knex | Knex.Transaction) => { - const items = await tx('stitch_queue') + const items: DbStitchQueueRow[] = await tx('stitch_queue') .select('entity_ref', 'next_stitch_at', 'stitch_ticket') .where('next_stitch_at', '<=', tx.fn.now()) .orderBy('next_stitch_at', 'asc') @@ -76,7 +76,7 @@ export async function getDeferredStitchableEntities(options: { return []; } - await tx('stitch_queue') + await tx('stitch_queue') .whereIn( 'entity_ref', items.map(i => i.entity_ref), From 139a784ad4aa722bc23f757a716d614ccb4f1d3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fredrik=20Adel=C3=B6w?= Date: Wed, 20 May 2026 09:44:31 +0200 Subject: [PATCH 6/8] Skip transaction wrapping when locking is not used MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Only wrap the SELECT+UPDATE in a transaction for MySQL/PostgreSQL where FOR UPDATE SKIP LOCKED is actually used. 
For sqlite3 the transaction adds unnecessary BEGIN/COMMIT overhead. Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Fredrik Adelöw --- .../src/database/DefaultProcessingDatabase.ts | 7 ++++--- .../operations/stitcher/getDeferredStitchableEntities.ts | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/plugins/catalog-backend/src/database/DefaultProcessingDatabase.ts b/plugins/catalog-backend/src/database/DefaultProcessingDatabase.ts index 1c793eb0fc..bd32582b12 100644 --- a/plugins/catalog-backend/src/database/DefaultProcessingDatabase.ts +++ b/plugins/catalog-backend/src/database/DefaultProcessingDatabase.ts @@ -263,9 +263,10 @@ export class DefaultProcessingDatabase implements ProcessingDatabase { return items; }; - const items = knex.isTransaction - ? await run(knex) - : await knex.transaction(run); + const items = + knex.isTransaction || !useLocking + ? await run(knex) + : await knex.transaction(run); return { items: items.map( diff --git a/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts b/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts index 4ed41e9308..0b77167ca3 100644 --- a/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts +++ b/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts @@ -92,7 +92,7 @@ export async function getDeferredStitchableEntities(options: { })); }; - if (knex.isTransaction) { + if (knex.isTransaction || !useLocking) { return run(knex); } return knex.transaction(run); From dd52890773bf9f94e1afb963297a093e7bd5ac93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fredrik=20Adel=C3=B6w?= Date: Wed, 20 May 2026 09:49:08 +0200 Subject: [PATCH 7/8] Use ternary with await for consistent style and better stack traces MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: 
Fredrik Adelöw --- .../operations/stitcher/getDeferredStitchableEntities.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts b/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts index 0b77167ca3..b139d5672e 100644 --- a/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts +++ b/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts @@ -92,10 +92,9 @@ export async function getDeferredStitchableEntities(options: { })); }; - if (knex.isTransaction || !useLocking) { - return run(knex); - } - return knex.transaction(run); + return knex.isTransaction || !useLocking + ? await run(knex) + : await knex.transaction(run); } function nowPlus(knex: Knex, duration: HumanDuration): Knex.Raw { From 1d10fc41e81ed215eea636b10eba20c41dbac928 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fredrik=20Adel=C3=B6w?= Date: Wed, 20 May 2026 09:59:37 +0200 Subject: [PATCH 8/8] Update changeset and PR description to cover both fix sites MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Fredrik Adelöw --- .changeset/stitch-claim-transaction.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changeset/stitch-claim-transaction.md b/.changeset/stitch-claim-transaction.md index 54b3cb75c5..bd5fe2fbfc 100644 --- a/.changeset/stitch-claim-transaction.md +++ b/.changeset/stitch-claim-transaction.md @@ -2,4 +2,4 @@ '@backstage/plugin-catalog-backend': patch --- -Fixed a race condition in the stitch queue claim logic where the `SELECT FOR UPDATE SKIP LOCKED` row locks were released before the `next_stitch_at` bump, allowing multiple workers to claim the same entity. Both statements now run inside a single transaction. 
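+Fixed a race condition in the stitch queue claim logic where the `SELECT FOR UPDATE SKIP LOCKED` row locks were released before the subsequent `next_stitch_at` bump, allowing multiple workers to claim the same rows. The select and the update now run inside a single transaction on MySQL and PostgreSQL, unless the caller already provides one. The entity processing claim logic has the same select-then-update shape and now gets the same protection, so it no longer depends on its caller to supply the transaction.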