From 0b8b67767bfc38a52565e7e9593ba3c7ea1f729d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fredrik=20Adel=C3=B6w?= Date: Fri, 22 May 2026 15:00:45 +0200 Subject: [PATCH] fix(catalog-backend): prevent overlapping stitches for the same entity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit markForStitching now only updates the ticket on conflict, leaving next_stitch_at unchanged so an in-progress worker isn't interrupted. markDeferredStitchCompleted bumps next_stitch_at to now() when the ticket changed, so pending re-stitches happen immediately after the current stitch finishes rather than waiting for the timeout. performStitching no longer calls markDeferredStitchCompleted when abandoned due to a stale ticket, since the queue entry belongs to another worker. Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Fredrik Adelöw --- .changeset/stitch-queue-no-overlap.md | 5 ++ .../markDeferredStitchCompleted.test.ts | 26 +++++++++- .../stitcher/markDeferredStitchCompleted.ts | 46 ++++++++++++++--- .../stitcher/markForStitching.test.ts | 49 +++++++++++++++++++ .../operations/stitcher/markForStitching.ts | 16 +++++- .../operations/stitcher/performStitching.ts | 19 ++++--- .../src/stitching/DefaultStitcher.test.ts | 9 +++- 7 files changed, 152 insertions(+), 18 deletions(-) create mode 100644 .changeset/stitch-queue-no-overlap.md diff --git a/.changeset/stitch-queue-no-overlap.md b/.changeset/stitch-queue-no-overlap.md new file mode 100644 index 0000000000..ac64ad71fc --- /dev/null +++ b/.changeset/stitch-queue-no-overlap.md @@ -0,0 +1,5 @@ +--- +'@backstage/plugin-catalog-backend': patch +--- + +Improved stitch queue semantics to prevent overlapping stitches for the same entity. New stitch requests that arrive while a stitch is in progress now only update the ticket (not the timestamp), so the in-progress worker is not interrupted. When the worker completes and detects a pending re-stitch, the queue entry becomes immediately eligible for pickup instead of waiting for the timeout period. diff --git a/plugins/catalog-backend/src/database/operations/stitcher/markDeferredStitchCompleted.test.ts b/plugins/catalog-backend/src/database/operations/stitcher/markDeferredStitchCompleted.test.ts index ca0ac590b4..49cc2eae4d 100644 --- a/plugins/catalog-backend/src/database/operations/stitcher/markDeferredStitchCompleted.test.ts +++ b/plugins/catalog-backend/src/database/operations/stitcher/markDeferredStitchCompleted.test.ts @@ -47,27 +47,49 @@ describe.each(databases.eachSupportedId())( ); } - // Wrong ticket should not delete the row + // Wrong ticket should not delete the row, but should bump + // next_stitch_at to now() so the pending re-stitch is picked up + // immediately await markDeferredStitchCompleted({ knex, entityRef: 'k:ns/n', stitchTicket: 'the-wrong-ticket', + result: 'succeeded', }); - await expect(result()).resolves.toEqual([ + const afterWrongTicket = await result(); + expect(afterWrongTicket).toEqual([ { entity_ref: 'k:ns/n', next_stitch_at: expect.anything(), stitch_ticket: 'the-ticket', }, ]); + const bumped = new Date(afterWrongTicket[0].next_stitch_at as string); + expect(bumped.getFullYear()).toBeGreaterThan(1971); // Correct ticket should delete the row await markDeferredStitchCompleted({ knex, entityRef: 'k:ns/n', stitchTicket: 'the-ticket', + result: 'succeeded', }); await expect(result()).resolves.toEqual([]); }); + + it('does not fail when the row is already gone', async () => { + const knex = await databases.init(databaseId); + await applyDatabaseMigrations(knex); + + // Calling on a nonexistent row should not throw + await expect( + markDeferredStitchCompleted({ + knex, + entityRef: 'k:ns/nonexistent', + stitchTicket: 'any-ticket', + result: 'succeeded', + }), + ).resolves.toBeUndefined(); + }); }, ); diff --git a/plugins/catalog-backend/src/database/operations/stitcher/markDeferredStitchCompleted.ts b/plugins/catalog-backend/src/database/operations/stitcher/markDeferredStitchCompleted.ts index bb3e2be6f9..04c5f76d5d 100644 --- a/plugins/catalog-backend/src/database/operations/stitcher/markDeferredStitchCompleted.ts +++ b/plugins/catalog-backend/src/database/operations/stitcher/markDeferredStitchCompleted.ts @@ -22,22 +22,54 @@ import { DbStitchQueueRow } from '../../tables'; * * @remarks * - * This assumes that the stitching strategy is set to deferred. + * If the ticket still matches, the stitch_queue entry is deleted — no + * further stitching is needed. * - * The row is only deleted from stitch_queue if the ticket hasn't changed. If - * it has, it means that a new stitch request has been made, and the entity - * should be stitched once more some time in the future - or is indeed already - * being stitched concurrently with ourselves. + * If the ticket changed (a new stitch was requested while this one was + * in progress), the entry is kept and its next_stitch_at is bumped to + * now() so the re-stitch becomes immediately eligible for pickup. + * + * The `result` parameter controls how the bump behaves when the ticket + * doesn't match: + * + * - `'succeeded'`: The worker wrote its result successfully. A ticket + * mismatch means a re-stitch was requested. Bump to now() + * unconditionally — we're done, the next worker should start ASAP. + * + * - `'abandoned'`: The worker's write was blocked by a stale ticket. + * We can't tell whether the ticket changed because of a re-stitch + * request (nobody else is active) or because we timed out and + * another worker claimed the entry. Bump to now() only if the + * timestamp hasn't moved past what we'd have set — i.e. only move + * it earlier, never later. This prevents extending the timeout + * window of an active worker, while still making overdue entries + * eligible immediately. */ export async function markDeferredStitchCompleted(option: { knex: Knex | Knex.Transaction; entityRef: string; stitchTicket: string; + result: 'succeeded' | 'abandoned'; }): Promise { - const { knex, entityRef, stitchTicket } = option; + const { knex, entityRef, stitchTicket, result } = option; - await knex('stitch_queue') + const deleted = await knex('stitch_queue') .where('entity_ref', '=', entityRef) .andWhere('stitch_ticket', '=', stitchTicket) .delete(); + + if (!deleted) { + const update = knex('stitch_queue') + .where('entity_ref', '=', entityRef) + .update({ next_stitch_at: knex.fn.now() }); + + if (result === 'abandoned') { + // Only move the timestamp earlier, never later — if another + // worker pushed it forward, we don't want to undercut their + // timeout window. + update.where('next_stitch_at', '>', knex.fn.now()); + } + + await update; + } } diff --git a/plugins/catalog-backend/src/database/operations/stitcher/markForStitching.test.ts b/plugins/catalog-backend/src/database/operations/stitcher/markForStitching.test.ts index e3650edf03..c6427b34a6 100644 --- a/plugins/catalog-backend/src/database/operations/stitcher/markForStitching.test.ts +++ b/plugins/catalog-backend/src/database/operations/stitcher/markForStitching.test.ts @@ -292,3 +292,52 @@ it.each(databases.eachSupportedId())( }); }, ); + +describe.each(databases.eachSupportedId())( + 'stitch queue overlap prevention, %p', + databaseId => { + it('does not overwrite next_stitch_at when a stitch is in progress', async () => { + const knex = await databases.init(databaseId); + await applyDatabaseMigrations(knex); + + // Simulate a claimed entry: next_stitch_at far in the future + const futureTimestamp = '2099-01-01T00:00:00.000'; + await knex('stitch_queue').insert({ + entity_ref: 'k:ns/target', + stitch_ticket: 'in-progress-ticket', + next_stitch_at: futureTimestamp, + }); + + // A new stitch request comes in while the stitch is in progress + await markForStitching({ knex, entityRefs: ['k:ns/target'] }); + + const [row] = await knex('stitch_queue').where( + 'entity_ref', + 'k:ns/target', + ); + + // Ticket should be updated (new stitch requested) + expect(row.stitch_ticket).not.toBe('in-progress-ticket'); + // Timestamp should NOT be yanked back to now — the in-progress + // worker's timeout must be respected + const nextStitch = new Date(row.next_stitch_at as string); + expect(nextStitch.getFullYear()).toBe(2099); + }); + + it('sets next_stitch_at to now for new entries', async () => { + const knex = await databases.init(databaseId); + await applyDatabaseMigrations(knex); + + await markForStitching({ knex, entityRefs: ['k:ns/brand-new'] }); + + const [row] = await knex('stitch_queue').where( + 'entity_ref', + 'k:ns/brand-new', + ); + + expect(row.stitch_ticket).toBeDefined(); + const nextStitch = new Date(row.next_stitch_at as string); + expect(nextStitch.getTime()).toBeLessThanOrEqual(Date.now() + 5000); + }); + }, +); diff --git a/plugins/catalog-backend/src/database/operations/stitcher/markForStitching.ts b/plugins/catalog-backend/src/database/operations/stitcher/markForStitching.ts index d3c6bf3a5d..319ac87334 100644 --- a/plugins/catalog-backend/src/database/operations/stitcher/markForStitching.ts +++ b/plugins/catalog-backend/src/database/operations/stitcher/markForStitching.ts @@ -27,6 +27,18 @@ const UPDATE_CHUNK_SIZE = 100; // Smaller chunks reduce contention * future. * * @remarks + * + * If there is no existing stitch_queue entry, a new row is created with + * next_stitch_at set to now() (immediately eligible). If an entry already + * exists, only the stitch_ticket is updated — the timestamp is left + * unchanged. This prevents interrupting an in-progress stitch: the worker + * that claimed the entry pushed next_stitch_at forward by the timeout + * duration, and we don't want to yank it back to now() which would allow + * another worker to claim the same entity and cause overlapping stitches. + * + * When the in-progress stitch completes, markDeferredStitchCompleted + * detects the ticket change and makes the entry immediately eligible for + * the follow-up stitch. */ export async function markForStitching(options: { knex: Knex | Knex.Transaction; @@ -53,7 +65,7 @@ export async function markForStitching(options: { })), ) .onConflict('entity_ref') - .merge(['next_stitch_at', 'stitch_ticket']); + .merge(['stitch_ticket']); } }, knex); } @@ -75,7 +87,7 @@ export async function markForStitching(options: { })), ) .onConflict('entity_ref') - .merge(['next_stitch_at', 'stitch_ticket']); + .merge(['stitch_ticket']); } }, knex); } diff --git a/plugins/catalog-backend/src/database/operations/stitcher/performStitching.ts b/plugins/catalog-backend/src/database/operations/stitcher/performStitching.ts index 061a196d8a..3c5067e4d3 100644 --- a/plugins/catalog-backend/src/database/operations/stitcher/performStitching.ts +++ b/plugins/catalog-backend/src/database/operations/stitcher/performStitching.ts @@ -56,10 +56,11 @@ export async function performStitching(options: { }): Promise<'changed' | 'unchanged' | 'abandoned'> { const { knex, logger, entityRef, stitchTicket } = options; - // The entity is removed from the stitch queue on ANY completion, except when - // an exception is thrown. In the latter case, the entity will be retried at a - // later time. - let removeFromStitchQueueOnCompletion = true; + // The stitch queue is cleaned up on ANY completion — either by deleting + // the entry (ticket unchanged) or bumping next_stitch_at (re-stitch + // requested). Exceptions are the only case where we skip cleanup, so + // the entity gets retried at a later time. + let stitchResult: 'succeeded' | 'abandoned' | undefined; try { // Selecting from refresh_state (with an optional left join to @@ -106,6 +107,7 @@ export async function performStitching(options: { logger.debug( `Unable to stitch ${entityRef}, item does not exist in refresh state table`, ); + stitchResult = 'abandoned'; return 'abandoned'; } @@ -125,6 +127,7 @@ export async function performStitching(options: { logger.debug( `Unable to stitch ${entityRef}, the entity has not yet been processed`, ); + stitchResult = 'abandoned'; return 'abandoned'; } @@ -180,6 +183,7 @@ export async function performStitching(options: { const hash = generateStableHash(entity); if (hash === previousHash) { logger.debug(`Skipped stitching of ${entityRef}, no changes`); + stitchResult = 'succeeded'; return 'unchanged'; } @@ -213,6 +217,7 @@ export async function performStitching(options: { logger.debug( `Entity ${entityRef} is already stitched, skipping write.`, ); + stitchResult = 'abandoned'; return 'abandoned'; } } @@ -253,22 +258,24 @@ export async function performStitching(options: { logger.debug( `Entity ${entityRef} is already stitched, skipping write.`, ); + stitchResult = 'abandoned'; return 'abandoned'; } } await syncSearchRows(knex, entityId, searchEntries); + stitchResult = 'succeeded'; return 'changed'; } catch (error) { - removeFromStitchQueueOnCompletion = false; throw error; } finally { - if (removeFromStitchQueueOnCompletion) { + if (stitchResult) { await markDeferredStitchCompleted({ knex: knex, entityRef, stitchTicket, + result: stitchResult, }); } } diff --git a/plugins/catalog-backend/src/stitching/DefaultStitcher.test.ts b/plugins/catalog-backend/src/stitching/DefaultStitcher.test.ts index 8f383c04fc..16e9e325b6 100644 --- a/plugins/catalog-backend/src/stitching/DefaultStitcher.test.ts +++ b/plugins/catalog-backend/src/stitching/DefaultStitcher.test.ts @@ -209,7 +209,14 @@ describe.each(databases.eachSupportedId())('Stitcher, %p', databaseId => { await waitForCondition(async () => { const e = await db('final_entities'); - return e.length === 1 && e[0].hash !== firstHash; + if (e.length !== 1 || e[0].hash === firstHash) { + return false; + } + const s = await db('search').where( + 'original_value', + 'k:ns/third', + ); + return s.length > 0; }); await stitcher.stop();