Wrap stitch queue claim in a transaction to hold row locks
The SELECT FOR UPDATE SKIP LOCKED and the subsequent UPDATE of next_stitch_at now run inside a single transaction. Previously the row locks were released after the SELECT auto-committed, allowing another worker to claim the same rows before the UPDATE ran. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Signed-off-by: Fredrik Adelöw <freben@gmail.com>
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/plugin-catalog-backend': patch
|
||||
---
|
||||
|
||||
Fixed a race condition in the stitch queue claim logic where the `SELECT FOR UPDATE SKIP LOCKED` row locks were released before the `next_stitch_at` bump, allowing multiple workers to claim the same entity. Both statements now run inside a single transaction.
|
||||
+40
-33
@@ -51,43 +51,50 @@ export async function getDeferredStitchableEntities(options: {
|
||||
}>
|
||||
> {
|
||||
const { knex, batchSize, stitchTimeout } = options;
|
||||
|
||||
let itemsQuery = knex<DbStitchQueueRow>('stitch_queue').select(
|
||||
'entity_ref',
|
||||
'next_stitch_at',
|
||||
'stitch_ticket',
|
||||
const useLocking = ['mysql', 'mysql2', 'pg'].includes(
|
||||
knex.client.config.client,
|
||||
);
|
||||
|
||||
// This avoids duplication of work because of race conditions and is
|
||||
// also fast because locked rows are ignored rather than blocking.
|
||||
// It's only available in MySQL and PostgreSQL
|
||||
if (['mysql', 'mysql2', 'pg'].includes(knex.client.config.client)) {
|
||||
itemsQuery = itemsQuery.forUpdate().skipLocked();
|
||||
}
|
||||
|
||||
const items = await itemsQuery
|
||||
.where('next_stitch_at', '<=', knex.fn.now())
|
||||
.orderBy('next_stitch_at', 'asc')
|
||||
.limit(batchSize);
|
||||
|
||||
if (!items.length) {
|
||||
return [];
|
||||
}
|
||||
|
||||
await knex<DbStitchQueueRow>('stitch_queue')
|
||||
.whereIn(
|
||||
// The SELECT FOR UPDATE SKIP LOCKED + UPDATE must run inside a single
|
||||
// transaction so that the row locks held by FOR UPDATE persist until
|
||||
// next_stitch_at has been bumped. Without the transaction the locks
|
||||
// are released after the SELECT auto-commits, and another worker can
|
||||
// claim the same rows before the UPDATE runs.
|
||||
return knex.transaction(async tx => {
|
||||
let itemsQuery = tx<DbStitchQueueRow>('stitch_queue').select(
|
||||
'entity_ref',
|
||||
items.map(i => i.entity_ref),
|
||||
)
|
||||
.update({
|
||||
next_stitch_at: nowPlus(knex, stitchTimeout),
|
||||
});
|
||||
'next_stitch_at',
|
||||
'stitch_ticket',
|
||||
);
|
||||
|
||||
return items.map(i => ({
|
||||
entityRef: i.entity_ref,
|
||||
stitchTicket: i.stitch_ticket,
|
||||
stitchRequestedAt: timestampToDateTime(i.next_stitch_at),
|
||||
}));
|
||||
if (useLocking) {
|
||||
itemsQuery = itemsQuery.forUpdate().skipLocked();
|
||||
}
|
||||
|
||||
const items = await itemsQuery
|
||||
.where('next_stitch_at', '<=', tx.fn.now())
|
||||
.orderBy('next_stitch_at', 'asc')
|
||||
.limit(batchSize);
|
||||
|
||||
if (!items.length) {
|
||||
return [];
|
||||
}
|
||||
|
||||
await tx<DbStitchQueueRow>('stitch_queue')
|
||||
.whereIn(
|
||||
'entity_ref',
|
||||
items.map(i => i.entity_ref),
|
||||
)
|
||||
.update({
|
||||
next_stitch_at: nowPlus(tx, stitchTimeout),
|
||||
});
|
||||
|
||||
return items.map(i => ({
|
||||
entityRef: i.entity_ref,
|
||||
stitchTicket: i.stitch_ticket,
|
||||
stitchRequestedAt: timestampToDateTime(i.next_stitch_at),
|
||||
}));
|
||||
});
|
||||
}
|
||||
|
||||
function nowPlus(knex: Knex, duration: HumanDuration): Knex.Raw {
|
||||
|
||||
Reference in New Issue
Block a user