diff --git a/.changeset/search-indices-and-dedup.md b/.changeset/search-indices-and-dedup.md new file mode 100644 index 0000000000..58e370a319 --- /dev/null +++ b/.changeset/search-indices-and-dedup.md @@ -0,0 +1,33 @@ +--- +'@backstage/plugin-catalog-backend': patch +--- + +Added a migration that removes duplicate rows from the `search` table, creates covering indices for improved query performance, and adds a `UNIQUE` constraint on `(entity_id, key, value)`. + +This is a long-running migration on large catalogs. On PostgreSQL with millions of search rows, the index creation may take 5-15 minutes per index. During this time, other pods running the previous version will continue to serve traffic normally — the index creation does not block reads or writes. However, if a Kubernetes liveness probe kills the pod before the index build completes, the build is lost and the next startup will start over. On large tables this can repeat indefinitely. + +**For large installations**, it is recommended to run the following SQL commands against your PostgreSQL database **before deploying** this version. Each index build takes a few minutes but does not block reads or writes. If these have already completed, the migration will detect the existing indices and skip all work — startup will be instant. + +```sql +-- Step 1: Remove duplicate search rows +WITH cte AS ( + SELECT ctid, row_number() OVER (PARTITION BY entity_id, key, value) AS rn + FROM search +) +DELETE FROM search USING cte WHERE search.ctid = cte.ctid AND cte.rn > 1; + +-- Step 2: Create new indices (run each separately) +CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS + search_entity_key_value_idx ON search (entity_id, key, value); +CREATE INDEX CONCURRENTLY IF NOT EXISTS + search_key_value_entity_idx ON search (key, value, entity_id); +CREATE INDEX CONCURRENTLY IF NOT EXISTS + search_facets_covering_idx ON search (key, original_value, entity_id) + WHERE original_value IS NOT NULL; + +-- Step 3: Drop old indices that are no longer needed +DROP INDEX CONCURRENTLY IF EXISTS search_key_value_idx; +DROP INDEX CONCURRENTLY IF EXISTS search_key_original_value_idx; +``` + +Also fixed `buildEntitySearch` to remove duplicate output for entities with duplicate array values, and added `ON CONFLICT DO UPDATE` to `syncSearchRows` so that concurrent stitching races are handled gracefully. diff --git a/plugins/catalog-backend/migrations/20260510000000_search_indices_and_dedup.js b/plugins/catalog-backend/migrations/20260510000000_search_indices_and_dedup.js new file mode 100644 index 0000000000..1a35b0f080 --- /dev/null +++ b/plugins/catalog-backend/migrations/20260510000000_search_indices_and_dedup.js @@ -0,0 +1,439 @@ +/* + * Copyright 2026 The Backstage Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// @ts-check + +/** + * Deduplicates the search table, adds covering indices (including a UNIQUE + * constraint on (entity_id, key, value)), and drops superseded indices. + * + * On PostgreSQL this uses CREATE INDEX CONCURRENTLY which avoids blocking + * reads/writes but can take several minutes on large tables (13M+ rows). + * Each step is idempotent: it checks the current state and skips work + * already done, or cleans up INVALID indices left by an interrupted + * attempt before retrying. However, an interrupted index build does NOT + * leave partial progress — each retry starts from scratch. If the + * Kubernetes liveness probe kills the pod before an index build completes, + * the next startup will drop the INVALID index and restart the build. On + * large tables this can repeat indefinitely. To prevent this, either + * increase the liveness probe timeout for this one-time upgrade, or run + * the SQL commands below manually before deploying. + * + * ## Deduplication strategy + * + * The dedup step is skipped entirely if the UNIQUE index already exists and + * is valid — a valid unique index guarantees no duplicates can be present. + * + * When dedup is needed the migration uses a two-phase approach that avoids + * a full heap scan by leveraging the pre-existing + * search_key_value_entity_idx (key, value, entity_id) covering index: + * + * Phase 1 (index-only scan): + * GROUP BY entity_id, key, value on the covering index — zero heap + * fetches — to build a small temp table of only the duplicate groups. + * + * Phase 2 (index scan, dup rows only): + * For each duplicate group in the temp table, LATERAL index-scan back + * into search to find the per-group ctids, then DELETE them in one + * statement. Only the ~2× duplicate rows are touched; clean rows are + * never read from the heap. + * + * ## Recommended: run manually before deploying (large installations) + * + * For PostgreSQL installations with millions of search rows, run these + * commands against your database BEFORE deploying this version. Each + * index build takes a few minutes but does not block reads or writes. + * The migration detects that the indices already exist and skips all work, + * so startup is instant. + * + * -- 1. Remove duplicate search rows using the same index-friendly + * -- two-phase strategy used by the migration itself. + * CREATE TEMP TABLE _search_dedup_groups AS + * SELECT entity_id, key, value + * FROM search + * GROUP BY entity_id, key, value + * HAVING COUNT(*) > 1; + * CREATE INDEX ON _search_dedup_groups (key, value, entity_id); + * + * DELETE FROM search WHERE ctid IN ( + * SELECT s.ctid FROM _search_dedup_groups g + * CROSS JOIN LATERAL ( + * SELECT ctid FROM ( + * SELECT ctid, + * row_number() OVER (ORDER BY ctid) AS rn + * FROM search + * WHERE key = g.key AND entity_id = g.entity_id + * AND value = g.value + * ) sub WHERE rn > 1 + * ) s WHERE g.value IS NOT NULL + * UNION ALL + * SELECT s.ctid FROM _search_dedup_groups g + * CROSS JOIN LATERAL ( + * SELECT ctid FROM ( + * SELECT ctid, + * row_number() OVER (ORDER BY ctid) AS rn + * FROM search + * WHERE key = g.key AND entity_id = g.entity_id + * AND value IS NULL + * ) sub WHERE rn > 1 + * ) s WHERE g.value IS NULL + * ); + * + * DROP TABLE _search_dedup_groups; + * + * -- 2. Create indices (run each separately) + * CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS + * search_entity_key_value_idx ON search (entity_id, key, value); + * CREATE INDEX CONCURRENTLY IF NOT EXISTS + * search_key_value_entity_idx ON search (key, value, entity_id); + * CREATE INDEX CONCURRENTLY IF NOT EXISTS + * search_facets_covering_idx ON search (key, original_value, entity_id) + * WHERE original_value IS NOT NULL; + * + * -- 3. Drop old indices + * DROP INDEX CONCURRENTLY IF EXISTS search_key_value_idx; + * DROP INDEX CONCURRENTLY IF EXISTS search_key_original_value_idx; + * + * If these commands have already completed, the migration will detect the + * existing indices and skip all work — startup will be instant. + */ + +/** + * @param {import('knex').Knex} knex + */ +exports.up = async function up(knex) { + const client = knex.client.config.client; + + if (client.includes('pg')) { + await upPostgres(knex); + } else if (client.includes('mysql')) { + await upMysql(knex); + } else { + await upSqlite(knex); + } +}; + +/** + * @param {import('knex').Knex} knex + */ +exports.down = async function down(knex) { + const client = knex.client.config.client; + + if (client.includes('pg')) { + // Restore the old indices first so there is no window without coverage, + // then drop the new ones. + await knex.raw( + 'CREATE INDEX CONCURRENTLY IF NOT EXISTS search_key_value_idx ON search (key, value)', + ); + await knex.raw( + 'CREATE INDEX CONCURRENTLY IF NOT EXISTS search_key_original_value_idx ON search (key, original_value)', + ); + await knex.raw( + 'DROP INDEX CONCURRENTLY IF EXISTS search_entity_key_value_idx', + ); + await knex.raw( + 'DROP INDEX CONCURRENTLY IF EXISTS search_key_value_entity_idx', + ); + await knex.raw( + 'DROP INDEX CONCURRENTLY IF EXISTS search_facets_covering_idx', + ); + } else if (client.includes('mysql')) { + await knex.schema.alterTable('search', table => { + table.index(['key', 'value'], 'search_key_value_idx'); + table.index(['key', 'original_value'], 'search_key_original_value_idx'); + }); + await mysqlDropIndexIfExists(knex, 'search_entity_key_value_idx'); + await mysqlDropIndexIfExists(knex, 'search_key_value_entity_idx'); + await mysqlDropIndexIfExists(knex, 'search_facets_covering_idx'); + } else { + await knex.raw( + 'CREATE INDEX IF NOT EXISTS search_key_value_idx ON search (key, value)', + ); + await knex.raw( + 'CREATE INDEX IF NOT EXISTS search_key_original_value_idx ON search (key, original_value)', + ); + await knex.raw('DROP INDEX IF EXISTS search_entity_key_value_idx'); + await knex.raw('DROP INDEX IF EXISTS search_key_value_entity_idx'); + await knex.raw('DROP INDEX IF EXISTS search_facets_covering_idx'); + } +}; + +exports.config = { transaction: false }; + +// --------------------------------------------------------------------------- +// PostgreSQL +// --------------------------------------------------------------------------- + +/** @param {import('knex').Knex} knex */ +async function upPostgres(knex) { + // Step 1: Ensure the covering index exists before deduplication. + // This non-unique index on (key, value, entity_id) covers all three dedup + // columns, enabling an index-only GROUP BY scan with zero heap fetches in + // Phase 1 of the dedup. Creating it here guarantees this is true even on + // vanilla installations that have never run any preparatory SQL manually. + await ensurePgIndex(knex, { + name: 'search_key_value_entity_idx', + columns: '(key, value, entity_id)', + unique: false, + }); + + // Step 2: Remove duplicate search rows. + // + // Fast path: if the UNIQUE index already exists and is valid, Postgres has + // been enforcing uniqueness since the index was created, so there are no + // duplicates. Skip dedup entirely — this makes restarts essentially free + // for installations that created the index manually beforehand. + // + // Slow path (index absent or invalid): two-phase approach. + // Phase 1: index-only GROUP BY scan over search_key_value_entity_idx + // (key, value, entity_id) — zero heap fetches — to build a + // temp table of only the duplicate groups. + // Phase 2: LATERAL + index scan to find ctids within each group, then + // a single DELETE. Only the duplicate rows (~2× their count) + // are ever read from the heap; the full table is never scanned. + const uniqueCheck = await knex.raw( + `SELECT indisvalid + FROM pg_index + WHERE indexrelid = ( + SELECT oid FROM pg_class WHERE relname = ? AND relkind = 'i' + ) AND indisunique = true`, + ['search_entity_key_value_idx'], + ); + const needsDedup = !uniqueCheck.rows[0]?.indisvalid; + + if (needsDedup) { + // Phase 1: index-only GROUP BY scan — no heap fetches. + // search_key_value_entity_idx (key, value, entity_id) covers all three + // dedup columns, so PostgreSQL resolves COUNT(*) without touching the + // heap at all (Heap Fetches: 0 in EXPLAIN). The result is a small temp + // table of only the duplicate (entity_id, key, value) groups. + await knex.raw(` + CREATE TEMP TABLE _search_dedup_groups AS + SELECT entity_id, key, value + FROM search + GROUP BY entity_id, key, value + HAVING COUNT(*) > 1 + `); + await knex.raw( + `CREATE INDEX ON _search_dedup_groups (key, value, entity_id)`, + ); + + // Phase 2: for each duplicate group, LATERAL-join back into search via + // the covering index (Nested Loop + Index Scan), row_number within that + // tiny per-group result, then DELETE rows where rn > 1. Only the ~2× + // duplicate rows are ever read from the heap; all clean rows are skipped. + // + // NULL values need a separate arm because `value = NULL` is always false + // in SQL — `value IS NULL` is required for the index condition. + await knex.raw(` + DELETE FROM search WHERE ctid IN ( + SELECT s.ctid FROM _search_dedup_groups g + CROSS JOIN LATERAL ( + SELECT ctid FROM ( + SELECT ctid, + row_number() OVER (ORDER BY ctid) AS rn + FROM search + WHERE key = g.key AND entity_id = g.entity_id + AND value = g.value + ) sub WHERE rn > 1 + ) s WHERE g.value IS NOT NULL + + UNION ALL + + SELECT s.ctid FROM _search_dedup_groups g + CROSS JOIN LATERAL ( + SELECT ctid FROM ( + SELECT ctid, + row_number() OVER (ORDER BY ctid) AS rn + FROM search + WHERE key = g.key AND entity_id = g.entity_id + AND value IS NULL + ) sub WHERE rn > 1 + ) s WHERE g.value IS NULL + ) + `); + + await knex.raw('DROP TABLE IF EXISTS _search_dedup_groups'); + } + + // Step 3: Create remaining covering indices. Each call is idempotent — + // it checks the index state and only does work if needed. + // search_key_value_entity_idx was already created in Step 1. + await ensurePgIndex(knex, { + name: 'search_entity_key_value_idx', + columns: '(entity_id, key, value)', + unique: true, + }); + await ensurePgIndex(knex, { + name: 'search_facets_covering_idx', + columns: '(key, original_value, entity_id)', + where: 'WHERE original_value IS NOT NULL', + unique: false, + }); + + // Step 4: Drop superseded indices. + await dropPgIndexIfExists(knex, 'search_key_value_idx'); + await dropPgIndexIfExists(knex, 'search_key_original_value_idx'); +} + +/** + * Creates or replaces an index on the search table, handling all edge cases: + * - Already valid with correct uniqueness: skip + * - Exists but INVALID (interrupted CREATE): drop and recreate + * - Exists but wrong uniqueness (e.g. non-unique but we need unique): drop and recreate + * - Missing: create from scratch + * + * @param {import('knex').Knex} knex + * @param {{ name: string; columns: string; unique: boolean; where?: string }} opts + */ +async function ensurePgIndex(knex, opts) { + const { name, columns, unique, where } = opts; + + const result = await knex.raw( + ` + SELECT indisunique, indisvalid + FROM pg_index + WHERE indexrelid = ( + SELECT oid FROM pg_class WHERE relname = ? AND relkind = 'i' + ) + `, + [name], + ); + + if (result.rows.length > 0) { + const { indisunique, indisvalid } = result.rows[0]; + if (indisvalid && indisunique === unique) { + return; // Already correct + } + // Wrong state — drop and recreate + await knex.raw(`DROP INDEX CONCURRENTLY IF EXISTS "${name}"`); + } + + const uniqueKw = unique ? 'UNIQUE' : ''; + const whereClause = where || ''; + await knex.raw( + `CREATE ${uniqueKw} INDEX CONCURRENTLY "${name}" ON search ${columns} ${whereClause}`, + ); +} + +/** + * @param {import('knex').Knex} knex + * @param {string} name + */ +async function dropPgIndexIfExists(knex, name) { + const result = await knex.raw( + ` + SELECT 1 FROM pg_class WHERE relname = ? AND relkind = 'i' + `, + [name], + ); + if (result.rows.length > 0) { + await knex.raw(`DROP INDEX CONCURRENTLY IF EXISTS "${name}"`); + } +} + +// --------------------------------------------------------------------------- +// MySQL +// --------------------------------------------------------------------------- + +/** @param {import('knex').Knex} knex */ +async function upMysql(knex) { + // Dedup via temp table + await knex.transaction(async trx => { + await trx.raw( + 'CREATE TEMPORARY TABLE IF NOT EXISTS `_search_keep` (' + + '`entity_id` VARCHAR(255), `key` VARCHAR(255), ' + + '`value` VARCHAR(255), `original_value` VARCHAR(255))', + ); + await trx.raw('DELETE FROM `_search_keep`'); + await trx.raw( + 'INSERT INTO `_search_keep` ' + + 'SELECT `entity_id`, `key`, `value`, MAX(`original_value`) ' + + 'FROM `search` GROUP BY `entity_id`, `key`, `value`', + ); + await trx.raw('DELETE FROM `search`'); + await trx.raw( + 'INSERT INTO `search` (`entity_id`, `key`, `value`, `original_value`) ' + + 'SELECT * FROM `_search_keep`', + ); + await trx.raw('DROP TEMPORARY TABLE `_search_keep`'); + }); + + // Drop old indices if present, then create new ones + await mysqlDropIndexIfExists(knex, 'search_key_value_idx'); + await mysqlDropIndexIfExists(knex, 'search_key_original_value_idx'); + await mysqlDropIndexIfExists(knex, 'search_entity_key_value_idx'); + await mysqlDropIndexIfExists(knex, 'search_key_value_entity_idx'); + await mysqlDropIndexIfExists(knex, 'search_facets_covering_idx'); + + await knex.schema.alterTable('search', table => { + table.unique(['entity_id', 'key', 'value'], 'search_entity_key_value_idx'); + table.index(['key', 'value', 'entity_id'], 'search_key_value_entity_idx'); + }); + // MySQL doesn't support partial indices — create a full one + await knex.schema.alterTable('search', table => { + table.index( + ['key', 'original_value', 'entity_id'], + 'search_facets_covering_idx', + ); + }); +} + +/** @param {import('knex').Knex} knex @param {string} name */ +async function mysqlDropIndexIfExists(knex, name) { + const [rows] = await knex.raw( + `SHOW INDEX FROM \`search\` WHERE Key_name = ?`, + [name], + ); + if (rows.length > 0) { + await knex.schema.alterTable('search', t => { + t.dropIndex([], name); + }); + } +} + +// --------------------------------------------------------------------------- +// SQLite +// --------------------------------------------------------------------------- + +/** @param {import('knex').Knex} knex */ +async function upSqlite(knex) { + await knex.transaction(async trx => { + await trx.raw(` + DELETE FROM search + WHERE rowid NOT IN ( + SELECT MIN(rowid) FROM search GROUP BY entity_id, key, value + ) + `); + }); + + // Drop old, create new — SQLite is fast on small tables + await knex.raw('DROP INDEX IF EXISTS search_key_value_idx'); + await knex.raw('DROP INDEX IF EXISTS search_key_original_value_idx'); + await knex.raw('DROP INDEX IF EXISTS search_entity_key_value_idx'); + await knex.raw('DROP INDEX IF EXISTS search_key_value_entity_idx'); + await knex.raw('DROP INDEX IF EXISTS search_facets_covering_idx'); + + await knex.raw( + 'CREATE UNIQUE INDEX search_entity_key_value_idx ON search (entity_id, key, value)', + ); + await knex.raw( + 'CREATE INDEX search_key_value_entity_idx ON search (key, value, entity_id)', + ); + await knex.raw( + 'CREATE INDEX search_facets_covering_idx ON search (key, original_value, entity_id)', + ); +} diff --git a/plugins/catalog-backend/report.sql.md b/plugins/catalog-backend/report.sql.md index e23b250be8..0ea80db3f4 100644 --- a/plugins/catalog-backend/report.sql.md +++ b/plugins/catalog-backend/report.sql.md @@ -127,8 +127,9 @@ ### Indices - `search_entity_id_idx` (`entity_id`) -- `search_key_original_value_idx` (`key`, `original_value`) -- `search_key_value_idx` (`key`, `value`) +- `search_entity_key_value_idx` (`entity_id`, `key`, `value`) unique +- `search_facets_covering_idx` (`key`, `original_value`, `entity_id`) +- `search_key_value_entity_idx` (`key`, `value`, `entity_id`) ## Table `stitch_queue` diff --git a/plugins/catalog-backend/src/database/operations/stitcher/buildEntitySearch.test.ts b/plugins/catalog-backend/src/database/operations/stitcher/buildEntitySearch.test.ts index 5a9e634e35..b4903d3144 100644 --- a/plugins/catalog-backend/src/database/operations/stitcher/buildEntitySearch.test.ts +++ b/plugins/catalog-backend/src/database/operations/stitcher/buildEntitySearch.test.ts @@ -255,6 +255,41 @@ describe('buildEntitySearch', () => { ]); }); + it('deduplicates rows from duplicate array values', () => { + const rows = buildEntitySearch('eid', { + apiVersion: 'a', + kind: 'b', + metadata: { name: 'n', tags: ['java', 'java', 'Java'] }, + }); + // 'java' and 'Java' both normalise to value 'java'; all three occurrences + // should collapse into a single row (first-wins for original_value). + const tagRows = rows.filter(r => r.key === 'metadata.tags'); + expect(tagRows).toHaveLength(1); + expect(tagRows[0]).toEqual( + expect.objectContaining({ value: 'java', original_value: 'java' }), + ); + // The synthetic boolean path key (metadata.tags.java) should also appear + // exactly once. + const boolRows = rows.filter(r => r.key === 'metadata.tags.java'); + expect(boolRows).toHaveLength(1); + }); + + it('treats null and empty-string values as distinct for deduplication', () => { + // An array with both null and '' for the same key must produce two rows + // since value=null and value='' are distinct in the database. + const rows = buildEntitySearch('eid', { + apiVersion: 'a', + kind: 'b', + metadata: { name: 'n' }, + spec: { foo: [null, ''] }, + }); + const fooRows = rows.filter(r => r.key === 'spec.foo'); + expect(fooRows).toHaveLength(2); + const values = fooRows.map(r => r.value); + expect(values).toContain(null); + expect(values).toContain(''); + }); + it('rejects duplicate keys', () => { expect(() => buildEntitySearch('eid', { diff --git a/plugins/catalog-backend/src/database/operations/stitcher/buildEntitySearch.ts b/plugins/catalog-backend/src/database/operations/stitcher/buildEntitySearch.ts index c8e09f3347..341420655d 100644 --- a/plugins/catalog-backend/src/database/operations/stitcher/buildEntitySearch.ts +++ b/plugins/catalog-backend/src/database/operations/stitcher/buildEntitySearch.ts @@ -17,6 +17,7 @@ import { DEFAULT_NAMESPACE, Entity } from '@backstage/catalog-model'; import { InputError } from '@backstage/errors'; import { DbSearchRow } from '../../tables'; +import { NULL_SENTINEL } from './util'; // These are excluded in the generic loop, either because they do not make sense // to index, or because they are special-case always inserted whether they are @@ -228,5 +229,16 @@ export function buildEntitySearch( ); } - return mapToRows(raw, entityId); + const rows = mapToRows(raw, entityId); + + // Deduplicate by (key, value). Duplicate array values in the entity data + // (e.g. tags: ['java', 'java']) produce identical search rows which would + // violate the unique constraint on (entity_id, key, value). + const seen = new Set(); + return rows.filter(row => { + const k = `${row.key}\0${row.value === null ? NULL_SENTINEL : row.value}`; + if (seen.has(k)) return false; + seen.add(k); + return true; + }); } diff --git a/plugins/catalog-backend/src/database/operations/stitcher/syncSearchRows.test.ts b/plugins/catalog-backend/src/database/operations/stitcher/syncSearchRows.test.ts index 0168464faa..5a46608e7d 100644 --- a/plugins/catalog-backend/src/database/operations/stitcher/syncSearchRows.test.ts +++ b/plugins/catalog-backend/src/database/operations/stitcher/syncSearchRows.test.ts @@ -190,20 +190,18 @@ describe.each(databases.eachSupportedId())('syncSearchRows, %p', databaseId => { ); }); - it('inserts a row when only original_value casing differs from existing', async () => { - // Two rows with the same (key, value) but different original_value - // casing must coexist — the case-insensitive MySQL collation must not - // cause the INSERT to skip the second row. + it('keeps one row when original_value casing differs for same key+value', async () => { + // Two entries with the same lowercased (key, value) but different + // original_value casing are deduplicated — the UNIQUE constraint on + // (entity_id, key, value) allows only one. The first occurrence wins, + // matching the first-wins semantics of buildEntitySearch. await syncSearchRows(knex, 'e1', [row('a', 'v', 'V')]); await syncSearchRows(knex, 'e1', [row('a', 'v', 'V'), row('a', 'v', 'v')]); const rows = await getSearchRows(); - expect(rows).toHaveLength(2); - expect(rows).toEqual( - expect.arrayContaining([ - expect.objectContaining({ key: 'a', value: 'v', original_value: 'V' }), - expect.objectContaining({ key: 'a', value: 'v', original_value: 'v' }), - ]), + expect(rows).toHaveLength(1); + expect(rows[0]).toEqual( + expect.objectContaining({ key: 'a', value: 'v', original_value: 'V' }), ); }); @@ -227,6 +225,27 @@ describe.each(databases.eachSupportedId())('syncSearchRows, %p', databaseId => { expect(rows.map(r => r.value).sort()).toEqual(['java', 'python', 'rust']); }); + it('restores original_value when re-syncing after it was corrupted', async () => { + await syncSearchRows(knex, 'e1', [row('a', 'x', 'X')]); + + // Corrupt the stored original_value (simulates stale or wrong data left + // by a previous stitcher run) without changing the key or value. + await knex('search') + .where({ entity_id: 'e1', key: 'a', value: 'x' }) + .update({ original_value: 'corrupted' }); + + // Re-syncing the same desired rows should overwrite original_value back to + // 'X' via the ON CONFLICT DO UPDATE SET original_value = EXCLUDED.original_value + // clause inside syncSearchRows. + await syncSearchRows(knex, 'e1', [row('a', 'x', 'X')]); + + const rows = await getSearchRows(); + expect(rows).toHaveLength(1); + expect(rows[0]).toEqual( + expect.objectContaining({ key: 'a', value: 'x', original_value: 'X' }), + ); + }); + it('simulates the typical steady-state case with one changed row', async () => { // Build a realistic-ish set of search rows const initial = [ diff --git a/plugins/catalog-backend/src/database/operations/stitcher/syncSearchRows.ts b/plugins/catalog-backend/src/database/operations/stitcher/syncSearchRows.ts index 18969b4f33..1dde84c017 100644 --- a/plugins/catalog-backend/src/database/operations/stitcher/syncSearchRows.ts +++ b/plugins/catalog-backend/src/database/operations/stitcher/syncSearchRows.ts @@ -16,15 +16,7 @@ import { Knex } from 'knex'; import { DbSearchRow } from '../../tables'; -import { BATCH_SIZE } from './util'; - -// The Postgres sync uses COALESCE(x, NULL_SENTINEL) to allow Postgres to -// include nullable columns in the Hash Cond of anti-joins (IS NOT DISTINCT -// FROM prevents this). As a consequence, values that are exactly this -// sentinel character are not searchable — they would be treated as NULL. -// This is the SOH (Start of Heading) control character which does not -// appear in real entity metadata. -const NULL_SENTINEL = '\x01'; +import { BATCH_SIZE, NULL_SENTINEL } from './util'; function filterSentinelValues(entries: DbSearchRow[]): DbSearchRow[] { return entries.filter( @@ -48,14 +40,30 @@ export async function syncSearchRows( entityId: string, searchEntries: DbSearchRow[], ): Promise { + // Dedup by (key, value) — the UNIQUE constraint on (entity_id, key, value) + // rejects duplicates, and the same lowercased value with different original + // casing is semantically a single entry. Keep the first occurrence, which + // matches the first-wins semantics of buildEntitySearch so that both layers + // consistently pick the same original_value for a given input order. + const dedupMap = new Map(); + for (const entry of searchEntries) { + const k = `${entry.key}\0${ + entry.value === null ? NULL_SENTINEL : entry.value + }`; + if (!dedupMap.has(k)) { + dedupMap.set(k, entry); + } + } + const deduped = [...dedupMap.values()]; + const client = knex.client.config.client; if (client === 'pg') { - await syncPostgres(knex, entityId, searchEntries); + await syncPostgres(knex, entityId, deduped); } else if (client.includes('mysql')) { - await syncMysql(knex, entityId, searchEntries); + await syncMysql(knex, entityId, deduped); } else { - await syncBulkReplace(knex, entityId, searchEntries); + await syncBulkReplace(knex, entityId, deduped); } } @@ -110,6 +118,8 @@ async function syncPostgres( AND COALESCE(s.value, chr(1)) = COALESCE(d.value, chr(1)) AND COALESCE(s.original_value, chr(1)) = COALESCE(d.original_value, chr(1)) ) + ON CONFLICT (entity_id, key, value) + DO UPDATE SET original_value = EXCLUDED.original_value `, [keys, values, originalValues, entityId, entityId, entityId], ); diff --git a/plugins/catalog-backend/src/database/operations/stitcher/util.ts b/plugins/catalog-backend/src/database/operations/stitcher/util.ts index 4a2dabcfa6..bac099514a 100644 --- a/plugins/catalog-backend/src/database/operations/stitcher/util.ts +++ b/plugins/catalog-backend/src/database/operations/stitcher/util.ts @@ -24,6 +24,12 @@ import stableStringify from 'fast-json-stable-stringify'; // enough to get the speed benefits. export const BATCH_SIZE = 50; +// The SOH (Start of Heading) control character, used as a stand-in for NULL +// in contexts where NULL cannot participate in equality comparisons (SQL +// COALESCE, JS dedup keys). It cannot appear in real entity metadata values +// since they are human-readable strings. +export const NULL_SENTINEL = '\x01'; + export function generateStableHash(entity: Entity) { return createHash('sha1') .update(stableStringify({ ...entity })) diff --git a/plugins/catalog-backend/src/service/DefaultEntitiesCatalog.test.ts b/plugins/catalog-backend/src/service/DefaultEntitiesCatalog.test.ts index 2bc70a6e94..427a9c8ce0 100644 --- a/plugins/catalog-backend/src/service/DefaultEntitiesCatalog.test.ts +++ b/plugins/catalog-backend/src/service/DefaultEntitiesCatalog.test.ts @@ -2013,22 +2013,26 @@ describe('DefaultEntitiesCatalog', () => { await Promise.all(entities.map(e => addEntityToSearch(e))); - // Manually insert duplicate search entries for the same entities - // I'm not sure exactly how this happens but I have seen it in the real world - await knex('search').insert([ - { - entity_id: 'uid-a', - key: 'metadata.title', - value: 'a test entity', - original_value: 'A Test Entity', - }, - { - entity_id: 'uid-b', - key: 'metadata.title', - value: 'b test entity', - original_value: 'B Test Entity', - }, - ]); + // The UNIQUE constraint on (entity_id, key, value) prevents + // duplicate search rows. Verify that duplicates are silently + // rejected and the query still returns correct results. + await knex('search') + .insert([ + { + entity_id: 'uid-a', + key: 'metadata.title', + value: 'a test entity', + original_value: 'A Test Entity', + }, + { + entity_id: 'uid-b', + key: 'metadata.title', + value: 'b test entity', + original_value: 'B Test Entity', + }, + ]) + .onConflict() + .ignore(); const catalog = new DefaultEntitiesCatalog({ database: knex, @@ -2407,15 +2411,19 @@ describe('DefaultEntitiesCatalog', () => { spec: {}, }); - // Manually insert a duplicate search entry, this shouldn't happen but does in reality - await knex('search').insert([ - { - entity_id: 'uid-a', - key: 'metadata.name', - value: 'one', - original_value: 'one', - }, - ]); + // Attempt to insert a duplicate — the UNIQUE constraint silently + // rejects it via ON CONFLICT IGNORE. + await knex('search') + .insert([ + { + entity_id: 'uid-a', + key: 'metadata.name', + value: 'one', + original_value: 'one', + }, + ]) + .onConflict() + .ignore(); const catalog = new DefaultEntitiesCatalog({ database: knex, diff --git a/plugins/catalog-backend/src/tests/migrations.test.ts b/plugins/catalog-backend/src/tests/migrations.test.ts index 6e067a8126..4b1153f3c6 100644 --- a/plugins/catalog-backend/src/tests/migrations.test.ts +++ b/plugins/catalog-backend/src/tests/migrations.test.ts @@ -1094,6 +1094,164 @@ describe('migrations', () => { }, ); + it.each(databases.eachSupportedId())( + '20260510000000_search_indices_and_dedup.js, %p', + async databaseId => { + const knex = await databases.init(databaseId); + + await migrateUntilBefore( + knex, + '20260510000000_search_indices_and_dedup.js', + ); + + // Set up FK dependencies: search → final_entities → refresh_state + await knex('refresh_state').insert({ + entity_id: 'e1', + entity_ref: 'k:ns/n1', + unprocessed_entity: '{}', + errors: '[]', + next_update_at: knex.fn.now(), + last_discovery_at: knex.fn.now(), + }); + await knex('final_entities').insert({ + entity_id: 'e1', + entity_ref: 'k:ns/n1', + hash: 'h1', + final_entity: '{}', + }); + + // Insert search rows with duplicates on (entity_id, key, value). + // Both copies of k1 share the same original_value so the surviving row + // is unambiguous across all database dedup strategies. + // The two null-value rows verify that null dedup does not conflate null + // with non-null or remove more rows than it should. + await knex('search').insert([ + { entity_id: 'e1', key: 'k1', value: 'v1', original_value: 'v1' }, // first copy + { entity_id: 'e1', key: 'k1', value: 'v1', original_value: 'v1' }, // duplicate — removed + { entity_id: 'e1', key: 'k2', value: null, original_value: null }, // null first — kept + { entity_id: 'e1', key: 'k2', value: null, original_value: null }, // null duplicate — removed + { entity_id: 'e1', key: 'k3', value: 'v3', original_value: 'v3' }, // unique — kept + ]); + + // Preconditions NOT met: dedup runs + await migrateUpOnce(knex); + + // 5 rows collapsed to 3 unique (entity_id, key, value) combinations + const rows = await knex('search').orderBy('key'); + expect(rows).toHaveLength(3); + expect(rows.map(r => r.key)).toEqual(['k1', 'k2', 'k3']); + expect(rows[0]).toEqual( + expect.objectContaining({ + key: 'k1', + value: 'v1', + original_value: 'v1', + }), + ); + // null-value row survives correctly + expect(rows[1]).toEqual( + expect.objectContaining({ + key: 'k2', + value: null, + original_value: null, + }), + ); + + // Unique constraint is now in place + await expect( + knex('search').insert({ + entity_id: 'e1', + key: 'k1', + value: 'v1', + original_value: 'extra', + }), + ).rejects.toEqual(expect.anything()); + + await migrateDownOnce(knex); + + // After rollback the unique constraint is gone — duplicates are allowed again + await expect( + knex('search').insert({ + entity_id: 'e1', + key: 'k1', + value: 'v1', + original_value: 'extra', + }), + ).resolves.not.toThrow(); + + await knex.destroy(); + }, + ); + + it.each(databases.eachSupportedId())( + '20260510000000_search_indices_and_dedup.js preconditions met (PG fast path), %p', + async databaseId => { + const knex = await databases.init(databaseId); + + // The fast path (skip dedup when unique index pre-exists) is a + // PostgreSQL-only code path that checks pg_index. + if (!knex.client.config.client.includes('pg')) { + await knex.destroy(); + return; + } + + await migrateUntilBefore( + knex, + '20260510000000_search_indices_and_dedup.js', + ); + + await knex('refresh_state').insert({ + entity_id: 'e1', + entity_ref: 'k:ns/n1', + unprocessed_entity: '{}', + errors: '[]', + next_update_at: knex.fn.now(), + last_discovery_at: knex.fn.now(), + }); + await knex('final_entities').insert({ + entity_id: 'e1', + entity_ref: 'k:ns/n1', + hash: 'h1', + final_entity: '{}', + }); + + // Insert clean rows (no duplicates) so the unique index can be created + await knex('search').insert([ + { entity_id: 'e1', key: 'k1', value: 'v1', original_value: 'V1' }, + { entity_id: 'e1', key: 'k2', value: null, original_value: null }, + ]); + + // Simulate a user who manually deduped and created the unique index + // before deploying this version + await knex.raw( + `CREATE UNIQUE INDEX search_entity_key_value_idx ON search (entity_id, key, value)`, + ); + + // Migration detects the existing unique index and skips the dedup step + await migrateUpOnce(knex); + + // Row count unchanged — no rows were removed by dedup + const rows = await knex('search').orderBy('key'); + expect(rows).toHaveLength(2); + expect(rows[0]).toEqual( + expect.objectContaining({ + key: 'k1', + value: 'v1', + original_value: 'V1', + }), + ); + expect(rows[1]).toEqual( + expect.objectContaining({ + key: 'k2', + value: null, + original_value: null, + }), + ); + + await migrateDownOnce(knex); + await knex.destroy(); + }, + ); + it.each(databases.eachSupportedId())( '20260403000000_add_location_entity_ref.js, %p', async databaseId => {