catalog: Reduce search table write churn during stitching

Replace the bulk DELETE + INSERT of all search rows with a minimal sync
that only touches rows that actually changed:

- Postgres: single writable CTE with unnest — DELETE stale rows and
  INSERT new ones in one atomic statement using snapshot isolation
- MySQL: temporary table merge with deadlock retry (errno 1213)
- SQLite: unchanged bulk replace (sufficient for dev/test)

For a typical user entity with ~200 search rows where one annotation
changed, this reduces the operation from ~400 row writes to ~2.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Fredrik Adelöw <freben@spotify.com>
This commit is contained in:
Fredrik Adelöw
2026-03-30 14:02:01 +02:00
parent 0336f92de8
commit 9da73bf599
4 changed files with 494 additions and 6 deletions
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/plugin-catalog-backend': patch
---
Reduced search table write churn during stitching by syncing only changed rows instead of doing a full delete and re-insert. On Postgres this uses a single writable CTE, on MySQL a temporary table merge with deadlock retry, and on SQLite the previous bulk replace.
@@ -27,12 +27,12 @@ import { StitchingStrategy } from '../../../stitching/types';
import {
DbFinalEntitiesRow,
DbRefreshStateRow,
DbSearchRow,
DbStitchQueueRow,
} from '../../tables';
import { buildEntitySearch } from './buildEntitySearch';
import { markDeferredStitchCompleted } from './markDeferredStitchCompleted';
import { BATCH_SIZE, generateStableHash } from './util';
import { syncSearchRows } from './syncSearchRows';
import { generateStableHash } from './util';
import {
LoggerService,
isDatabaseConflictError,
@@ -256,10 +256,7 @@ export async function performStitching(options: {
return 'abandoned';
}
await knex.transaction(async trx => {
await trx<DbSearchRow>('search').where({ entity_id: entityId }).delete();
await trx.batchInsert('search', searchEntries, BATCH_SIZE);
});
await syncSearchRows(knex, entityId, searchEntries);
return 'changed';
} catch (error) {
@@ -0,0 +1,258 @@
/*
* Copyright 2026 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { TestDatabases } from '@backstage/backend-test-utils';
import { Knex } from 'knex';
import { applyDatabaseMigrations } from '../../migrations';
import { DbSearchRow } from '../../tables';
import { syncSearchRows } from './syncSearchRows';
jest.setTimeout(60_000);
const databases = TestDatabases.create();
function row(
key: string,
value: string | null,
originalValue?: string | null,
): DbSearchRow {
return {
entity_id: 'e1',
key,
value,
original_value: originalValue ?? value,
};
}
describe.each(databases.eachSupportedId())('syncSearchRows, %p', databaseId => {
let knex: Knex;
async function getSearchRows(): Promise<DbSearchRow[]> {
return knex<DbSearchRow>('search')
.where({ entity_id: 'e1' })
.orderBy('key')
.orderBy('value')
.select();
}
beforeEach(async () => {
knex = await databases.init(databaseId);
await applyDatabaseMigrations(knex);
// Insert a minimal refresh_state + final_entities row so FKs are satisfied
await knex('refresh_state').insert({
entity_id: 'e1',
entity_ref: 'component:default/test',
unprocessed_entity: '{}',
errors: '[]',
next_update_at: knex.fn.now(),
last_discovery_at: knex.fn.now(),
});
await knex('final_entities').insert({
entity_id: 'e1',
entity_ref: 'component:default/test',
hash: '',
});
});
it('inserts all rows into an empty table', async () => {
const entries = [row('a', 'x'), row('b', 'y'), row('c', null)];
await syncSearchRows(knex, 'e1', entries);
const rows = await getSearchRows();
expect(rows).toEqual(
expect.arrayContaining([
expect.objectContaining({ key: 'a', value: 'x' }),
expect.objectContaining({ key: 'b', value: 'y' }),
expect.objectContaining({ key: 'c', value: null }),
]),
);
expect(rows).toHaveLength(3);
});
it('leaves unchanged rows untouched', async () => {
const entries = [row('a', 'x'), row('b', 'y')];
await syncSearchRows(knex, 'e1', entries);
const rowsBefore = await getSearchRows();
// Sync again with the same data
await syncSearchRows(knex, 'e1', entries);
const rowsAfter = await getSearchRows();
expect(rowsAfter).toEqual(rowsBefore);
});
it('adds new rows without removing existing ones', async () => {
await syncSearchRows(knex, 'e1', [row('a', 'x'), row('b', 'y')]);
await syncSearchRows(knex, 'e1', [
row('a', 'x'),
row('b', 'y'),
row('c', 'z'),
]);
const rows = await getSearchRows();
expect(rows).toHaveLength(3);
expect(rows).toEqual(
expect.arrayContaining([
expect.objectContaining({ key: 'a', value: 'x' }),
expect.objectContaining({ key: 'b', value: 'y' }),
expect.objectContaining({ key: 'c', value: 'z' }),
]),
);
});
it('removes stale rows', async () => {
await syncSearchRows(knex, 'e1', [
row('a', 'x'),
row('b', 'y'),
row('c', 'z'),
]);
await syncSearchRows(knex, 'e1', [row('a', 'x')]);
const rows = await getSearchRows();
expect(rows).toHaveLength(1);
expect(rows[0]).toEqual(expect.objectContaining({ key: 'a', value: 'x' }));
});
it('handles a value change as a remove + add', async () => {
await syncSearchRows(knex, 'e1', [row('a', 'old'), row('b', 'keep')]);
await syncSearchRows(knex, 'e1', [row('a', 'new'), row('b', 'keep')]);
const rows = await getSearchRows();
expect(rows).toHaveLength(2);
expect(rows).toEqual(
expect.arrayContaining([
expect.objectContaining({ key: 'a', value: 'new' }),
expect.objectContaining({ key: 'b', value: 'keep' }),
]),
);
});
it('removes all rows when syncing with an empty set', async () => {
await syncSearchRows(knex, 'e1', [row('a', 'x'), row('b', 'y')]);
await syncSearchRows(knex, 'e1', []);
const rows = await getSearchRows();
expect(rows).toHaveLength(0);
});
it('handles null values correctly', async () => {
await syncSearchRows(knex, 'e1', [row('a', null), row('b', 'y')]);
const rows = await getSearchRows();
expect(rows).toHaveLength(2);
expect(rows).toEqual(
expect.arrayContaining([
expect.objectContaining({ key: 'a', value: null }),
expect.objectContaining({ key: 'b', value: 'y' }),
]),
);
// Change null to value
await syncSearchRows(knex, 'e1', [row('a', 'v'), row('b', 'y')]);
const rows2 = await getSearchRows();
expect(rows2).toEqual(
expect.arrayContaining([
expect.objectContaining({ key: 'a', value: 'v' }),
expect.objectContaining({ key: 'b', value: 'y' }),
]),
);
});
it('distinguishes rows by original_value', async () => {
await syncSearchRows(knex, 'e1', [row('a', 'v', 'V')]);
await syncSearchRows(knex, 'e1', [row('a', 'v', 'v')]);
const rows = await getSearchRows();
expect(rows).toHaveLength(1);
expect(rows[0]).toEqual(
expect.objectContaining({
key: 'a',
value: 'v',
original_value: 'v',
}),
);
});
it('inserts a row when only original_value casing differs from existing', async () => {
// Two rows with the same (key, value) but different original_value
// casing must coexist — the case-insensitive MySQL collation must not
// cause the INSERT to skip the second row.
await syncSearchRows(knex, 'e1', [row('a', 'v', 'V')]);
await syncSearchRows(knex, 'e1', [row('a', 'v', 'V'), row('a', 'v', 'v')]);
const rows = await getSearchRows();
expect(rows).toHaveLength(2);
expect(rows).toEqual(
expect.arrayContaining([
expect.objectContaining({ key: 'a', value: 'v', original_value: 'V' }),
expect.objectContaining({ key: 'a', value: 'v', original_value: 'v' }),
]),
);
});
it('handles multiple rows with the same key but different values', async () => {
// Simulates array-derived rows like metadata.tags
await syncSearchRows(knex, 'e1', [
row('metadata.tags', 'java'),
row('metadata.tags', 'python'),
row('metadata.tags', 'go'),
]);
// Remove one tag, add another
await syncSearchRows(knex, 'e1', [
row('metadata.tags', 'java'),
row('metadata.tags', 'python'),
row('metadata.tags', 'rust'),
]);
const rows = await getSearchRows();
expect(rows).toHaveLength(3);
expect(rows.map(r => r.value).sort()).toEqual(['java', 'python', 'rust']);
});
it('simulates the typical steady-state case with one changed row', async () => {
// Build a realistic-ish set of search rows
const initial = [
...Array.from({ length: 50 }, (_, i) => row(`spec.field${i}`, `v${i}`)),
row('metadata.name', 'my-entity'),
row('metadata.namespace', 'default'),
row('relations.ownedby', 'group:default/team-a'),
];
await syncSearchRows(knex, 'e1', initial);
expect(await getSearchRows()).toHaveLength(53);
// Only the relation changed
const updated = [
...Array.from({ length: 50 }, (_, i) => row(`spec.field${i}`, `v${i}`)),
row('metadata.name', 'my-entity'),
row('metadata.namespace', 'default'),
row('relations.ownedby', 'group:default/team-b'),
];
await syncSearchRows(knex, 'e1', updated);
const rows = await getSearchRows();
expect(rows).toHaveLength(53);
expect(rows.find(r => r.key === 'relations.ownedby')).toEqual(
expect.objectContaining({ value: 'group:default/team-b' }),
);
});
});
@@ -0,0 +1,228 @@
/*
* Copyright 2026 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Knex } from 'knex';
import { DbSearchRow } from '../../tables';
import { BATCH_SIZE } from './util';
// The Postgres sync uses COALESCE(x, NULL_SENTINEL) to allow Postgres to
// include nullable columns in the Hash Cond of anti-joins (IS NOT DISTINCT
// FROM prevents this). As a consequence, values that are exactly this
// sentinel character are not searchable — they would be treated as NULL.
// This is the SOH (Start of Heading) control character which does not
// appear in real entity metadata.
const NULL_SENTINEL = '\x01';
function filterSentinelValues(entries: DbSearchRow[]): DbSearchRow[] {
return entries.filter(
r => r.value !== NULL_SENTINEL && r.original_value !== NULL_SENTINEL,
);
}
/**
* Synchronizes the search table rows for a given entity, applying only the
* minimal set of changes needed. Rows that already exist with the correct
* values are left untouched, new rows are inserted, and stale rows are
* deleted — minimizing write churn, dead tuples, and WAL traffic.
*
* Uses database-specific strategies:
* - Postgres: Single writable CTE with unnest (one round-trip, no DDL)
* - MySQL: Temporary table merge (two queries in a transaction)
* - SQLite: Simple bulk replace (sufficient for dev/test)
*/
export async function syncSearchRows(
knex: Knex | Knex.Transaction,
entityId: string,
searchEntries: DbSearchRow[],
): Promise<void> {
const client = knex.client.config.client;
if (client === 'pg') {
await syncPostgres(knex, entityId, searchEntries);
} else if (client.includes('mysql')) {
await syncMysql(knex, entityId, searchEntries);
} else {
await syncBulkReplace(knex, entityId, searchEntries);
}
}
// ---------------------------------------------------------------------------
// Postgres: writable CTE + unnest
//
// All CTE branches see the same pre-modification snapshot, so the DELETE
// and INSERT do not interfere with each other. This is a single atomic
// statement — no explicit transaction wrapper needed.
//
// Nullable columns use COALESCE(x, chr(1)) instead of IS NOT DISTINCT FROM
// so that Postgres can include all three columns in the Hash Cond of the
// anti-join, rather than pushing nullable comparisons into a Join Filter
// that degrades to O(n*m) when many rows share the same key. chr(1) (SOH
// control character) is used as the NULL sentinel — it cannot appear in
// real entity values since they are human-readable strings.
// ---------------------------------------------------------------------------
async function syncPostgres(
knex: Knex | Knex.Transaction,
entityId: string,
searchEntries: DbSearchRow[],
): Promise<void> {
const filtered = filterSentinelValues(searchEntries);
const keys = filtered.map(r => r.key);
const values = filtered.map(r => r.value);
const originalValues = filtered.map(r => r.original_value);
await knex.raw(
`
WITH desired AS (
SELECT *
FROM unnest(?::text[], ?::text[], ?::text[])
AS d(key, value, original_value)
),
deleted AS (
DELETE FROM "search" s
WHERE s.entity_id = ?
AND NOT EXISTS (
SELECT 1 FROM desired d
WHERE d.key = s.key
AND COALESCE(d.value, chr(1)) = COALESCE(s.value, chr(1))
AND COALESCE(d.original_value, chr(1)) = COALESCE(s.original_value, chr(1))
)
)
INSERT INTO "search" (entity_id, key, value, original_value)
SELECT ?, d.key, d.value, d.original_value
FROM desired d
WHERE NOT EXISTS (
SELECT 1 FROM "search" s
WHERE s.entity_id = ?
AND s.key = d.key
AND COALESCE(s.value, chr(1)) = COALESCE(d.value, chr(1))
AND COALESCE(s.original_value, chr(1)) = COALESCE(d.original_value, chr(1))
)
`,
[keys, values, originalValues, entityId, entityId, entityId],
);
}
// ---------------------------------------------------------------------------
// MySQL: temporary table merge with deadlock retry
//
// MySQL does not support data-modifying CTEs, so we materialize the desired
// state into a session-scoped temporary table and then merge it into the
// real table with two queries. The temp table is created inside the
// transaction to guarantee it exists on the same pooled connection.
// CREATE/DROP TEMPORARY TABLE does not cause an implicit commit in MySQL
// (unlike regular DDL), so this is transaction-safe.
//
// InnoDB's next-key (gap) locking can cause deadlocks between concurrent
// transactions operating on different entity_ids when their gap locks
// overlap on shared index pages. We retry on deadlock (error 1213) since
// the operation is idempotent.
// ---------------------------------------------------------------------------
const MYSQL_DEADLOCK_MAX_RETRIES = 3;
async function syncMysql(
knex: Knex | Knex.Transaction,
entityId: string,
searchEntries: DbSearchRow[],
): Promise<void> {
for (let attempt = 1; ; attempt++) {
try {
await knex.transaction(async trx => {
// Create the temp table inside the transaction so it's guaranteed
// to be on the same pooled connection as the merge queries.
// CREATE TEMPORARY TABLE does not cause an implicit commit in
// MySQL (unlike regular CREATE TABLE), so this is safe.
await trx.raw(
'CREATE TEMPORARY TABLE IF NOT EXISTS `_desired_search` (' +
'`key` VARCHAR(255) NOT NULL, ' +
'`value` VARCHAR(255) NULL, ' +
'`original_value` VARCHAR(255) NULL' +
')',
);
// Clear stale data from any previous call on this connection.
// Uses DELETE (DML) instead of TRUNCATE (DDL) to avoid an
// implicit commit that would break transaction atomicity.
await trx.raw('DELETE FROM `_desired_search`');
if (searchEntries.length > 0) {
await trx.batchInsert(
'_desired_search',
searchEntries.map(r => ({
key: r.key,
value: r.value,
original_value: r.original_value,
})),
BATCH_SIZE,
);
}
// Delete rows that are no longer in the desired set
await trx.raw(
'DELETE s FROM `search` s ' +
'WHERE s.entity_id = ? ' +
'AND NOT EXISTS (' +
' SELECT 1 FROM `_desired_search` d' +
' WHERE d.`key` = s.`key`' +
' AND d.`value` <=> s.`value`' +
' AND BINARY d.`original_value` <=> BINARY s.`original_value`' +
')',
[entityId],
);
// Insert rows that are new in the desired set. The original_value
// column preserves the original casing and must be compared with
// BINARY to avoid MySQL's default case-insensitive collation
// treating e.g. "Team-A" and "team-a" as equal.
await trx.raw(
'INSERT INTO `search` (entity_id, `key`, `value`, `original_value`) ' +
'SELECT ?, d.`key`, d.`value`, d.`original_value` ' +
'FROM `_desired_search` d ' +
'WHERE NOT EXISTS (' +
' SELECT 1 FROM `search` s' +
' WHERE s.entity_id = ?' +
' AND s.`key` = d.`key`' +
' AND s.`value` <=> d.`value`' +
' AND BINARY s.`original_value` <=> BINARY d.`original_value`' +
')',
[entityId, entityId],
);
});
return;
} catch (error) {
// MySQL error 1213: ER_LOCK_DEADLOCK
if (
(error as any)?.errno === 1213 &&
attempt < MYSQL_DEADLOCK_MAX_RETRIES
) {
continue;
}
throw error;
}
}
}
// ---------------------------------------------------------------------------
// SQLite (and fallback): bulk replace
// ---------------------------------------------------------------------------
async function syncBulkReplace(
knex: Knex | Knex.Transaction,
entityId: string,
searchEntries: DbSearchRow[],
): Promise<void> {
await knex.transaction(async trx => {
await trx<DbSearchRow>('search').where({ entity_id: entityId }).delete();
await trx.batchInsert('search', searchEntries, BATCH_SIZE);
});
}