Prevent deadlock in catalog deferred stitching

resolves #30843

Signed-off-by: Andreas Berger <andreas@berger-ecommerce.com>
This commit is contained in:
Andreas Berger
2025-09-03 14:53:40 +02:00
parent b9318d4ecf
commit 2204f5b77e
3 changed files with 219 additions and 31 deletions
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/plugin-catalog-backend': patch
---
Prevent deadlock in catalog deferred stitching
@@ -435,4 +435,143 @@ describe('markForStitching', () => {
}
},
);
// Database backends the deadlock regression test below is run against.
const deadlockTestDatabases = TestDatabases.create({
  ids: ['POSTGRES_17', 'POSTGRES_16', 'SQLITE_3'],
  disableDocker: false,
});

/**
 * Regression test: two kinds of transactions mark overlapping sets of
 * entities for deferred stitching, listing the refs in opposite orders, and
 * ten such pairs are started concurrently. None of them may fail with a
 * deadlock, and afterwards every entity must carry a stitch request.
 */
it.each(deadlockTestDatabases.eachSupportedId())(
  'reproduces deadlock scenario when concurrent transactions update overlapping entity sets %p',
  async databaseId => {
    const knex = await deadlockTestDatabases.init(databaseId);
    await applyDatabaseMigrations(knex);

    // Setup test data with multiple entities (kept in sorted order, which the
    // final assertion relies on).
    const entityRefs = [
      'k:ns/entity-a',
      'k:ns/entity-b',
      'k:ns/entity-c',
      'k:ns/entity-d',
      'k:ns/entity-e',
      'k:ns/entity-f',
    ];

    await knex<DbRefreshStateRow>('refresh_state').insert(
      entityRefs.map((ref, i) => ({
        entity_id: `${i + 1}`,
        entity_ref: ref,
        unprocessed_entity: '{}',
        processed_entity: '{}',
        errors: '[]',
        next_update_at: knex.fn.now(),
        last_discovery_at: knex.fn.now(),
        next_stitch_at: null,
        next_stitch_ticket: null,
      })),
    );

    // Every markForStitching call in this test uses the same deferred strategy.
    const deferredStrategy = {
      mode: 'deferred',
      pollingInterval: { seconds: 1 },
      stitchTimeout: { seconds: 1 },
    } as const;

    // One transaction: mark `firstBatch`, pause briefly to widen the window
    // for a collision with the other transaction, then mark `lastRef`.
    const runTransaction = (firstBatch: string[], lastRef: string) =>
      knex.transaction(async trx => {
        await markForStitching({
          knex: trx,
          strategy: deferredStrategy,
          entityRefs: firstBatch,
        });
        await new Promise(resolve => setTimeout(resolve, 10));
        await markForStitching({
          knex: trx,
          strategy: deferredStrategy,
          entityRefs: [lastRef],
        });
      });

    // Matches the '40P01' error code as well as any error message that
    // mentions a deadlock.
    const isDeadlockError = (error: unknown): boolean => {
      const { code, message } = (error ?? {}) as {
        code?: unknown;
        message?: unknown;
      };
      return (
        code === '40P01' ||
        (typeof message === 'string' && message.includes('deadlock'))
      );
    };

    // All pairs are started without awaiting in between, so every attempt
    // runs concurrently with the others.
    const pendingRejections: Promise<unknown[]>[] = [];
    for (let attempt = 0; attempt < 10; attempt++) {
      // Transaction 1: Update entities A, B, C, D, E and then F
      const transaction1 = runTransaction(
        [
          'k:ns/entity-a',
          'k:ns/entity-b',
          'k:ns/entity-c',
          'k:ns/entity-d',
          'k:ns/entity-e',
        ],
        'k:ns/entity-f',
      );

      // Transaction 2: Update entities F, E, D, C, B (reverse order) and then A
      const transaction2 = runTransaction(
        [
          'k:ns/entity-f',
          'k:ns/entity-e',
          'k:ns/entity-d',
          'k:ns/entity-c',
          'k:ns/entity-b',
        ],
        'k:ns/entity-a',
      );

      // Collect only the rejection reasons of this pair.
      pendingRejections.push(
        Promise.allSettled([transaction1, transaction2]).then(results =>
          results
            .filter(r => r.status === 'rejected')
            .map(r => (r as PromiseRejectedResult).reason),
        ),
      );
    }

    const rejections = (await Promise.all(pendingRejections)).flat();
    const deadlockErrors = rejections.filter(isDeadlockError);
    // Asserting on the array itself makes a failure print the actual errors.
    expect(deadlockErrors).toHaveLength(0);

    // Verify final state - all entities should have been marked for stitching.
    // Each transaction touches all six refs, so a single commit marks them all.
    const finalState = await knex<DbRefreshStateRow>('refresh_state')
      .select('entity_ref', 'next_stitch_at', 'next_stitch_ticket')
      .whereNotNull('next_stitch_at')
      .orderBy('entity_ref');

    expect(finalState.map(row => row.entity_ref)).toEqual(entityRefs);
    for (const row of finalState) {
      expect(row.next_stitch_at).not.toBeNull();
      expect(row.next_stitch_ticket).not.toBeNull();
    }
  },
);
});
@@ -20,6 +20,11 @@ import { v4 as uuid } from 'uuid';
import { StitchingStrategy } from '../../../stitching/types';
import { DbFinalEntitiesRow, DbRefreshStateRow } from '../../tables';
// Maximum number of refs/ids per chunk produced by split(), and therefore per
// `whereIn` statement issued by markForStitching.
const UPDATE_CHUNK_SIZE = 100; // Smaller chunks reduce contention
// Value of the `code` property that retryOnDeadlock treats as a deadlock error
// (the accompanying test pairs this code with "deadlock detected" messages).
const DEADLOCK_SQLSTATE = '40P01';
// Default number of re-runs retryOnDeadlock performs after the first failure.
const DEADLOCK_RETRY_ATTEMPTS = 3;
// Default delay before the first retry, in milliseconds; retryOnDeadlock
// doubles it on every subsequent retry.
const DEADLOCK_BASE_DELAY_MS = 25;
/**
* Marks a number of entities for stitching some time in the near
* future.
@@ -32,9 +37,9 @@ export async function markForStitching(options: {
entityRefs?: Iterable<string>;
entityIds?: Iterable<string>;
}): Promise<void> {
// Splitting to chunks just to cover pathological cases that upset the db
const entityRefs = split(options.entityRefs);
const entityIds = split(options.entityIds);
// Sort inputs to ensure consistent lock order across concurrent writers
const entityRefs = split(options.entityRefs, true);
const entityIds = split(options.entityIds, true);
const knex = options.knex;
const mode = options.strategy.mode;
@@ -51,13 +56,15 @@ export async function markForStitching(options: {
.select('entity_id')
.whereIn('entity_ref', chunk),
);
await knex
.table<DbRefreshStateRow>('refresh_state')
.update({
result_hash: 'force-stitching',
next_update_at: knex.fn.now(),
})
.whereIn('entity_ref', chunk);
await retryOnDeadlock(async () => {
await knex
.table<DbRefreshStateRow>('refresh_state')
.update({
result_hash: 'force-stitching',
next_update_at: knex.fn.now(),
})
.whereIn('entity_ref', chunk);
});
}
for (const chunk of entityIds) {
@@ -67,44 +74,81 @@ export async function markForStitching(options: {
hash: 'force-stitching',
})
.whereIn('entity_id', chunk);
await knex
.table<DbRefreshStateRow>('refresh_state')
.update({
result_hash: 'force-stitching',
next_update_at: knex.fn.now(),
})
.whereIn('entity_id', chunk);
await retryOnDeadlock(async () => {
await knex
.table<DbRefreshStateRow>('refresh_state')
.update({
result_hash: 'force-stitching',
next_update_at: knex.fn.now(),
})
.whereIn('entity_id', chunk);
});
}
} else if (mode === 'deferred') {
// It's OK that this is shared across refresh state rows; it just needs to
// be uniquely generated for every new stitch request.
const ticket = uuid();
// Update chunk by chunk in sorted order (see split) so that concurrent writers
// touch the rows in a consistent sequence, reducing the risk of deadlocks
for (const chunk of entityRefs) {
await knex<DbRefreshStateRow>('refresh_state')
.update({
next_stitch_at: knex.fn.now(),
next_stitch_ticket: ticket,
})
.whereIn('entity_ref', chunk);
await retryOnDeadlock(async () => {
await knex<DbRefreshStateRow>('refresh_state')
.update({
next_stitch_at: knex.fn.now(),
next_stitch_ticket: ticket,
})
.whereIn('entity_ref', chunk);
});
}
for (const chunk of entityIds) {
await knex<DbRefreshStateRow>('refresh_state')
.update({
next_stitch_at: knex.fn.now(),
next_stitch_ticket: ticket,
})
.whereIn('entity_id', chunk);
// Ensure ids are sorted for deterministic lock order
await retryOnDeadlock(async () => {
await knex<DbRefreshStateRow>('refresh_state')
.update({
next_stitch_at: knex.fn.now(),
next_stitch_ticket: ticket,
})
.whereIn('entity_id', chunk);
});
}
} else {
throw new Error(`Unknown stitching strategy mode ${mode}`);
}
}
// Pre-change signature as shown by the diff view (no `sort` option); the
// line below it is the replacement introduced by this change.
function split(input: Iterable<string> | undefined): string[][] {
/**
 * Copies `input` into an array, optionally sorts it, and splits it into
 * chunks of at most UPDATE_CHUNK_SIZE entries via splitToChunks.
 *
 * @param input - The values to chunk; a missing input yields no chunks.
 * @param sort - When true, the copy is sorted before chunking. Defaults to false.
 * @returns The chunks, or an empty array when `input` is not given.
 */
function split(input: Iterable<string> | undefined, sort = false): string[][] {
if (!input) {
return [];
}
// Pre-change return as shown by the diff view: chunked in groups of 200 and
// passed an array input through without copying it.
return splitToChunks(Array.isArray(input) ? input : [...input], 200);
// Always work on a copy, so the in-place sort below never reorders an array
// owned by the caller.
const array = Array.isArray(input) ? input.slice() : [...input];
if (sort) {
// Default Array.prototype.sort() ordering, applied in place to the copy.
array.sort();
}
return splitToChunks(array, UPDATE_CHUNK_SIZE);
}
/**
 * Runs `fn` and re-runs it whenever it rejects with an error whose `code`
 * property equals DEADLOCK_SQLSTATE, waiting with exponential backoff between
 * attempts.
 *
 * @param fn - The operation to attempt; it is invoked again from scratch on
 *   every retry.
 * @param retries - Maximum number of re-runs after the initial attempt.
 * @param baseMs - Delay in milliseconds before the first retry; doubled for
 *   each subsequent retry.
 * @returns Whatever `fn` resolves with.
 * @throws The error thrown by `fn`, unchanged, when it is not a deadlock
 *   error or when the retries are exhausted.
 *
 * NOTE(review): PostgreSQL aborts the surrounding transaction when a statement
 * in it fails, so re-running `fn` on the same transaction handle is unlikely
 * to succeed - confirm how callers that pass a transaction are expected to
 * recover.
 */
async function retryOnDeadlock<T>(
  fn: () => Promise<T>,
  retries = DEADLOCK_RETRY_ATTEMPTS,
  baseMs = DEADLOCK_BASE_DELAY_MS,
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (error: unknown) {
      // Thrown values may be anything; read `code` without assuming a shape.
      const code = (error as { code?: unknown } | null | undefined)?.code;
      if (code !== DEADLOCK_SQLSTATE || attempt >= retries) {
        throw error;
      }
      // Backoff: baseMs, 2 * baseMs, 4 * baseMs, ...
      await sleep(baseMs * 2 ** attempt);
    }
  }
}
/**
 * Returns a promise that resolves once a timer of `ms` milliseconds has fired.
 *
 * @param ms - Timer duration in milliseconds.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>(done => {
    setTimeout(done, ms);
  });
}