diff --git a/.changeset/stitch-claim-transaction.md b/.changeset/stitch-claim-transaction.md new file mode 100644 index 0000000000..bd5fe2fbfc --- /dev/null +++ b/.changeset/stitch-claim-transaction.md @@ -0,0 +1,5 @@ +--- +'@backstage/plugin-catalog-backend': patch +--- + +Fixed a race condition in the stitch queue and entity processing claim logic where `SELECT FOR UPDATE SKIP LOCKED` row locks were released before the subsequent timestamp bump, allowing multiple workers to claim the same rows. Both the select and update now run inside a single transaction for MySQL and PostgreSQL. diff --git a/plugins/catalog-backend/src/database/DefaultProcessingDatabase.ts b/plugins/catalog-backend/src/database/DefaultProcessingDatabase.ts index 264c11055d..bd32582b12 100644 --- a/plugins/catalog-backend/src/database/DefaultProcessingDatabase.ts +++ b/plugins/catalog-backend/src/database/DefaultProcessingDatabase.ts @@ -207,49 +207,66 @@ export class DefaultProcessingDatabase implements ProcessingDatabase { request: { processBatchSize: number }, ): Promise { const knex = maybeTx as Knex.Transaction | Knex; - - let itemsQuery = knex('refresh_state').select([ - 'entity_id', - 'entity_ref', - 'unprocessed_entity', - 'result_hash', - 'cache', - 'errors', - 'location_key', - 'next_update_at', - ]); - - // This avoids duplication of work because of race conditions and is - // also fast because locked rows are ignored rather than blocking. - // It's only available in MySQL and PostgreSQL - if (['mysql', 'mysql2', 'pg'].includes(knex.client.config.client)) { - itemsQuery = itemsQuery.forUpdate().skipLocked(); - } - - const items = await itemsQuery - .where('next_update_at', '<=', knex.fn.now()) - .limit(request.processBatchSize) - .orderBy('next_update_at', 'asc'); + const useLocking = ['mysql', 'mysql2', 'pg'].includes( + knex.client.config.client, + ); const interval = this.options.refreshInterval(); - const nextUpdateAt = (refreshInterval: number) => { - if (knex.client.config.client.includes('sqlite3')) { - return knex.raw(`datetime('now', ?)`, [`${refreshInterval} seconds`]); - } else if (knex.client.config.client.includes('mysql')) { - return knex.raw(`now() + interval ${refreshInterval} second`); + const nextUpdateAt = ( + tx: Knex | Knex.Transaction, + refreshInterval: number, + ) => { + if (tx.client.config.client.includes('sqlite3')) { + return tx.raw(`datetime('now', ?)`, [`${refreshInterval} seconds`]); + } else if (tx.client.config.client.includes('mysql')) { + return tx.raw(`now() + interval ${refreshInterval} second`); } - return knex.raw(`now() + interval '${refreshInterval} seconds'`); + return tx.raw(`now() + interval '${refreshInterval} seconds'`); }; - await knex('refresh_state') - .whereIn( - 'entity_ref', - items.map(i => i.entity_ref), - ) - .update({ - next_update_at: nextUpdateAt(interval), - }); + // The SELECT FOR UPDATE SKIP LOCKED + UPDATE must run inside a + // single transaction so that the row locks persist until + // next_update_at has been bumped. + const run = async (tx: Knex | Knex.Transaction) => { + const items: DbRefreshStateRow[] = await tx('refresh_state') + .select([ + 'entity_id', + 'entity_ref', + 'unprocessed_entity', + 'result_hash', + 'cache', + 'errors', + 'location_key', + 'next_update_at', + ]) + .where('next_update_at', '<=', tx.fn.now()) + .limit(request.processBatchSize) + .orderBy('next_update_at', 'asc') + .modify(qb => { + if (useLocking) { + qb.forUpdate().skipLocked(); + } + }); + + if (items.length > 0) { + await tx('refresh_state') + .whereIn( + 'entity_ref', + items.map(i => i.entity_ref), + ) + .update({ + next_update_at: nextUpdateAt(tx, interval), + }); + } + + return items; + }; + + const items = + knex.isTransaction || !useLocking + ? await run(knex) + : await knex.transaction(run); return { items: items.map( diff --git a/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts b/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts index 2d12ee4f6a..b139d5672e 100644 --- a/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts +++ b/plugins/catalog-backend/src/database/operations/stitcher/getDeferredStitchableEntities.ts @@ -51,43 +51,50 @@ export async function getDeferredStitchableEntities(options: { }> > { const { knex, batchSize, stitchTimeout } = options; - - let itemsQuery = knex('stitch_queue').select( - 'entity_ref', - 'next_stitch_at', - 'stitch_ticket', + const useLocking = ['mysql', 'mysql2', 'pg'].includes( + knex.client.config.client, ); - // This avoids duplication of work because of race conditions and is - // also fast because locked rows are ignored rather than blocking. - // It's only available in MySQL and PostgreSQL - if (['mysql', 'mysql2', 'pg'].includes(knex.client.config.client)) { - itemsQuery = itemsQuery.forUpdate().skipLocked(); - } + // The SELECT FOR UPDATE SKIP LOCKED + UPDATE must run inside a single + // transaction so that the row locks held by FOR UPDATE persist until + // next_stitch_at has been bumped. Without the transaction the locks + // are released after the SELECT auto-commits, and another worker can + // claim the same rows before the UPDATE runs. + const run = async (tx: Knex | Knex.Transaction) => { + const items: DbStitchQueueRow[] = await tx('stitch_queue') + .select('entity_ref', 'next_stitch_at', 'stitch_ticket') + .where('next_stitch_at', '<=', tx.fn.now()) + .orderBy('next_stitch_at', 'asc') + .limit(batchSize) + .modify(qb => { + if (useLocking) { + qb.forUpdate().skipLocked(); + } + }); - const items = await itemsQuery - .where('next_stitch_at', '<=', knex.fn.now()) - .orderBy('next_stitch_at', 'asc') - .limit(batchSize); + if (!items.length) { + return []; + } - if (!items.length) { - return []; - } + await tx('stitch_queue') + .whereIn( + 'entity_ref', + items.map(i => i.entity_ref), + ) + .update({ + next_stitch_at: nowPlus(tx, stitchTimeout), + }); - await knex('stitch_queue') - .whereIn( - 'entity_ref', - items.map(i => i.entity_ref), - ) - .update({ - next_stitch_at: nowPlus(knex, stitchTimeout), - }); + return items.map(i => ({ + entityRef: i.entity_ref, + stitchTicket: i.stitch_ticket, + stitchRequestedAt: timestampToDateTime(i.next_stitch_at), + })); + }; - return items.map(i => ({ - entityRef: i.entity_ref, - stitchTicket: i.stitch_ticket, - stitchRequestedAt: timestampToDateTime(i.next_stitch_at), - })); + return knex.isTransaction || !useLocking + ? await run(knex) + : await knex.transaction(run); } function nowPlus(knex: Knex, duration: HumanDuration): Knex.Raw {