Merge pull request #34339 from backstage/freben/stitch-queue-no-overlap

fix(catalog-backend): prevent overlapping stitches for the same entity
This commit is contained in:
Fredrik Adelöw
2026-05-26 14:46:59 +02:00
committed by GitHub
7 changed files with 152 additions and 18 deletions
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/plugin-catalog-backend': patch
---
Improved stitch queue semantics to prevent overlapping stitches for the same entity. New stitch requests that arrive while a stitch is in progress now only update the ticket (not the timestamp), so the in-progress worker is not interrupted. When the worker completes and detects a pending re-stitch, the queue entry becomes immediately eligible for pickup instead of waiting for the timeout period.
@@ -47,27 +47,49 @@ describe.each(databases.eachSupportedId())(
);
}
// Wrong ticket should not delete the row
// Wrong ticket should not delete the row, but should bump
// next_stitch_at to now() so the pending re-stitch is picked up
// immediately
await markDeferredStitchCompleted({
knex,
entityRef: 'k:ns/n',
stitchTicket: 'the-wrong-ticket',
result: 'succeeded',
});
await expect(result()).resolves.toEqual([
const afterWrongTicket = await result();
expect(afterWrongTicket).toEqual([
{
entity_ref: 'k:ns/n',
next_stitch_at: expect.anything(),
stitch_ticket: 'the-ticket',
},
]);
const bumped = new Date(afterWrongTicket[0].next_stitch_at as string);
expect(bumped.getFullYear()).toBeGreaterThan(1971);
// Correct ticket should delete the row
await markDeferredStitchCompleted({
knex,
entityRef: 'k:ns/n',
stitchTicket: 'the-ticket',
result: 'succeeded',
});
await expect(result()).resolves.toEqual([]);
});
it('does not fail when the row is already gone', async () => {
const knex = await databases.init(databaseId);
await applyDatabaseMigrations(knex);
// Calling on a nonexistent row should not throw
await expect(
markDeferredStitchCompleted({
knex,
entityRef: 'k:ns/nonexistent',
stitchTicket: 'any-ticket',
result: 'succeeded',
}),
).resolves.toBeUndefined();
});
},
);
@@ -22,22 +22,54 @@ import { DbStitchQueueRow } from '../../tables';
*
* @remarks
*
* This assumes that the stitching strategy is set to deferred.
* If the ticket still matches, the stitch_queue entry is deleted — no
* further stitching is needed.
*
* The row is only deleted from stitch_queue if the ticket hasn't changed. If
* it has, it means that a new stitch request has been made, and the entity
* should be stitched once more some time in the future - or is indeed already
* being stitched concurrently with ourselves.
* If the ticket changed (a new stitch was requested while this one was
* in progress), the entry is kept and its next_stitch_at is bumped to
* now() so the re-stitch becomes immediately eligible for pickup.
*
* The `result` parameter controls how the bump behaves when the ticket
* doesn't match:
*
* - `'succeeded'`: The worker wrote its result successfully. A ticket
* mismatch means a re-stitch was requested. Bump to now()
* unconditionally — we're done, the next worker should start ASAP.
*
* - `'abandoned'`: The worker's write was blocked by a stale ticket.
* We can't tell whether the ticket changed because of a re-stitch
* request (nobody else is active) or because we timed out and
* another worker claimed the entry. Bump to now() only if the
* timestamp hasn't moved past what we'd have set — i.e. only move
* it earlier, never later. This prevents extending the timeout
* window of an active worker, while still making overdue entries
* eligible immediately.
*/
export async function markDeferredStitchCompleted(option: {
knex: Knex | Knex.Transaction;
entityRef: string;
stitchTicket: string;
result: 'succeeded' | 'abandoned';
}): Promise<void> {
const { knex, entityRef, stitchTicket } = option;
const { knex, entityRef, stitchTicket, result } = option;
await knex<DbStitchQueueRow>('stitch_queue')
const deleted = await knex<DbStitchQueueRow>('stitch_queue')
.where('entity_ref', '=', entityRef)
.andWhere('stitch_ticket', '=', stitchTicket)
.delete();
if (!deleted) {
const update = knex<DbStitchQueueRow>('stitch_queue')
.where('entity_ref', '=', entityRef)
.update({ next_stitch_at: knex.fn.now() });
if (result === 'abandoned') {
// Only move the timestamp earlier, never later — if another
// worker pushed it forward, we don't want to undercut their
// timeout window.
update.where('next_stitch_at', '>', knex.fn.now());
}
await update;
}
}
@@ -292,3 +292,52 @@ it.each(databases.eachSupportedId())(
});
},
);
describe.each(databases.eachSupportedId())(
'stitch queue overlap prevention, %p',
databaseId => {
it('does not overwrite next_stitch_at when a stitch is in progress', async () => {
const knex = await databases.init(databaseId);
await applyDatabaseMigrations(knex);
// Simulate a claimed entry: next_stitch_at far in the future
const futureTimestamp = '2099-01-01T00:00:00.000';
await knex<DbStitchQueueRow>('stitch_queue').insert({
entity_ref: 'k:ns/target',
stitch_ticket: 'in-progress-ticket',
next_stitch_at: futureTimestamp,
});
// A new stitch request comes in while the stitch is in progress
await markForStitching({ knex, entityRefs: ['k:ns/target'] });
const [row] = await knex<DbStitchQueueRow>('stitch_queue').where(
'entity_ref',
'k:ns/target',
);
// Ticket should be updated (new stitch requested)
expect(row.stitch_ticket).not.toBe('in-progress-ticket');
// Timestamp should NOT be yanked back to now — the in-progress
// worker's timeout must be respected
const nextStitch = new Date(row.next_stitch_at as string);
expect(nextStitch.getFullYear()).toBe(2099);
});
it('sets next_stitch_at to now for new entries', async () => {
const knex = await databases.init(databaseId);
await applyDatabaseMigrations(knex);
await markForStitching({ knex, entityRefs: ['k:ns/brand-new'] });
const [row] = await knex<DbStitchQueueRow>('stitch_queue').where(
'entity_ref',
'k:ns/brand-new',
);
expect(row.stitch_ticket).toBeDefined();
const nextStitch = new Date(row.next_stitch_at as string);
expect(nextStitch.getTime()).toBeLessThanOrEqual(Date.now() + 5000);
});
},
);
@@ -27,6 +27,18 @@ const UPDATE_CHUNK_SIZE = 100; // Smaller chunks reduce contention
* future.
*
* @remarks
*
* If there is no existing stitch_queue entry, a new row is created with
* next_stitch_at set to now() (immediately eligible). If an entry already
* exists, only the stitch_ticket is updated — the timestamp is left
* unchanged. This prevents interrupting an in-progress stitch: the worker
* that claimed the entry pushed next_stitch_at forward by the timeout
* duration, and we don't want to yank it back to now() which would allow
* another worker to claim the same entity and cause overlapping stitches.
*
* When the in-progress stitch completes, markDeferredStitchCompleted
* detects the ticket change and makes the entry immediately eligible for
* the follow-up stitch.
*/
export async function markForStitching(options: {
knex: Knex | Knex.Transaction;
@@ -53,7 +65,7 @@ export async function markForStitching(options: {
})),
)
.onConflict('entity_ref')
.merge(['next_stitch_at', 'stitch_ticket']);
.merge(['stitch_ticket']);
}
}, knex);
}
@@ -75,7 +87,7 @@ export async function markForStitching(options: {
})),
)
.onConflict('entity_ref')
.merge(['next_stitch_at', 'stitch_ticket']);
.merge(['stitch_ticket']);
}
}, knex);
}
@@ -56,10 +56,11 @@ export async function performStitching(options: {
}): Promise<'changed' | 'unchanged' | 'abandoned'> {
const { knex, logger, entityRef, stitchTicket } = options;
// The entity is removed from the stitch queue on ANY completion, except when
// an exception is thrown. In the latter case, the entity will be retried at a
// later time.
let removeFromStitchQueueOnCompletion = true;
// The stitch queue is cleaned up on ANY completion — either by deleting
// the entry (ticket unchanged) or bumping next_stitch_at (re-stitch
// requested). Exceptions are the only case where we skip cleanup, so
// the entity gets retried at a later time.
let stitchResult: 'succeeded' | 'abandoned' | undefined;
try {
// Selecting from refresh_state (with an optional left join to
@@ -106,6 +107,7 @@ export async function performStitching(options: {
logger.debug(
`Unable to stitch ${entityRef}, item does not exist in refresh state table`,
);
stitchResult = 'abandoned';
return 'abandoned';
}
@@ -125,6 +127,7 @@ export async function performStitching(options: {
logger.debug(
`Unable to stitch ${entityRef}, the entity has not yet been processed`,
);
stitchResult = 'abandoned';
return 'abandoned';
}
@@ -180,6 +183,7 @@ export async function performStitching(options: {
const hash = generateStableHash(entity);
if (hash === previousHash) {
logger.debug(`Skipped stitching of ${entityRef}, no changes`);
stitchResult = 'succeeded';
return 'unchanged';
}
@@ -213,6 +217,7 @@ export async function performStitching(options: {
logger.debug(
`Entity ${entityRef} is already stitched, skipping write.`,
);
stitchResult = 'abandoned';
return 'abandoned';
}
}
@@ -253,22 +258,24 @@ export async function performStitching(options: {
logger.debug(
`Entity ${entityRef} is already stitched, skipping write.`,
);
stitchResult = 'abandoned';
return 'abandoned';
}
}
await syncSearchRows(knex, entityId, searchEntries);
stitchResult = 'succeeded';
return 'changed';
} catch (error) {
removeFromStitchQueueOnCompletion = false;
throw error;
} finally {
if (removeFromStitchQueueOnCompletion) {
if (stitchResult) {
await markDeferredStitchCompleted({
knex: knex,
entityRef,
stitchTicket,
result: stitchResult,
});
}
}
@@ -209,7 +209,14 @@ describe.each(databases.eachSupportedId())('Stitcher, %p', databaseId => {
await waitForCondition(async () => {
const e = await db<DbFinalEntitiesRow>('final_entities');
return e.length === 1 && e[0].hash !== firstHash;
if (e.length !== 1 || e[0].hash === firstHash) {
return false;
}
const s = await db<DbSearchRow>('search').where(
'original_value',
'k:ns/third',
);
return s.length > 0;
});
await stitcher.stop();