Prevent deadlock in catalog deferred stitching
resolves #30843 Signed-off-by: Andreas Berger <andreas@berger-ecommerce.com>
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/plugin-catalog-backend': patch
|
||||
---
|
||||
|
||||
Prevent deadlock in catalog deferred stitching
|
||||
@@ -435,4 +435,143 @@ describe('markForStitching', () => {
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
// Database engines the deadlock regression test below is run against.
const deadlockTestDatabases = TestDatabases.create({
  ids: ['POSTGRES_17', 'POSTGRES_16', 'SQLITE_3'],
  disableDocker: false,
});

/**
 * Regression test for deadlocks in deferred stitching: starts ten rounds of
 * two concurrent transactions that mark overlapping sets of entities in
 * opposite orders, and asserts that none of them is rejected with a deadlock
 * error and that every entity ends up marked for stitching.
 */
it.each(deadlockTestDatabases.eachSupportedId())(
  'reproduces deadlock scenario when concurrent transactions update overlapping entity sets %p',
  async databaseId => {
    const knex = await deadlockTestDatabases.init(databaseId);
    await applyDatabaseMigrations(knex);

    // Setup test data with multiple entities (listed in ascending ref order)
    const entityRefs = [
      'k:ns/entity-a',
      'k:ns/entity-b',
      'k:ns/entity-c',
      'k:ns/entity-d',
      'k:ns/entity-e',
      'k:ns/entity-f',
    ];

    await knex<DbRefreshStateRow>('refresh_state').insert(
      entityRefs.map((ref, i) => ({
        entity_id: `${i + 1}`,
        entity_ref: ref,
        unprocessed_entity: '{}',
        processed_entity: '{}',
        errors: '[]',
        next_update_at: knex.fn.now(),
        last_discovery_at: knex.fn.now(),
        next_stitch_at: null,
        next_stitch_ticket: null,
      })),
    );

    // Every call in this test uses the same deferred strategy settings.
    const strategy = {
      mode: 'deferred',
      pollingInterval: { seconds: 1 },
      stitchTimeout: { seconds: 1 },
    } as const;

    // Opens one transaction that marks `firstBatch`, pauses briefly, and then
    // marks `secondBatch`, so that two such transactions overlap in time.
    const markInTransaction = (firstBatch: string[], secondBatch: string[]) =>
      knex.transaction(async trx => {
        await markForStitching({ knex: trx, strategy, entityRefs: firstBatch });

        // Add a small delay to increase chance of collision
        await new Promise(resolve => setTimeout(resolve, 10));

        await markForStitching({
          knex: trx,
          strategy,
          entityRefs: secondBatch,
        });
      });

    // All rounds are started without awaiting each other; each round resolves
    // to the rejection reasons (if any) of its two transactions.
    const rounds = Array.from({ length: 10 }, () => {
      // Transaction 1: Update entities A, B, C, D, E and then F
      const forward = markInTransaction(
        [
          'k:ns/entity-a',
          'k:ns/entity-b',
          'k:ns/entity-c',
          'k:ns/entity-d',
          'k:ns/entity-e',
        ],
        ['k:ns/entity-f'],
      );

      // Transaction 2: Update entities F, E, D, C, B (reverse order) and then A
      const reverse = markInTransaction(
        [
          'k:ns/entity-f',
          'k:ns/entity-e',
          'k:ns/entity-d',
          'k:ns/entity-c',
          'k:ns/entity-b',
        ],
        ['k:ns/entity-a'],
      );

      // Run both transactions concurrently to create potential deadlock
      return Promise.allSettled([forward, reverse]).then(results =>
        results
          .filter((r): r is PromiseRejectedResult => r.status === 'rejected')
          .map(r => r.reason),
      );
    });

    const rejections = (await Promise.all(rounds)).flat();

    // 'deadlock detected' is already covered by the broader 'deadlock' match.
    const deadlockErrors = rejections.filter(
      error => error?.code === '40P01' || error?.message?.includes('deadlock'),
    );
    expect(deadlockErrors).toHaveLength(0);

    // Verify final state - all entities should have been marked for stitching.
    // Rows are deliberately not pre-filtered on next_stitch_at, so the checks
    // below actually fail if any entity was left unmarked.
    const finalState = await knex<DbRefreshStateRow>('refresh_state')
      .select('entity_ref', 'next_stitch_at', 'next_stitch_ticket')
      .orderBy('entity_ref');

    expect(finalState.map(row => row.entity_ref)).toEqual(entityRefs);
    for (const row of finalState) {
      expect(row.next_stitch_at).not.toBeNull();
      expect(row.next_stitch_ticket).not.toBeNull();
    }
  },
);
|
||||
});
|
||||
|
||||
@@ -20,6 +20,11 @@ import { v4 as uuid } from 'uuid';
|
||||
import { StitchingStrategy } from '../../../stitching/types';
|
||||
import { DbFinalEntitiesRow, DbRefreshStateRow } from '../../tables';
|
||||
|
||||
// Maximum number of entity refs / ids handed to splitToChunks per chunk in split().
const UPDATE_CHUNK_SIZE = 100; // Smaller chunks reduce contention
|
||||
// Error `code` value that retryOnDeadlock treats as a retryable deadlock.
const DEADLOCK_SQLSTATE = '40P01';
|
||||
// Default number of retries retryOnDeadlock performs after the first failed attempt.
const DEADLOCK_RETRY_ATTEMPTS = 3;
|
||||
// Default delay before the first retry, in milliseconds; doubled for each further retry.
const DEADLOCK_BASE_DELAY_MS = 25;
|
||||
|
||||
/**
|
||||
* Marks a number of entities for stitching some time in the near
|
||||
* future.
|
||||
@@ -32,9 +37,9 @@ export async function markForStitching(options: {
|
||||
entityRefs?: Iterable<string>;
|
||||
entityIds?: Iterable<string>;
|
||||
}): Promise<void> {
|
||||
// Splitting to chunks just to cover pathological cases that upset the db
|
||||
const entityRefs = split(options.entityRefs);
|
||||
const entityIds = split(options.entityIds);
|
||||
// Sort inputs to ensure consistent lock order across concurrent writers
|
||||
const entityRefs = split(options.entityRefs, true);
|
||||
const entityIds = split(options.entityIds, true);
|
||||
const knex = options.knex;
|
||||
const mode = options.strategy.mode;
|
||||
|
||||
@@ -51,13 +56,15 @@ export async function markForStitching(options: {
|
||||
.select('entity_id')
|
||||
.whereIn('entity_ref', chunk),
|
||||
);
|
||||
await knex
|
||||
.table<DbRefreshStateRow>('refresh_state')
|
||||
.update({
|
||||
result_hash: 'force-stitching',
|
||||
next_update_at: knex.fn.now(),
|
||||
})
|
||||
.whereIn('entity_ref', chunk);
|
||||
await retryOnDeadlock(async () => {
|
||||
await knex
|
||||
.table<DbRefreshStateRow>('refresh_state')
|
||||
.update({
|
||||
result_hash: 'force-stitching',
|
||||
next_update_at: knex.fn.now(),
|
||||
})
|
||||
.whereIn('entity_ref', chunk);
|
||||
});
|
||||
}
|
||||
|
||||
for (const chunk of entityIds) {
|
||||
@@ -67,44 +74,81 @@ export async function markForStitching(options: {
|
||||
hash: 'force-stitching',
|
||||
})
|
||||
.whereIn('entity_id', chunk);
|
||||
await knex
|
||||
.table<DbRefreshStateRow>('refresh_state')
|
||||
.update({
|
||||
result_hash: 'force-stitching',
|
||||
next_update_at: knex.fn.now(),
|
||||
})
|
||||
.whereIn('entity_id', chunk);
|
||||
await retryOnDeadlock(async () => {
|
||||
await knex
|
||||
.table<DbRefreshStateRow>('refresh_state')
|
||||
.update({
|
||||
result_hash: 'force-stitching',
|
||||
next_update_at: knex.fn.now(),
|
||||
})
|
||||
.whereIn('entity_id', chunk);
|
||||
});
|
||||
}
|
||||
} else if (mode === 'deferred') {
|
||||
// It's OK that this is shared across refresh state rows; it just needs to
|
||||
// be uniquely generated for every new stitch request.
|
||||
const ticket = uuid();
|
||||
|
||||
// Update by primary key in deterministic order to avoid deadlocks
|
||||
for (const chunk of entityRefs) {
|
||||
await knex<DbRefreshStateRow>('refresh_state')
|
||||
.update({
|
||||
next_stitch_at: knex.fn.now(),
|
||||
next_stitch_ticket: ticket,
|
||||
})
|
||||
.whereIn('entity_ref', chunk);
|
||||
await retryOnDeadlock(async () => {
|
||||
await knex<DbRefreshStateRow>('refresh_state')
|
||||
.update({
|
||||
next_stitch_at: knex.fn.now(),
|
||||
next_stitch_ticket: ticket,
|
||||
})
|
||||
.whereIn('entity_ref', chunk);
|
||||
});
|
||||
}
|
||||
|
||||
for (const chunk of entityIds) {
|
||||
await knex<DbRefreshStateRow>('refresh_state')
|
||||
.update({
|
||||
next_stitch_at: knex.fn.now(),
|
||||
next_stitch_ticket: ticket,
|
||||
})
|
||||
.whereIn('entity_id', chunk);
|
||||
// Ensure ids are sorted for deterministic lock order
|
||||
|
||||
await retryOnDeadlock(async () => {
|
||||
await knex<DbRefreshStateRow>('refresh_state')
|
||||
.update({
|
||||
next_stitch_at: knex.fn.now(),
|
||||
next_stitch_ticket: ticket,
|
||||
})
|
||||
.whereIn('entity_id', chunk);
|
||||
});
|
||||
}
|
||||
} else {
|
||||
throw new Error(`Unknown stitching strategy mode ${mode}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Materializes `input` into chunks of at most UPDATE_CHUNK_SIZE items.
 *
 * The input is always copied first, so a caller-owned array is never
 * reordered by the optional sort.
 *
 * @param input - The refs or ids to split; `undefined` yields no chunks.
 * @param sort - When true, the items are sorted (default string ordering)
 *   before chunking, so that concurrent callers walk overlapping inputs in
 *   the same order.
 * @returns The chunks, in order.
 */
function split(input: Iterable<string> | undefined, sort = false): string[][] {
  if (!input) {
    return [];
  }

  const items = [...input];
  if (sort) {
    // Default sort compares UTF-16 code units, which is locale-independent
    // and therefore identical in every process.
    items.sort();
  }

  return splitToChunks(items, UPDATE_CHUNK_SIZE);
}
|
||||
|
||||
/**
 * Runs `fn` and retries it with exponential backoff when it fails with an
 * error whose `code` equals DEADLOCK_SQLSTATE.
 *
 * Any other error, or a deadlock error once the retry budget is used up, is
 * rethrown unchanged.
 *
 * NOTE(review): PostgreSQL aborts the surrounding transaction when it reports
 * a deadlock. If `fn` runs on a transaction-bound knex instance, a retry on
 * that same transaction is presumably rejected until the transaction is
 * rolled back - confirm whether callers need to retry the whole transaction
 * instead.
 *
 * @param fn - The operation to attempt; invoked again from scratch on retry.
 * @param retries - Maximum number of retries after the first failed attempt.
 * @param baseMs - Delay before the first retry in milliseconds; the delay
 *   doubles for every subsequent retry.
 * @returns The value `fn` resolves to.
 */
async function retryOnDeadlock<T>(
  fn: () => Promise<T>,
  retries = DEADLOCK_RETRY_ATTEMPTS,
  baseMs = DEADLOCK_BASE_DELAY_MS,
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (error: unknown) {
      // Thrown values are untyped; read `code` defensively instead of via `any`.
      const code = (error as { code?: unknown } | null | undefined)?.code;
      if (code !== DEADLOCK_SQLSTATE || attempt >= retries) {
        throw error;
      }
      // Back off for baseMs, 2*baseMs, 4*baseMs, ... before trying again.
      await sleep(baseMs * 2 ** attempt);
    }
  }
}
|
||||
|
||||
/**
 * Returns a promise that resolves once a `setTimeout` of `ms` milliseconds
 * has fired.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>(done => {
    setTimeout(done, ms);
  });
}
|
||||
|
||||
Reference in New Issue
Block a user