Wrap stitch queue claim in a transaction to hold row locks

The SELECT FOR UPDATE SKIP LOCKED and the subsequent UPDATE of
next_stitch_at now run inside a single transaction. Previously the
row locks were released after the SELECT auto-committed, allowing
another worker to claim the same rows before the UPDATE ran.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Fredrik Adelöw <freben@gmail.com>
This commit is contained in:
Fredrik Adelöw
2026-05-19 22:12:59 +02:00
parent cfb9f07c28
commit 774d69884a
2 changed files with 45 additions and 33 deletions
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/plugin-catalog-backend': patch
---
Fixed a race condition in the stitch queue claim logic where, on MySQL and PostgreSQL, the `SELECT FOR UPDATE SKIP LOCKED` row locks were released before the `next_stitch_at` bump, allowing multiple workers to claim the same entity. Both statements now run inside a single transaction.
@@ -51,43 +51,50 @@ export async function getDeferredStitchableEntities(options: {
}>
> {
const { knex, batchSize, stitchTimeout } = options;
let itemsQuery = knex<DbStitchQueueRow>('stitch_queue').select(
'entity_ref',
'next_stitch_at',
'stitch_ticket',
const useLocking = ['mysql', 'mysql2', 'pg'].includes(
knex.client.config.client,
);
// This avoids duplication of work because of race conditions and is
// also fast because locked rows are ignored rather than blocking.
// It's only available in MySQL and PostgreSQL
if (['mysql', 'mysql2', 'pg'].includes(knex.client.config.client)) {
itemsQuery = itemsQuery.forUpdate().skipLocked();
}
const items = await itemsQuery
.where('next_stitch_at', '<=', knex.fn.now())
.orderBy('next_stitch_at', 'asc')
.limit(batchSize);
if (!items.length) {
return [];
}
await knex<DbStitchQueueRow>('stitch_queue')
.whereIn(
// The SELECT FOR UPDATE SKIP LOCKED + UPDATE must run inside a single
// transaction so that the row locks held by FOR UPDATE persist until
// next_stitch_at has been bumped. Without the transaction the locks
// are released after the SELECT auto-commits, and another worker can
// claim the same rows before the UPDATE runs.
return knex.transaction(async tx => {
let itemsQuery = tx<DbStitchQueueRow>('stitch_queue').select(
'entity_ref',
items.map(i => i.entity_ref),
)
.update({
next_stitch_at: nowPlus(knex, stitchTimeout),
});
'next_stitch_at',
'stitch_ticket',
);
return items.map(i => ({
entityRef: i.entity_ref,
stitchTicket: i.stitch_ticket,
stitchRequestedAt: timestampToDateTime(i.next_stitch_at),
}));
if (useLocking) {
itemsQuery = itemsQuery.forUpdate().skipLocked();
}
const items = await itemsQuery
.where('next_stitch_at', '<=', tx.fn.now())
.orderBy('next_stitch_at', 'asc')
.limit(batchSize);
if (!items.length) {
return [];
}
await tx<DbStitchQueueRow>('stitch_queue')
.whereIn(
'entity_ref',
items.map(i => i.entity_ref),
)
.update({
next_stitch_at: nowPlus(tx, stitchTimeout),
});
return items.map(i => ({
entityRef: i.entity_ref,
stitchTicket: i.stitch_ticket,
stitchRequestedAt: timestampToDateTime(i.next_stitch_at),
}));
});
}
function nowPlus(knex: Knex, duration: HumanDuration): Knex.Raw {