From 9da73bf599dc6ae34b34431ce83c1b256fceafd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fredrik=20Adel=C3=B6w?= Date: Mon, 30 Mar 2026 14:02:01 +0200 Subject: [PATCH] catalog: Reduce search table write churn during stitching MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the bulk DELETE + INSERT of all search rows with a minimal sync that only touches rows that actually changed: - Postgres: single writable CTE with unnest — DELETE stale rows and INSERT new ones in one atomic statement using snapshot isolation - MySQL: temporary table merge with deadlock retry (errno 1213) - SQLite: unchanged bulk replace (sufficient for dev/test) For a typical user entity with ~200 search rows where one annotation changed, this reduces the operation from ~400 row writes to ~2. Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Fredrik Adelöw --- .changeset/catalog-search-table-sync.md | 5 + .../operations/stitcher/performStitching.ts | 9 +- .../stitcher/syncSearchRows.test.ts | 258 ++++++++++++++++++ .../operations/stitcher/syncSearchRows.ts | 228 ++++++++++++++++ 4 files changed, 494 insertions(+), 6 deletions(-) create mode 100644 .changeset/catalog-search-table-sync.md create mode 100644 plugins/catalog-backend/src/database/operations/stitcher/syncSearchRows.test.ts create mode 100644 plugins/catalog-backend/src/database/operations/stitcher/syncSearchRows.ts diff --git a/.changeset/catalog-search-table-sync.md b/.changeset/catalog-search-table-sync.md new file mode 100644 index 0000000000..3a3bcea96a --- /dev/null +++ b/.changeset/catalog-search-table-sync.md @@ -0,0 +1,5 @@ +--- +'@backstage/plugin-catalog-backend': patch +--- + +Reduced search table write churn during stitching by syncing only changed rows instead of doing a full delete and re-insert. On Postgres this uses a single writable CTE, on MySQL a temporary table merge with deadlock retry, and on SQLite the previous bulk replace. 
diff --git a/plugins/catalog-backend/src/database/operations/stitcher/performStitching.ts b/plugins/catalog-backend/src/database/operations/stitcher/performStitching.ts index 95fe880eda..a1e08de597 100644 --- a/plugins/catalog-backend/src/database/operations/stitcher/performStitching.ts +++ b/plugins/catalog-backend/src/database/operations/stitcher/performStitching.ts @@ -27,12 +27,12 @@ import { StitchingStrategy } from '../../../stitching/types'; import { DbFinalEntitiesRow, DbRefreshStateRow, - DbSearchRow, DbStitchQueueRow, } from '../../tables'; import { buildEntitySearch } from './buildEntitySearch'; import { markDeferredStitchCompleted } from './markDeferredStitchCompleted'; -import { BATCH_SIZE, generateStableHash } from './util'; +import { syncSearchRows } from './syncSearchRows'; +import { generateStableHash } from './util'; import { LoggerService, isDatabaseConflictError, @@ -256,10 +256,7 @@ export async function performStitching(options: { return 'abandoned'; } - await knex.transaction(async trx => { - await trx('search').where({ entity_id: entityId }).delete(); - await trx.batchInsert('search', searchEntries, BATCH_SIZE); - }); + await syncSearchRows(knex, entityId, searchEntries); return 'changed'; } catch (error) { diff --git a/plugins/catalog-backend/src/database/operations/stitcher/syncSearchRows.test.ts b/plugins/catalog-backend/src/database/operations/stitcher/syncSearchRows.test.ts new file mode 100644 index 0000000000..0168464faa --- /dev/null +++ b/plugins/catalog-backend/src/database/operations/stitcher/syncSearchRows.test.ts @@ -0,0 +1,258 @@ +/* + * Copyright 2026 The Backstage Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { TestDatabases } from '@backstage/backend-test-utils'; +import { Knex } from 'knex'; +import { applyDatabaseMigrations } from '../../migrations'; +import { DbSearchRow } from '../../tables'; +import { syncSearchRows } from './syncSearchRows'; + +jest.setTimeout(60_000); + +const databases = TestDatabases.create(); + +function row( + key: string, + value: string | null, + originalValue?: string | null, +): DbSearchRow { + return { + entity_id: 'e1', + key, + value, + original_value: originalValue ?? value, + }; +} + +describe.each(databases.eachSupportedId())('syncSearchRows, %p', databaseId => { + let knex: Knex; + + async function getSearchRows(): Promise<DbSearchRow[]> { + return knex('search') + .where({ entity_id: 'e1' }) + .orderBy('key') + .orderBy('value') + .select(); + } + + beforeEach(async () => { + knex = await databases.init(databaseId); + await applyDatabaseMigrations(knex); + + // Insert a minimal refresh_state + final_entities row so FKs are satisfied + await knex('refresh_state').insert({ + entity_id: 'e1', + entity_ref: 'component:default/test', + unprocessed_entity: '{}', + errors: '[]', + next_update_at: knex.fn.now(), + last_discovery_at: knex.fn.now(), + }); + await knex('final_entities').insert({ + entity_id: 'e1', + entity_ref: 'component:default/test', + hash: '', + }); + }); + + it('inserts all rows into an empty table', async () => { + const entries = [row('a', 'x'), row('b', 'y'), row('c', null)]; + + await syncSearchRows(knex, 'e1', entries); + + const rows = await getSearchRows(); + expect(rows).toEqual( + 
expect.arrayContaining([ + expect.objectContaining({ key: 'a', value: 'x' }), + expect.objectContaining({ key: 'b', value: 'y' }), + expect.objectContaining({ key: 'c', value: null }), + ]), + ); + expect(rows).toHaveLength(3); + }); + + it('leaves unchanged rows untouched', async () => { + const entries = [row('a', 'x'), row('b', 'y')]; + + await syncSearchRows(knex, 'e1', entries); + const rowsBefore = await getSearchRows(); + + // Sync again with the same data + await syncSearchRows(knex, 'e1', entries); + const rowsAfter = await getSearchRows(); + + expect(rowsAfter).toEqual(rowsBefore); + }); + + it('adds new rows without removing existing ones', async () => { + await syncSearchRows(knex, 'e1', [row('a', 'x'), row('b', 'y')]); + await syncSearchRows(knex, 'e1', [ + row('a', 'x'), + row('b', 'y'), + row('c', 'z'), + ]); + + const rows = await getSearchRows(); + expect(rows).toHaveLength(3); + expect(rows).toEqual( + expect.arrayContaining([ + expect.objectContaining({ key: 'a', value: 'x' }), + expect.objectContaining({ key: 'b', value: 'y' }), + expect.objectContaining({ key: 'c', value: 'z' }), + ]), + ); + }); + + it('removes stale rows', async () => { + await syncSearchRows(knex, 'e1', [ + row('a', 'x'), + row('b', 'y'), + row('c', 'z'), + ]); + await syncSearchRows(knex, 'e1', [row('a', 'x')]); + + const rows = await getSearchRows(); + expect(rows).toHaveLength(1); + expect(rows[0]).toEqual(expect.objectContaining({ key: 'a', value: 'x' })); + }); + + it('handles a value change as a remove + add', async () => { + await syncSearchRows(knex, 'e1', [row('a', 'old'), row('b', 'keep')]); + await syncSearchRows(knex, 'e1', [row('a', 'new'), row('b', 'keep')]); + + const rows = await getSearchRows(); + expect(rows).toHaveLength(2); + expect(rows).toEqual( + expect.arrayContaining([ + expect.objectContaining({ key: 'a', value: 'new' }), + expect.objectContaining({ key: 'b', value: 'keep' }), + ]), + ); + }); + + it('removes all rows when syncing with an empty 
set', async () => { + await syncSearchRows(knex, 'e1', [row('a', 'x'), row('b', 'y')]); + await syncSearchRows(knex, 'e1', []); + + const rows = await getSearchRows(); + expect(rows).toHaveLength(0); + }); + + it('handles null values correctly', async () => { + await syncSearchRows(knex, 'e1', [row('a', null), row('b', 'y')]); + + const rows = await getSearchRows(); + expect(rows).toHaveLength(2); + expect(rows).toEqual( + expect.arrayContaining([ + expect.objectContaining({ key: 'a', value: null }), + expect.objectContaining({ key: 'b', value: 'y' }), + ]), + ); + + // Change null to value + await syncSearchRows(knex, 'e1', [row('a', 'v'), row('b', 'y')]); + + const rows2 = await getSearchRows(); + expect(rows2).toEqual( + expect.arrayContaining([ + expect.objectContaining({ key: 'a', value: 'v' }), + expect.objectContaining({ key: 'b', value: 'y' }), + ]), + ); + }); + + it('distinguishes rows by original_value', async () => { + await syncSearchRows(knex, 'e1', [row('a', 'v', 'V')]); + await syncSearchRows(knex, 'e1', [row('a', 'v', 'v')]); + + const rows = await getSearchRows(); + expect(rows).toHaveLength(1); + expect(rows[0]).toEqual( + expect.objectContaining({ + key: 'a', + value: 'v', + original_value: 'v', + }), + ); + }); + + it('inserts a row when only original_value casing differs from existing', async () => { + // Two rows with the same (key, value) but different original_value + // casing must coexist — the case-insensitive MySQL collation must not + // cause the INSERT to skip the second row. 
+ await syncSearchRows(knex, 'e1', [row('a', 'v', 'V')]); + await syncSearchRows(knex, 'e1', [row('a', 'v', 'V'), row('a', 'v', 'v')]); + + const rows = await getSearchRows(); + expect(rows).toHaveLength(2); + expect(rows).toEqual( + expect.arrayContaining([ + expect.objectContaining({ key: 'a', value: 'v', original_value: 'V' }), + expect.objectContaining({ key: 'a', value: 'v', original_value: 'v' }), + ]), + ); + }); + + it('handles multiple rows with the same key but different values', async () => { + // Simulates array-derived rows like metadata.tags + await syncSearchRows(knex, 'e1', [ + row('metadata.tags', 'java'), + row('metadata.tags', 'python'), + row('metadata.tags', 'go'), + ]); + + // Remove one tag, add another + await syncSearchRows(knex, 'e1', [ + row('metadata.tags', 'java'), + row('metadata.tags', 'python'), + row('metadata.tags', 'rust'), + ]); + + const rows = await getSearchRows(); + expect(rows).toHaveLength(3); + expect(rows.map(r => r.value).sort()).toEqual(['java', 'python', 'rust']); + }); + + it('simulates the typical steady-state case with one changed row', async () => { + // Build a realistic-ish set of search rows + const initial = [ + ...Array.from({ length: 50 }, (_, i) => row(`spec.field${i}`, `v${i}`)), + row('metadata.name', 'my-entity'), + row('metadata.namespace', 'default'), + row('relations.ownedby', 'group:default/team-a'), + ]; + + await syncSearchRows(knex, 'e1', initial); + expect(await getSearchRows()).toHaveLength(53); + + // Only the relation changed + const updated = [ + ...Array.from({ length: 50 }, (_, i) => row(`spec.field${i}`, `v${i}`)), + row('metadata.name', 'my-entity'), + row('metadata.namespace', 'default'), + row('relations.ownedby', 'group:default/team-b'), + ]; + + await syncSearchRows(knex, 'e1', updated); + + const rows = await getSearchRows(); + expect(rows).toHaveLength(53); + expect(rows.find(r => r.key === 'relations.ownedby')).toEqual( + expect.objectContaining({ value: 'group:default/team-b' }), + 
); + }); +}); diff --git a/plugins/catalog-backend/src/database/operations/stitcher/syncSearchRows.ts b/plugins/catalog-backend/src/database/operations/stitcher/syncSearchRows.ts new file mode 100644 index 0000000000..18969b4f33 --- /dev/null +++ b/plugins/catalog-backend/src/database/operations/stitcher/syncSearchRows.ts @@ -0,0 +1,228 @@ +/* + * Copyright 2026 The Backstage Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Knex } from 'knex'; +import { DbSearchRow } from '../../tables'; +import { BATCH_SIZE } from './util'; + +// The Postgres sync uses COALESCE(x, NULL_SENTINEL) to allow Postgres to +// include nullable columns in the Hash Cond of anti-joins (IS NOT DISTINCT +// FROM prevents this). A value that is exactly this sentinel would compare +// equal to NULL there, so syncPostgres drops such rows up front via +// filterSentinelValues and they are not searchable on Postgres. The sentinel +// is SOH (Start of Heading), which does not appear in real entity metadata. +const NULL_SENTINEL = '\x01'; + +function filterSentinelValues(entries: DbSearchRow[]): DbSearchRow[] { + return entries.filter( + r => r.value !== NULL_SENTINEL && r.original_value !== NULL_SENTINEL, + ); +} + +/** + * Synchronizes the search table rows for a given entity, applying only the + * minimal set of changes needed. Rows that already exist with the correct + * values are left untouched, new rows are inserted, and stale rows are + * deleted — minimizing write churn, dead tuples, and WAL traffic. 
+ * + * Uses database-specific strategies: + * - Postgres: Single writable CTE with unnest (one round-trip, no DDL) + * - MySQL: Temporary table merge in one transaction, retried on deadlock + * - SQLite: Simple bulk replace (sufficient for dev/test) + */ +export async function syncSearchRows( + knex: Knex | Knex.Transaction, + entityId: string, + searchEntries: DbSearchRow[], +): Promise<void> { + const client = knex.client.config.client; + + if (client === 'pg') { + await syncPostgres(knex, entityId, searchEntries); + } else if (client.includes('mysql')) { + await syncMysql(knex, entityId, searchEntries); + } else { + await syncBulkReplace(knex, entityId, searchEntries); + } +} + +// --------------------------------------------------------------------------- +// Postgres: writable CTE + unnest +// +// All CTE branches see the same pre-modification snapshot, so the DELETE +// and INSERT do not interfere with each other. This is a single atomic +// statement — no explicit transaction wrapper needed. +// +// Nullable columns use COALESCE(x, chr(1)) instead of IS NOT DISTINCT FROM +// so that Postgres can include all three columns in the Hash Cond of the +// anti-join, rather than pushing nullable comparisons into a Join Filter +// that degrades to O(n*m) when many rows share the same key. chr(1) (SOH +// control character) is used as the NULL sentinel — it cannot appear in +// real entity values since they are human-readable strings. 
+// --------------------------------------------------------------------------- +async function syncPostgres( + knex: Knex | Knex.Transaction, + entityId: string, + searchEntries: DbSearchRow[], +): Promise<void> { + const filtered = filterSentinelValues(searchEntries); + const keys = filtered.map(r => r.key); + const values = filtered.map(r => r.value); + const originalValues = filtered.map(r => r.original_value); + + await knex.raw( + ` + WITH desired AS ( + SELECT * + FROM unnest(?::text[], ?::text[], ?::text[]) + AS d(key, value, original_value) + ), + deleted AS ( + DELETE FROM "search" s + WHERE s.entity_id = ? + AND NOT EXISTS ( + SELECT 1 FROM desired d + WHERE d.key = s.key + AND COALESCE(d.value, chr(1)) = COALESCE(s.value, chr(1)) + AND COALESCE(d.original_value, chr(1)) = COALESCE(s.original_value, chr(1)) + ) + ) + INSERT INTO "search" (entity_id, key, value, original_value) + SELECT ?, d.key, d.value, d.original_value + FROM desired d + WHERE NOT EXISTS ( + SELECT 1 FROM "search" s + WHERE s.entity_id = ? + AND s.key = d.key + AND COALESCE(s.value, chr(1)) = COALESCE(d.value, chr(1)) + AND COALESCE(s.original_value, chr(1)) = COALESCE(d.original_value, chr(1)) + ) + `, + [keys, values, originalValues, entityId, entityId, entityId], + ); +} + +// --------------------------------------------------------------------------- +// MySQL: temporary table merge with deadlock retry +// +// MySQL does not support data-modifying CTEs, so we materialize the desired +// state into a session-scoped temporary table and then merge it into the +// real table with two queries. The temp table is created inside the +// transaction to guarantee it exists on the same pooled connection. +// CREATE/DROP TEMPORARY TABLE does not cause an implicit commit in MySQL +// (unlike regular DDL), so this is transaction-safe. 
+// +// InnoDB's next-key (gap) locking can cause deadlocks between concurrent +// transactions operating on different entity_ids when their gap locks +// overlap on shared index pages. We retry on deadlock (error 1213) since +// the operation is idempotent. +// --------------------------------------------------------------------------- +const MYSQL_DEADLOCK_MAX_RETRIES = 3; + +async function syncMysql( + knex: Knex | Knex.Transaction, + entityId: string, + searchEntries: DbSearchRow[], +): Promise<void> { + for (let attempt = 1; ; attempt++) { + try { + await knex.transaction(async trx => { + // Create the temp table inside the transaction so it's guaranteed + // to be on the same pooled connection as the merge queries. + // CREATE TEMPORARY TABLE does not cause an implicit commit in + // MySQL (unlike regular CREATE TABLE), so this is safe. + await trx.raw( + 'CREATE TEMPORARY TABLE IF NOT EXISTS `_desired_search` (' + + '`key` VARCHAR(255) NOT NULL, ' + + '`value` VARCHAR(255) NULL, ' + + '`original_value` VARCHAR(255) NULL' + + ')', + ); + // Clear stale data from any previous call on this connection. + // Uses DELETE (DML) instead of TRUNCATE (DDL) to avoid an + // implicit commit that would break transaction atomicity. + await trx.raw('DELETE FROM `_desired_search`'); + + if (searchEntries.length > 0) { + await trx.batchInsert( + '_desired_search', + searchEntries.map(r => ({ + key: r.key, + value: r.value, + original_value: r.original_value, + })), + BATCH_SIZE, + ); + } + + // Delete rows that are no longer in the desired set + await trx.raw( + 'DELETE s FROM `search` s ' + + 'WHERE s.entity_id = ? ' + + 'AND NOT EXISTS (' + + ' SELECT 1 FROM `_desired_search` d' + + ' WHERE d.`key` = s.`key`' + + ' AND d.`value` <=> s.`value`' + + ' AND BINARY d.`original_value` <=> BINARY s.`original_value`' + + ')', + [entityId], + ); + + // Insert rows that are new in the desired set. 
The original_value + // column preserves the original casing and must be compared with + // BINARY to avoid MySQL's default case-insensitive collation + // treating e.g. "Team-A" and "team-a" as equal. + await trx.raw( + 'INSERT INTO `search` (entity_id, `key`, `value`, `original_value`) ' + + 'SELECT ?, d.`key`, d.`value`, d.`original_value` ' + + 'FROM `_desired_search` d ' + + 'WHERE NOT EXISTS (' + + ' SELECT 1 FROM `search` s' + + ' WHERE s.entity_id = ?' + + ' AND s.`key` = d.`key`' + + ' AND s.`value` <=> d.`value`' + + ' AND BINARY s.`original_value` <=> BINARY d.`original_value`' + + ')', + [entityId, entityId], + ); + }); + return; + } catch (error) { + // MySQL error 1213: ER_LOCK_DEADLOCK + if ( + (error as any)?.errno === 1213 && + attempt < MYSQL_DEADLOCK_MAX_RETRIES + ) { + continue; + } + throw error; + } + } +} + +// --------------------------------------------------------------------------- +// SQLite (and fallback): bulk replace +// --------------------------------------------------------------------------- +async function syncBulkReplace( + knex: Knex | Knex.Transaction, + entityId: string, + searchEntries: DbSearchRow[], +): Promise<void> { + await knex.transaction(async trx => { + await trx('search').where({ entity_id: entityId }).delete(); + await trx.batchInsert('search', searchEntries, BATCH_SIZE); + }); +}