Merge pull request #34314 from backstage/freben/stitch-claim-transaction

Fix stitch queue race: wrap claim in transaction
This commit is contained in:
Fredrik Adelöw
2026-05-20 10:33:53 +02:00
committed by GitHub
3 changed files with 97 additions and 68 deletions
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/plugin-catalog-backend': patch
---
Fixed a race condition in the stitch queue and entity processing claim logic where `SELECT FOR UPDATE SKIP LOCKED` row locks were released before the subsequent timestamp bump, allowing multiple workers to claim the same rows. Both the select and update now run inside a single transaction for MySQL and PostgreSQL.
@@ -207,49 +207,66 @@ export class DefaultProcessingDatabase implements ProcessingDatabase {
request: { processBatchSize: number },
): Promise<GetProcessableEntitiesResult> {
const knex = maybeTx as Knex.Transaction | Knex;
let itemsQuery = knex<DbRefreshStateRow>('refresh_state').select([
'entity_id',
'entity_ref',
'unprocessed_entity',
'result_hash',
'cache',
'errors',
'location_key',
'next_update_at',
]);
// This avoids duplication of work because of race conditions and is
// also fast because locked rows are ignored rather than blocking.
// It's only available in MySQL and PostgreSQL
if (['mysql', 'mysql2', 'pg'].includes(knex.client.config.client)) {
itemsQuery = itemsQuery.forUpdate().skipLocked();
}
const items = await itemsQuery
.where('next_update_at', '<=', knex.fn.now())
.limit(request.processBatchSize)
.orderBy('next_update_at', 'asc');
const useLocking = ['mysql', 'mysql2', 'pg'].includes(
knex.client.config.client,
);
const interval = this.options.refreshInterval();
const nextUpdateAt = (refreshInterval: number) => {
if (knex.client.config.client.includes('sqlite3')) {
return knex.raw(`datetime('now', ?)`, [`${refreshInterval} seconds`]);
} else if (knex.client.config.client.includes('mysql')) {
return knex.raw(`now() + interval ${refreshInterval} second`);
const nextUpdateAt = (
tx: Knex | Knex.Transaction,
refreshInterval: number,
) => {
if (tx.client.config.client.includes('sqlite3')) {
return tx.raw(`datetime('now', ?)`, [`${refreshInterval} seconds`]);
} else if (tx.client.config.client.includes('mysql')) {
return tx.raw(`now() + interval ${refreshInterval} second`);
}
return knex.raw(`now() + interval '${refreshInterval} seconds'`);
return tx.raw(`now() + interval '${refreshInterval} seconds'`);
};
await knex<DbRefreshStateRow>('refresh_state')
.whereIn(
'entity_ref',
items.map(i => i.entity_ref),
)
.update({
next_update_at: nextUpdateAt(interval),
});
// The SELECT FOR UPDATE SKIP LOCKED + UPDATE must run inside a
// single transaction so that the row locks persist until
// next_update_at has been bumped.
const run = async (tx: Knex | Knex.Transaction) => {
const items: DbRefreshStateRow[] = await tx('refresh_state')
.select([
'entity_id',
'entity_ref',
'unprocessed_entity',
'result_hash',
'cache',
'errors',
'location_key',
'next_update_at',
])
.where('next_update_at', '<=', tx.fn.now())
.limit(request.processBatchSize)
.orderBy('next_update_at', 'asc')
.modify(qb => {
if (useLocking) {
qb.forUpdate().skipLocked();
}
});
if (items.length > 0) {
await tx('refresh_state')
.whereIn(
'entity_ref',
items.map(i => i.entity_ref),
)
.update({
next_update_at: nextUpdateAt(tx, interval),
});
}
return items;
};
const items =
knex.isTransaction || !useLocking
? await run(knex)
: await knex.transaction(run);
return {
items: items.map(
@@ -51,43 +51,50 @@ export async function getDeferredStitchableEntities(options: {
}>
> {
const { knex, batchSize, stitchTimeout } = options;
let itemsQuery = knex<DbStitchQueueRow>('stitch_queue').select(
'entity_ref',
'next_stitch_at',
'stitch_ticket',
const useLocking = ['mysql', 'mysql2', 'pg'].includes(
knex.client.config.client,
);
// This avoids duplication of work because of race conditions and is
// also fast because locked rows are ignored rather than blocking.
// It's only available in MySQL and PostgreSQL
if (['mysql', 'mysql2', 'pg'].includes(knex.client.config.client)) {
itemsQuery = itemsQuery.forUpdate().skipLocked();
}
// The SELECT FOR UPDATE SKIP LOCKED + UPDATE must run inside a single
// transaction so that the row locks held by FOR UPDATE persist until
// next_stitch_at has been bumped. Without the transaction the locks
// are released after the SELECT auto-commits, and another worker can
// claim the same rows before the UPDATE runs.
const run = async (tx: Knex | Knex.Transaction) => {
const items: DbStitchQueueRow[] = await tx('stitch_queue')
.select('entity_ref', 'next_stitch_at', 'stitch_ticket')
.where('next_stitch_at', '<=', tx.fn.now())
.orderBy('next_stitch_at', 'asc')
.limit(batchSize)
.modify(qb => {
if (useLocking) {
qb.forUpdate().skipLocked();
}
});
const items = await itemsQuery
.where('next_stitch_at', '<=', knex.fn.now())
.orderBy('next_stitch_at', 'asc')
.limit(batchSize);
if (!items.length) {
return [];
}
if (!items.length) {
return [];
}
await tx('stitch_queue')
.whereIn(
'entity_ref',
items.map(i => i.entity_ref),
)
.update({
next_stitch_at: nowPlus(tx, stitchTimeout),
});
await knex<DbStitchQueueRow>('stitch_queue')
.whereIn(
'entity_ref',
items.map(i => i.entity_ref),
)
.update({
next_stitch_at: nowPlus(knex, stitchTimeout),
});
return items.map(i => ({
entityRef: i.entity_ref,
stitchTicket: i.stitch_ticket,
stitchRequestedAt: timestampToDateTime(i.next_stitch_at),
}));
};
return items.map(i => ({
entityRef: i.entity_ref,
stitchTicket: i.stitch_ticket,
stitchRequestedAt: timestampToDateTime(i.next_stitch_at),
}));
return knex.isTransaction || !useLocking
? await run(knex)
: await knex.transaction(run);
}
function nowPlus(knex: Knex, duration: HumanDuration): Knex.Raw {