catalog: Reduce search table write churn during stitching
Replace the bulk DELETE + INSERT of all search rows with a minimal sync that only touches rows that actually changed: - Postgres: single writable CTE with unnest — DELETE stale rows and INSERT new ones in one atomic statement using snapshot isolation - MySQL: temporary table merge with deadlock retry (errno 1213) - SQLite: unchanged bulk replace (sufficient for dev/test) For a typical user entity with ~200 search rows where one annotation changed, this reduces the operation from ~400 row writes to ~2. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Signed-off-by: Fredrik Adelöw <freben@spotify.com>
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/plugin-catalog-backend': patch
|
||||
---
|
||||
|
||||
Reduced search table write churn during stitching by syncing only changed rows instead of doing a full delete and re-insert. On Postgres this uses a single writable CTE, on MySQL a temporary table merge with deadlock retry, and on SQLite the previous bulk replace.
|
||||
@@ -27,12 +27,12 @@ import { StitchingStrategy } from '../../../stitching/types';
|
||||
import {
|
||||
DbFinalEntitiesRow,
|
||||
DbRefreshStateRow,
|
||||
DbSearchRow,
|
||||
DbStitchQueueRow,
|
||||
} from '../../tables';
|
||||
import { buildEntitySearch } from './buildEntitySearch';
|
||||
import { markDeferredStitchCompleted } from './markDeferredStitchCompleted';
|
||||
import { BATCH_SIZE, generateStableHash } from './util';
|
||||
import { syncSearchRows } from './syncSearchRows';
|
||||
import { generateStableHash } from './util';
|
||||
import {
|
||||
LoggerService,
|
||||
isDatabaseConflictError,
|
||||
@@ -256,10 +256,7 @@ export async function performStitching(options: {
|
||||
return 'abandoned';
|
||||
}
|
||||
|
||||
await knex.transaction(async trx => {
|
||||
await trx<DbSearchRow>('search').where({ entity_id: entityId }).delete();
|
||||
await trx.batchInsert('search', searchEntries, BATCH_SIZE);
|
||||
});
|
||||
await syncSearchRows(knex, entityId, searchEntries);
|
||||
|
||||
return 'changed';
|
||||
} catch (error) {
|
||||
|
||||
@@ -0,0 +1,258 @@
|
||||
/*
|
||||
* Copyright 2026 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { TestDatabases } from '@backstage/backend-test-utils';
|
||||
import { Knex } from 'knex';
|
||||
import { applyDatabaseMigrations } from '../../migrations';
|
||||
import { DbSearchRow } from '../../tables';
|
||||
import { syncSearchRows } from './syncSearchRows';
|
||||
|
||||
jest.setTimeout(60_000);
|
||||
|
||||
const databases = TestDatabases.create();
|
||||
|
||||
function row(
|
||||
key: string,
|
||||
value: string | null,
|
||||
originalValue?: string | null,
|
||||
): DbSearchRow {
|
||||
return {
|
||||
entity_id: 'e1',
|
||||
key,
|
||||
value,
|
||||
original_value: originalValue ?? value,
|
||||
};
|
||||
}
|
||||
|
||||
describe.each(databases.eachSupportedId())('syncSearchRows, %p', databaseId => {
|
||||
let knex: Knex;
|
||||
|
||||
async function getSearchRows(): Promise<DbSearchRow[]> {
|
||||
return knex<DbSearchRow>('search')
|
||||
.where({ entity_id: 'e1' })
|
||||
.orderBy('key')
|
||||
.orderBy('value')
|
||||
.select();
|
||||
}
|
||||
|
||||
beforeEach(async () => {
|
||||
knex = await databases.init(databaseId);
|
||||
await applyDatabaseMigrations(knex);
|
||||
|
||||
// Insert a minimal refresh_state + final_entities row so FKs are satisfied
|
||||
await knex('refresh_state').insert({
|
||||
entity_id: 'e1',
|
||||
entity_ref: 'component:default/test',
|
||||
unprocessed_entity: '{}',
|
||||
errors: '[]',
|
||||
next_update_at: knex.fn.now(),
|
||||
last_discovery_at: knex.fn.now(),
|
||||
});
|
||||
await knex('final_entities').insert({
|
||||
entity_id: 'e1',
|
||||
entity_ref: 'component:default/test',
|
||||
hash: '',
|
||||
});
|
||||
});
|
||||
|
||||
it('inserts all rows into an empty table', async () => {
|
||||
const entries = [row('a', 'x'), row('b', 'y'), row('c', null)];
|
||||
|
||||
await syncSearchRows(knex, 'e1', entries);
|
||||
|
||||
const rows = await getSearchRows();
|
||||
expect(rows).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({ key: 'a', value: 'x' }),
|
||||
expect.objectContaining({ key: 'b', value: 'y' }),
|
||||
expect.objectContaining({ key: 'c', value: null }),
|
||||
]),
|
||||
);
|
||||
expect(rows).toHaveLength(3);
|
||||
});
|
||||
|
||||
it('leaves unchanged rows untouched', async () => {
|
||||
const entries = [row('a', 'x'), row('b', 'y')];
|
||||
|
||||
await syncSearchRows(knex, 'e1', entries);
|
||||
const rowsBefore = await getSearchRows();
|
||||
|
||||
// Sync again with the same data
|
||||
await syncSearchRows(knex, 'e1', entries);
|
||||
const rowsAfter = await getSearchRows();
|
||||
|
||||
expect(rowsAfter).toEqual(rowsBefore);
|
||||
});
|
||||
|
||||
it('adds new rows without removing existing ones', async () => {
|
||||
await syncSearchRows(knex, 'e1', [row('a', 'x'), row('b', 'y')]);
|
||||
await syncSearchRows(knex, 'e1', [
|
||||
row('a', 'x'),
|
||||
row('b', 'y'),
|
||||
row('c', 'z'),
|
||||
]);
|
||||
|
||||
const rows = await getSearchRows();
|
||||
expect(rows).toHaveLength(3);
|
||||
expect(rows).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({ key: 'a', value: 'x' }),
|
||||
expect.objectContaining({ key: 'b', value: 'y' }),
|
||||
expect.objectContaining({ key: 'c', value: 'z' }),
|
||||
]),
|
||||
);
|
||||
});
|
||||
|
||||
it('removes stale rows', async () => {
|
||||
await syncSearchRows(knex, 'e1', [
|
||||
row('a', 'x'),
|
||||
row('b', 'y'),
|
||||
row('c', 'z'),
|
||||
]);
|
||||
await syncSearchRows(knex, 'e1', [row('a', 'x')]);
|
||||
|
||||
const rows = await getSearchRows();
|
||||
expect(rows).toHaveLength(1);
|
||||
expect(rows[0]).toEqual(expect.objectContaining({ key: 'a', value: 'x' }));
|
||||
});
|
||||
|
||||
it('handles a value change as a remove + add', async () => {
|
||||
await syncSearchRows(knex, 'e1', [row('a', 'old'), row('b', 'keep')]);
|
||||
await syncSearchRows(knex, 'e1', [row('a', 'new'), row('b', 'keep')]);
|
||||
|
||||
const rows = await getSearchRows();
|
||||
expect(rows).toHaveLength(2);
|
||||
expect(rows).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({ key: 'a', value: 'new' }),
|
||||
expect.objectContaining({ key: 'b', value: 'keep' }),
|
||||
]),
|
||||
);
|
||||
});
|
||||
|
||||
it('removes all rows when syncing with an empty set', async () => {
|
||||
await syncSearchRows(knex, 'e1', [row('a', 'x'), row('b', 'y')]);
|
||||
await syncSearchRows(knex, 'e1', []);
|
||||
|
||||
const rows = await getSearchRows();
|
||||
expect(rows).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('handles null values correctly', async () => {
|
||||
await syncSearchRows(knex, 'e1', [row('a', null), row('b', 'y')]);
|
||||
|
||||
const rows = await getSearchRows();
|
||||
expect(rows).toHaveLength(2);
|
||||
expect(rows).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({ key: 'a', value: null }),
|
||||
expect.objectContaining({ key: 'b', value: 'y' }),
|
||||
]),
|
||||
);
|
||||
|
||||
// Change null to value
|
||||
await syncSearchRows(knex, 'e1', [row('a', 'v'), row('b', 'y')]);
|
||||
|
||||
const rows2 = await getSearchRows();
|
||||
expect(rows2).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({ key: 'a', value: 'v' }),
|
||||
expect.objectContaining({ key: 'b', value: 'y' }),
|
||||
]),
|
||||
);
|
||||
});
|
||||
|
||||
it('distinguishes rows by original_value', async () => {
|
||||
await syncSearchRows(knex, 'e1', [row('a', 'v', 'V')]);
|
||||
await syncSearchRows(knex, 'e1', [row('a', 'v', 'v')]);
|
||||
|
||||
const rows = await getSearchRows();
|
||||
expect(rows).toHaveLength(1);
|
||||
expect(rows[0]).toEqual(
|
||||
expect.objectContaining({
|
||||
key: 'a',
|
||||
value: 'v',
|
||||
original_value: 'v',
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('inserts a row when only original_value casing differs from existing', async () => {
|
||||
// Two rows with the same (key, value) but different original_value
|
||||
// casing must coexist — the case-insensitive MySQL collation must not
|
||||
// cause the INSERT to skip the second row.
|
||||
await syncSearchRows(knex, 'e1', [row('a', 'v', 'V')]);
|
||||
await syncSearchRows(knex, 'e1', [row('a', 'v', 'V'), row('a', 'v', 'v')]);
|
||||
|
||||
const rows = await getSearchRows();
|
||||
expect(rows).toHaveLength(2);
|
||||
expect(rows).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({ key: 'a', value: 'v', original_value: 'V' }),
|
||||
expect.objectContaining({ key: 'a', value: 'v', original_value: 'v' }),
|
||||
]),
|
||||
);
|
||||
});
|
||||
|
||||
it('handles multiple rows with the same key but different values', async () => {
|
||||
// Simulates array-derived rows like metadata.tags
|
||||
await syncSearchRows(knex, 'e1', [
|
||||
row('metadata.tags', 'java'),
|
||||
row('metadata.tags', 'python'),
|
||||
row('metadata.tags', 'go'),
|
||||
]);
|
||||
|
||||
// Remove one tag, add another
|
||||
await syncSearchRows(knex, 'e1', [
|
||||
row('metadata.tags', 'java'),
|
||||
row('metadata.tags', 'python'),
|
||||
row('metadata.tags', 'rust'),
|
||||
]);
|
||||
|
||||
const rows = await getSearchRows();
|
||||
expect(rows).toHaveLength(3);
|
||||
expect(rows.map(r => r.value).sort()).toEqual(['java', 'python', 'rust']);
|
||||
});
|
||||
|
||||
it('simulates the typical steady-state case with one changed row', async () => {
|
||||
// Build a realistic-ish set of search rows
|
||||
const initial = [
|
||||
...Array.from({ length: 50 }, (_, i) => row(`spec.field${i}`, `v${i}`)),
|
||||
row('metadata.name', 'my-entity'),
|
||||
row('metadata.namespace', 'default'),
|
||||
row('relations.ownedby', 'group:default/team-a'),
|
||||
];
|
||||
|
||||
await syncSearchRows(knex, 'e1', initial);
|
||||
expect(await getSearchRows()).toHaveLength(53);
|
||||
|
||||
// Only the relation changed
|
||||
const updated = [
|
||||
...Array.from({ length: 50 }, (_, i) => row(`spec.field${i}`, `v${i}`)),
|
||||
row('metadata.name', 'my-entity'),
|
||||
row('metadata.namespace', 'default'),
|
||||
row('relations.ownedby', 'group:default/team-b'),
|
||||
];
|
||||
|
||||
await syncSearchRows(knex, 'e1', updated);
|
||||
|
||||
const rows = await getSearchRows();
|
||||
expect(rows).toHaveLength(53);
|
||||
expect(rows.find(r => r.key === 'relations.ownedby')).toEqual(
|
||||
expect.objectContaining({ value: 'group:default/team-b' }),
|
||||
);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,228 @@
|
||||
/*
|
||||
* Copyright 2026 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { Knex } from 'knex';
|
||||
import { DbSearchRow } from '../../tables';
|
||||
import { BATCH_SIZE } from './util';
|
||||
|
||||
// The Postgres sync uses COALESCE(x, NULL_SENTINEL) to allow Postgres to
|
||||
// include nullable columns in the Hash Cond of anti-joins (IS NOT DISTINCT
|
||||
// FROM prevents this). As a consequence, values that are exactly this
|
||||
// sentinel character are not searchable — they would be treated as NULL.
|
||||
// This is the SOH (Start of Heading) control character which does not
|
||||
// appear in real entity metadata.
|
||||
const NULL_SENTINEL = '\x01';
|
||||
|
||||
function filterSentinelValues(entries: DbSearchRow[]): DbSearchRow[] {
|
||||
return entries.filter(
|
||||
r => r.value !== NULL_SENTINEL && r.original_value !== NULL_SENTINEL,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Synchronizes the search table rows for a given entity, applying only the
|
||||
* minimal set of changes needed. Rows that already exist with the correct
|
||||
* values are left untouched, new rows are inserted, and stale rows are
|
||||
* deleted — minimizing write churn, dead tuples, and WAL traffic.
|
||||
*
|
||||
* Uses database-specific strategies:
|
||||
* - Postgres: Single writable CTE with unnest (one round-trip, no DDL)
|
||||
* - MySQL: Temporary table merge (two queries in a transaction)
|
||||
* - SQLite: Simple bulk replace (sufficient for dev/test)
|
||||
*/
|
||||
export async function syncSearchRows(
|
||||
knex: Knex | Knex.Transaction,
|
||||
entityId: string,
|
||||
searchEntries: DbSearchRow[],
|
||||
): Promise<void> {
|
||||
const client = knex.client.config.client;
|
||||
|
||||
if (client === 'pg') {
|
||||
await syncPostgres(knex, entityId, searchEntries);
|
||||
} else if (client.includes('mysql')) {
|
||||
await syncMysql(knex, entityId, searchEntries);
|
||||
} else {
|
||||
await syncBulkReplace(knex, entityId, searchEntries);
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Postgres: writable CTE + unnest
|
||||
//
|
||||
// All CTE branches see the same pre-modification snapshot, so the DELETE
|
||||
// and INSERT do not interfere with each other. This is a single atomic
|
||||
// statement — no explicit transaction wrapper needed.
|
||||
//
|
||||
// Nullable columns use COALESCE(x, chr(1)) instead of IS NOT DISTINCT FROM
|
||||
// so that Postgres can include all three columns in the Hash Cond of the
|
||||
// anti-join, rather than pushing nullable comparisons into a Join Filter
|
||||
// that degrades to O(n*m) when many rows share the same key. chr(1) (SOH
|
||||
// control character) is used as the NULL sentinel — it cannot appear in
|
||||
// real entity values since they are human-readable strings.
|
||||
// ---------------------------------------------------------------------------
|
||||
async function syncPostgres(
|
||||
knex: Knex | Knex.Transaction,
|
||||
entityId: string,
|
||||
searchEntries: DbSearchRow[],
|
||||
): Promise<void> {
|
||||
const filtered = filterSentinelValues(searchEntries);
|
||||
const keys = filtered.map(r => r.key);
|
||||
const values = filtered.map(r => r.value);
|
||||
const originalValues = filtered.map(r => r.original_value);
|
||||
|
||||
await knex.raw(
|
||||
`
|
||||
WITH desired AS (
|
||||
SELECT *
|
||||
FROM unnest(?::text[], ?::text[], ?::text[])
|
||||
AS d(key, value, original_value)
|
||||
),
|
||||
deleted AS (
|
||||
DELETE FROM "search" s
|
||||
WHERE s.entity_id = ?
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM desired d
|
||||
WHERE d.key = s.key
|
||||
AND COALESCE(d.value, chr(1)) = COALESCE(s.value, chr(1))
|
||||
AND COALESCE(d.original_value, chr(1)) = COALESCE(s.original_value, chr(1))
|
||||
)
|
||||
)
|
||||
INSERT INTO "search" (entity_id, key, value, original_value)
|
||||
SELECT ?, d.key, d.value, d.original_value
|
||||
FROM desired d
|
||||
WHERE NOT EXISTS (
|
||||
SELECT 1 FROM "search" s
|
||||
WHERE s.entity_id = ?
|
||||
AND s.key = d.key
|
||||
AND COALESCE(s.value, chr(1)) = COALESCE(d.value, chr(1))
|
||||
AND COALESCE(s.original_value, chr(1)) = COALESCE(d.original_value, chr(1))
|
||||
)
|
||||
`,
|
||||
[keys, values, originalValues, entityId, entityId, entityId],
|
||||
);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// MySQL: temporary table merge with deadlock retry
|
||||
//
|
||||
// MySQL does not support data-modifying CTEs, so we materialize the desired
|
||||
// state into a session-scoped temporary table and then merge it into the
|
||||
// real table with two queries. The temp table is created inside the
|
||||
// transaction to guarantee it exists on the same pooled connection.
|
||||
// CREATE/DROP TEMPORARY TABLE does not cause an implicit commit in MySQL
|
||||
// (unlike regular DDL), so this is transaction-safe.
|
||||
//
|
||||
// InnoDB's next-key (gap) locking can cause deadlocks between concurrent
|
||||
// transactions operating on different entity_ids when their gap locks
|
||||
// overlap on shared index pages. We retry on deadlock (error 1213) since
|
||||
// the operation is idempotent.
|
||||
// ---------------------------------------------------------------------------
|
||||
const MYSQL_DEADLOCK_MAX_RETRIES = 3;
|
||||
|
||||
async function syncMysql(
|
||||
knex: Knex | Knex.Transaction,
|
||||
entityId: string,
|
||||
searchEntries: DbSearchRow[],
|
||||
): Promise<void> {
|
||||
for (let attempt = 1; ; attempt++) {
|
||||
try {
|
||||
await knex.transaction(async trx => {
|
||||
// Create the temp table inside the transaction so it's guaranteed
|
||||
// to be on the same pooled connection as the merge queries.
|
||||
// CREATE TEMPORARY TABLE does not cause an implicit commit in
|
||||
// MySQL (unlike regular CREATE TABLE), so this is safe.
|
||||
await trx.raw(
|
||||
'CREATE TEMPORARY TABLE IF NOT EXISTS `_desired_search` (' +
|
||||
'`key` VARCHAR(255) NOT NULL, ' +
|
||||
'`value` VARCHAR(255) NULL, ' +
|
||||
'`original_value` VARCHAR(255) NULL' +
|
||||
')',
|
||||
);
|
||||
// Clear stale data from any previous call on this connection.
|
||||
// Uses DELETE (DML) instead of TRUNCATE (DDL) to avoid an
|
||||
// implicit commit that would break transaction atomicity.
|
||||
await trx.raw('DELETE FROM `_desired_search`');
|
||||
|
||||
if (searchEntries.length > 0) {
|
||||
await trx.batchInsert(
|
||||
'_desired_search',
|
||||
searchEntries.map(r => ({
|
||||
key: r.key,
|
||||
value: r.value,
|
||||
original_value: r.original_value,
|
||||
})),
|
||||
BATCH_SIZE,
|
||||
);
|
||||
}
|
||||
|
||||
// Delete rows that are no longer in the desired set
|
||||
await trx.raw(
|
||||
'DELETE s FROM `search` s ' +
|
||||
'WHERE s.entity_id = ? ' +
|
||||
'AND NOT EXISTS (' +
|
||||
' SELECT 1 FROM `_desired_search` d' +
|
||||
' WHERE d.`key` = s.`key`' +
|
||||
' AND d.`value` <=> s.`value`' +
|
||||
' AND BINARY d.`original_value` <=> BINARY s.`original_value`' +
|
||||
')',
|
||||
[entityId],
|
||||
);
|
||||
|
||||
// Insert rows that are new in the desired set. The original_value
|
||||
// column preserves the original casing and must be compared with
|
||||
// BINARY to avoid MySQL's default case-insensitive collation
|
||||
// treating e.g. "Team-A" and "team-a" as equal.
|
||||
await trx.raw(
|
||||
'INSERT INTO `search` (entity_id, `key`, `value`, `original_value`) ' +
|
||||
'SELECT ?, d.`key`, d.`value`, d.`original_value` ' +
|
||||
'FROM `_desired_search` d ' +
|
||||
'WHERE NOT EXISTS (' +
|
||||
' SELECT 1 FROM `search` s' +
|
||||
' WHERE s.entity_id = ?' +
|
||||
' AND s.`key` = d.`key`' +
|
||||
' AND s.`value` <=> d.`value`' +
|
||||
' AND BINARY s.`original_value` <=> BINARY d.`original_value`' +
|
||||
')',
|
||||
[entityId, entityId],
|
||||
);
|
||||
});
|
||||
return;
|
||||
} catch (error) {
|
||||
// MySQL error 1213: ER_LOCK_DEADLOCK
|
||||
if (
|
||||
(error as any)?.errno === 1213 &&
|
||||
attempt < MYSQL_DEADLOCK_MAX_RETRIES
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// SQLite (and fallback): bulk replace
|
||||
// ---------------------------------------------------------------------------
|
||||
async function syncBulkReplace(
|
||||
knex: Knex | Knex.Transaction,
|
||||
entityId: string,
|
||||
searchEntries: DbSearchRow[],
|
||||
): Promise<void> {
|
||||
await knex.transaction(async trx => {
|
||||
await trx<DbSearchRow>('search').where({ entity_id: entityId }).delete();
|
||||
await trx.batchInsert('search', searchEntries, BATCH_SIZE);
|
||||
});
|
||||
}
|
||||
Reference in New Issue
Block a user