From 774d69884a51fec000de44a95eb25da33d2c343b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fredrik=20Adel=C3=B6w?= Date: Tue, 19 May 2026 22:12:59 +0200 Subject: [PATCH] Wrap stitch queue claim in a transaction to hold row locks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The SELECT FOR UPDATE SKIP LOCKED and the subsequent UPDATE of next_stitch_at now run inside a single transaction. Previously the row locks were released after the SELECT auto-committed, allowing another worker to claim the same rows before the UPDATE ran. Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Fredrik Adelöw --- .changeset/stitch-claim-transaction.md | 5 ++ .../stitcher/getDeferredStitchableEntities.ts | 73 ++++++++++--------- 2 files changed, 45 insertions(+), 33 deletions(-) create mode 100644 .changeset/stitch-claim-transaction.md diff --git a/.changeset/stitch-claim-transaction.md b/.changeset/stitch-claim-transaction.md new file mode 100644 index 0000000000..54b3cb75c5 --- /dev/null +++ b/.changeset/stitch-claim-transaction.md @@ -0,0 +1,5 @@ +--- +'@backstage/plugin-catalog-backend': patch +--- + +Fixed a race condition in the stitch queue claim logic where the `SELECT FOR UPDATE SKIP LOCKED` row locks were released before the `next_stitch_at` bump, allowing multiple workers to claim the same entity. Both statements now run inside a single transaction. diff --git a/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts b/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts index 2d12ee4f6a..0ceee1fb53 100644 --- a/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts +++ b/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts @@ -51,43 +51,50 @@ export async function getDeferredStitchableEntities(options: { }> > { const { knex, batchSize, stitchTimeout } = options; - - let itemsQuery = knex('stitch_queue').select( - 'entity_ref', - 'next_stitch_at', - 'stitch_ticket', + const useLocking = ['mysql', 'mysql2', 'pg'].includes( + knex.client.config.client, ); - // This avoids duplication of work because of race conditions and is - // also fast because locked rows are ignored rather than blocking. - // It's only available in MySQL and PostgreSQL - if (['mysql', 'mysql2', 'pg'].includes(knex.client.config.client)) { - itemsQuery = itemsQuery.forUpdate().skipLocked(); - } - - const items = await itemsQuery - .where('next_stitch_at', '<=', knex.fn.now()) - .orderBy('next_stitch_at', 'asc') - .limit(batchSize); - - if (!items.length) { - return []; - } - - await knex('stitch_queue') - .whereIn( + // The SELECT FOR UPDATE SKIP LOCKED + UPDATE must run inside a single + // transaction so that the row locks held by FOR UPDATE persist until + // next_stitch_at has been bumped. Without the transaction the locks + // are released after the SELECT auto-commits, and another worker can + // claim the same rows before the UPDATE runs. + return knex.transaction(async tx => { + let itemsQuery = tx('stitch_queue').select( 'entity_ref', - items.map(i => i.entity_ref), - ) - .update({ - next_stitch_at: nowPlus(knex, stitchTimeout), - }); + 'next_stitch_at', + 'stitch_ticket', + ); - return items.map(i => ({ - entityRef: i.entity_ref, - stitchTicket: i.stitch_ticket, - stitchRequestedAt: timestampToDateTime(i.next_stitch_at), - })); + if (useLocking) { + itemsQuery = itemsQuery.forUpdate().skipLocked(); + } + + const items = await itemsQuery + .where('next_stitch_at', '<=', tx.fn.now()) + .orderBy('next_stitch_at', 'asc') + .limit(batchSize); + + if (!items.length) { + return []; + } + + await tx('stitch_queue') + .whereIn( + 'entity_ref', + items.map(i => i.entity_ref), + ) + .update({ + next_stitch_at: nowPlus(tx, stitchTimeout), + }); + + return items.map(i => ({ + entityRef: i.entity_ref, + stitchTicket: i.stitch_ticket, + stitchRequestedAt: timestampToDateTime(i.next_stitch_at), + })); + }); } function nowPlus(knex: Knex, duration: HumanDuration): Knex.Raw {