Merge pull request #34177 from backstage/freben/search-indices-migration

feat(catalog-backend): search table dedup, covering indices, and UNIQUE constraint
Fredrik Adelöw
2026-05-12 14:19:53 +02:00
committed by GitHub
10 changed files with 771 additions and 50 deletions
+33
@@ -0,0 +1,33 @@
---
'@backstage/plugin-catalog-backend': patch
---
Added a migration that removes duplicate rows from the `search` table, creates covering indices for improved query performance, and adds a `UNIQUE` constraint on `(entity_id, key, value)`.
This is a long-running migration on large catalogs. On PostgreSQL with millions of search rows, index creation may take 5-15 minutes per index. During this time, other pods running the previous version continue to serve traffic normally, because the index creation does not block reads or writes. However, if a Kubernetes liveness probe kills the pod before an index build completes, that build is lost and the next startup starts it over from scratch. On large tables this can repeat indefinitely.
**For large installations**, we recommend running the following SQL commands against your PostgreSQL database **before deploying** this version. Each index build can take several minutes (see the estimate above) but does not block reads or writes. Once these commands have completed, the migration detects the existing indices and skips all work, so startup is instant.
```sql
-- Step 1: Remove duplicate search rows
WITH cte AS (
SELECT ctid, row_number() OVER (PARTITION BY entity_id, key, value) AS rn
FROM search
)
DELETE FROM search USING cte WHERE search.ctid = cte.ctid AND cte.rn > 1;
-- Step 2: Create new indices (run each statement separately and outside a
-- transaction block; CREATE INDEX CONCURRENTLY cannot run inside one)
CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS
search_entity_key_value_idx ON search (entity_id, key, value);
CREATE INDEX CONCURRENTLY IF NOT EXISTS
search_key_value_entity_idx ON search (key, value, entity_id);
CREATE INDEX CONCURRENTLY IF NOT EXISTS
search_facets_covering_idx ON search (key, original_value, entity_id)
WHERE original_value IS NOT NULL;
-- Step 3: Drop old indices that are no longer needed
DROP INDEX CONCURRENTLY IF EXISTS search_key_value_idx;
DROP INDEX CONCURRENTLY IF EXISTS search_key_original_value_idx;
```
Also fixed `buildEntitySearch` to remove duplicate output for entities with duplicate array values, and added `ON CONFLICT DO UPDATE` to `syncSearchRows` so that concurrent stitching races are handled gracefully.
@@ -0,0 +1,439 @@
/*
* Copyright 2026 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// @ts-check
/**
* Deduplicates the search table, adds covering indices (including a UNIQUE
* constraint on (entity_id, key, value)), and drops superseded indices.
*
* On PostgreSQL this uses CREATE INDEX CONCURRENTLY which avoids blocking
* reads/writes but can take several minutes on large tables (13M+ rows).
* Each step is idempotent: it checks the current state and skips work
* already done, or cleans up INVALID indices left by an interrupted
* attempt before retrying. However, an interrupted index build does NOT
* leave partial progress — each retry starts from scratch. If the
* Kubernetes liveness probe kills the pod before an index build completes,
* the next startup will drop the INVALID index and restart the build. On
* large tables this can repeat indefinitely. To prevent this, either
* increase the liveness probe timeout for this one-time upgrade, or run
* the SQL commands below manually before deploying.
*
* ## Deduplication strategy
*
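* The dedup step is skipped entirely if the UNIQUE index already exists and
* is valid: a valid unique index guarantees no duplicates among rows with a
* non-NULL value. (By default PostgreSQL unique indices treat NULLs as
* distinct, so rows whose value is NULL are not covered by that guarantee.)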
*
* When dedup is needed the migration uses a two-phase approach that avoids
* a full heap scan by leveraging the search_key_value_entity_idx
* (key, value, entity_id) covering index, which Step 1 of the migration
* creates if it does not already exist:
*
* Phase 1 (index-only scan):
* GROUP BY entity_id, key, value on the covering index — zero heap
* fetches — to build a small temp table of only the duplicate groups.
*
* Phase 2 (index scan, dup rows only):
* For each duplicate group in the temp table, LATERAL index-scan back
* into search to find the per-group ctids, then DELETE them in one
* statement. Only the ~2× duplicate rows are touched; clean rows are
* never read from the heap.
*
* ## Recommended: run manually before deploying (large installations)
*
* For PostgreSQL installations with millions of search rows, run these
* commands against your database BEFORE deploying this version. Each
* index build takes a few minutes but does not block reads or writes.
* The migration detects that the indices already exist and skips all work,
* so startup is instant.
*
* -- 1. Remove duplicate search rows using the same index-friendly
* -- two-phase strategy used by the migration itself.
* CREATE TEMP TABLE _search_dedup_groups AS
* SELECT entity_id, key, value
* FROM search
* GROUP BY entity_id, key, value
* HAVING COUNT(*) > 1;
* CREATE INDEX ON _search_dedup_groups (key, value, entity_id);
*
* DELETE FROM search WHERE ctid IN (
* SELECT s.ctid FROM _search_dedup_groups g
* CROSS JOIN LATERAL (
* SELECT ctid FROM (
* SELECT ctid,
* row_number() OVER (ORDER BY ctid) AS rn
* FROM search
* WHERE key = g.key AND entity_id = g.entity_id
* AND value = g.value
* ) sub WHERE rn > 1
* ) s WHERE g.value IS NOT NULL
* UNION ALL
* SELECT s.ctid FROM _search_dedup_groups g
* CROSS JOIN LATERAL (
* SELECT ctid FROM (
* SELECT ctid,
* row_number() OVER (ORDER BY ctid) AS rn
* FROM search
* WHERE key = g.key AND entity_id = g.entity_id
* AND value IS NULL
* ) sub WHERE rn > 1
* ) s WHERE g.value IS NULL
* );
*
* DROP TABLE _search_dedup_groups;
*
* -- 2. Create indices (run each separately)
* CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS
* search_entity_key_value_idx ON search (entity_id, key, value);
* CREATE INDEX CONCURRENTLY IF NOT EXISTS
* search_key_value_entity_idx ON search (key, value, entity_id);
* CREATE INDEX CONCURRENTLY IF NOT EXISTS
* search_facets_covering_idx ON search (key, original_value, entity_id)
* WHERE original_value IS NOT NULL;
*
* -- 3. Drop old indices
* DROP INDEX CONCURRENTLY IF EXISTS search_key_value_idx;
* DROP INDEX CONCURRENTLY IF EXISTS search_key_original_value_idx;
*
* If these commands have already completed, the migration will detect the
* existing indices and skip all work — startup will be instant.
*/
/**
* @param {import('knex').Knex} knex
*/
exports.up = async function up(knex) {
const client = knex.client.config.client;
if (client.includes('pg')) {
await upPostgres(knex);
} else if (client.includes('mysql')) {
await upMysql(knex);
} else {
await upSqlite(knex);
}
};
/**
* @param {import('knex').Knex} knex
*/
exports.down = async function down(knex) {
const client = knex.client.config.client;
if (client.includes('pg')) {
// Restore the old indices first so there is no window without coverage,
// then drop the new ones.
await knex.raw(
'CREATE INDEX CONCURRENTLY IF NOT EXISTS search_key_value_idx ON search (key, value)',
);
await knex.raw(
'CREATE INDEX CONCURRENTLY IF NOT EXISTS search_key_original_value_idx ON search (key, original_value)',
);
await knex.raw(
'DROP INDEX CONCURRENTLY IF EXISTS search_entity_key_value_idx',
);
await knex.raw(
'DROP INDEX CONCURRENTLY IF EXISTS search_key_value_entity_idx',
);
await knex.raw(
'DROP INDEX CONCURRENTLY IF EXISTS search_facets_covering_idx',
);
} else if (client.includes('mysql')) {
await knex.schema.alterTable('search', table => {
table.index(['key', 'value'], 'search_key_value_idx');
table.index(['key', 'original_value'], 'search_key_original_value_idx');
});
await mysqlDropIndexIfExists(knex, 'search_entity_key_value_idx');
await mysqlDropIndexIfExists(knex, 'search_key_value_entity_idx');
await mysqlDropIndexIfExists(knex, 'search_facets_covering_idx');
} else {
await knex.raw(
'CREATE INDEX IF NOT EXISTS search_key_value_idx ON search (key, value)',
);
await knex.raw(
'CREATE INDEX IF NOT EXISTS search_key_original_value_idx ON search (key, original_value)',
);
await knex.raw('DROP INDEX IF EXISTS search_entity_key_value_idx');
await knex.raw('DROP INDEX IF EXISTS search_key_value_entity_idx');
await knex.raw('DROP INDEX IF EXISTS search_facets_covering_idx');
}
};
exports.config = { transaction: false };
// ---------------------------------------------------------------------------
// PostgreSQL
// ---------------------------------------------------------------------------
/** @param {import('knex').Knex} knex */
async function upPostgres(knex) {
// Step 1: Ensure the covering index exists before deduplication.
// This non-unique index on (key, value, entity_id) covers all three dedup
// columns, enabling an index-only GROUP BY scan with zero heap fetches in
// Phase 1 of the dedup. Creating it here guarantees this is true even on
// vanilla installations that have never run any preparatory SQL manually.
await ensurePgIndex(knex, {
name: 'search_key_value_entity_idx',
columns: '(key, value, entity_id)',
unique: false,
});
// Step 2: Remove duplicate search rows.
//
// Fast path: if the UNIQUE index already exists and is valid, Postgres has
// been enforcing uniqueness since the index was created, so there are no
// duplicates. Skip dedup entirely — this makes restarts essentially free
// for installations that created the index manually beforehand.
//
// Slow path (index absent or invalid): two-phase approach.
// Phase 1: index-only GROUP BY scan over search_key_value_entity_idx
// (key, value, entity_id) — zero heap fetches — to build a
// temp table of only the duplicate groups.
// Phase 2: LATERAL + index scan to find ctids within each group, then
// a single DELETE. Only the duplicate rows (~2× their count)
// are ever read from the heap; the full table is never scanned.
const uniqueCheck = await knex.raw(
`SELECT indisvalid
FROM pg_index
WHERE indexrelid = (
SELECT oid FROM pg_class WHERE relname = ? AND relkind = 'i'
) AND indisunique = true`,
['search_entity_key_value_idx'],
);
const needsDedup = !uniqueCheck.rows[0]?.indisvalid;
if (needsDedup) {
// Phase 1: index-only GROUP BY scan — no heap fetches.
// search_key_value_entity_idx (key, value, entity_id) covers all three
// dedup columns, so PostgreSQL resolves COUNT(*) without touching the
// heap at all (Heap Fetches: 0 in EXPLAIN). The result is a small temp
// table of only the duplicate (entity_id, key, value) groups.
await knex.raw(`
CREATE TEMP TABLE _search_dedup_groups AS
SELECT entity_id, key, value
FROM search
GROUP BY entity_id, key, value
HAVING COUNT(*) > 1
`);
await knex.raw(
`CREATE INDEX ON _search_dedup_groups (key, value, entity_id)`,
);
// Phase 2: for each duplicate group, LATERAL-join back into search via
// the covering index (Nested Loop + Index Scan), row_number within that
// tiny per-group result, then DELETE rows where rn > 1. Only the ~2×
// duplicate rows are ever read from the heap; all clean rows are skipped.
//
// NULL values need a separate arm because `value = NULL` never evaluates to
// true in SQL; `value IS NULL` is required for the index condition.
await knex.raw(`
DELETE FROM search WHERE ctid IN (
SELECT s.ctid FROM _search_dedup_groups g
CROSS JOIN LATERAL (
SELECT ctid FROM (
SELECT ctid,
row_number() OVER (ORDER BY ctid) AS rn
FROM search
WHERE key = g.key AND entity_id = g.entity_id
AND value = g.value
) sub WHERE rn > 1
) s WHERE g.value IS NOT NULL
UNION ALL
SELECT s.ctid FROM _search_dedup_groups g
CROSS JOIN LATERAL (
SELECT ctid FROM (
SELECT ctid,
row_number() OVER (ORDER BY ctid) AS rn
FROM search
WHERE key = g.key AND entity_id = g.entity_id
AND value IS NULL
) sub WHERE rn > 1
) s WHERE g.value IS NULL
)
`);
await knex.raw('DROP TABLE IF EXISTS _search_dedup_groups');
}
// Step 3: Create remaining covering indices. Each call is idempotent —
// it checks the index state and only does work if needed.
// search_key_value_entity_idx was already created in Step 1.
await ensurePgIndex(knex, {
name: 'search_entity_key_value_idx',
columns: '(entity_id, key, value)',
unique: true,
});
await ensurePgIndex(knex, {
name: 'search_facets_covering_idx',
columns: '(key, original_value, entity_id)',
where: 'WHERE original_value IS NOT NULL',
unique: false,
});
// Step 4: Drop superseded indices.
await dropPgIndexIfExists(knex, 'search_key_value_idx');
await dropPgIndexIfExists(knex, 'search_key_original_value_idx');
}
/**
* Creates or replaces an index on the search table, handling all edge cases:
* - Already valid with correct uniqueness: skip
* - Exists but INVALID (interrupted CREATE): drop and recreate
* - Exists but wrong uniqueness (e.g. non-unique but we need unique): drop and recreate
* - Missing: create from scratch
*
* @param {import('knex').Knex} knex
* @param {{ name: string; columns: string; unique: boolean; where?: string }} opts
*/
async function ensurePgIndex(knex, opts) {
const { name, columns, unique, where } = opts;
const result = await knex.raw(
`
SELECT indisunique, indisvalid
FROM pg_index
WHERE indexrelid = (
SELECT oid FROM pg_class WHERE relname = ? AND relkind = 'i'
)
`,
[name],
);
if (result.rows.length > 0) {
const { indisunique, indisvalid } = result.rows[0];
if (indisvalid && indisunique === unique) {
return; // Already correct
}
// Wrong state — drop and recreate
await knex.raw(`DROP INDEX CONCURRENTLY IF EXISTS "${name}"`);
}
const uniqueKw = unique ? 'UNIQUE' : '';
const whereClause = where || '';
await knex.raw(
`CREATE ${uniqueKw} INDEX CONCURRENTLY "${name}" ON search ${columns} ${whereClause}`,
);
}
/**
* @param {import('knex').Knex} knex
* @param {string} name
*/
async function dropPgIndexIfExists(knex, name) {
const result = await knex.raw(
`
SELECT 1 FROM pg_class WHERE relname = ? AND relkind = 'i'
`,
[name],
);
if (result.rows.length > 0) {
await knex.raw(`DROP INDEX CONCURRENTLY IF EXISTS "${name}"`);
}
}
// ---------------------------------------------------------------------------
// MySQL
// ---------------------------------------------------------------------------
/** @param {import('knex').Knex} knex */
async function upMysql(knex) {
// Dedup via temp table
await knex.transaction(async trx => {
await trx.raw(
'CREATE TEMPORARY TABLE IF NOT EXISTS `_search_keep` (' +
'`entity_id` VARCHAR(255), `key` VARCHAR(255), ' +
'`value` VARCHAR(255), `original_value` VARCHAR(255))',
);
await trx.raw('DELETE FROM `_search_keep`');
await trx.raw(
'INSERT INTO `_search_keep` ' +
'SELECT `entity_id`, `key`, `value`, MAX(`original_value`) ' +
'FROM `search` GROUP BY `entity_id`, `key`, `value`',
);
await trx.raw('DELETE FROM `search`');
await trx.raw(
'INSERT INTO `search` (`entity_id`, `key`, `value`, `original_value`) ' +
'SELECT * FROM `_search_keep`',
);
await trx.raw('DROP TEMPORARY TABLE `_search_keep`');
});
// Drop old indices if present, then create new ones
await mysqlDropIndexIfExists(knex, 'search_key_value_idx');
await mysqlDropIndexIfExists(knex, 'search_key_original_value_idx');
await mysqlDropIndexIfExists(knex, 'search_entity_key_value_idx');
await mysqlDropIndexIfExists(knex, 'search_key_value_entity_idx');
await mysqlDropIndexIfExists(knex, 'search_facets_covering_idx');
await knex.schema.alterTable('search', table => {
table.unique(['entity_id', 'key', 'value'], 'search_entity_key_value_idx');
table.index(['key', 'value', 'entity_id'], 'search_key_value_entity_idx');
});
// MySQL doesn't support partial indices — create a full one
await knex.schema.alterTable('search', table => {
table.index(
['key', 'original_value', 'entity_id'],
'search_facets_covering_idx',
);
});
}
/** @param {import('knex').Knex} knex @param {string} name */
async function mysqlDropIndexIfExists(knex, name) {
const [rows] = await knex.raw(
`SHOW INDEX FROM \`search\` WHERE Key_name = ?`,
[name],
);
if (rows.length > 0) {
await knex.schema.alterTable('search', t => {
t.dropIndex([], name);
});
}
}
// ---------------------------------------------------------------------------
// SQLite
// ---------------------------------------------------------------------------
/** @param {import('knex').Knex} knex */
async function upSqlite(knex) {
await knex.transaction(async trx => {
await trx.raw(`
DELETE FROM search
WHERE rowid NOT IN (
SELECT MIN(rowid) FROM search GROUP BY entity_id, key, value
)
`);
});
// Drop old, create new — SQLite is fast on small tables
await knex.raw('DROP INDEX IF EXISTS search_key_value_idx');
await knex.raw('DROP INDEX IF EXISTS search_key_original_value_idx');
await knex.raw('DROP INDEX IF EXISTS search_entity_key_value_idx');
await knex.raw('DROP INDEX IF EXISTS search_key_value_entity_idx');
await knex.raw('DROP INDEX IF EXISTS search_facets_covering_idx');
await knex.raw(
'CREATE UNIQUE INDEX search_entity_key_value_idx ON search (entity_id, key, value)',
);
await knex.raw(
'CREATE INDEX search_key_value_entity_idx ON search (key, value, entity_id)',
);
await knex.raw(
'CREATE INDEX search_facets_covering_idx ON search (key, original_value, entity_id)',
);
}
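As a reading aid for `ensurePgIndex` in the migration above, the four cases listed in its doc comment reduce to a small decision function. The sketch below is a hypothetical illustration, not code from this PR: `IndexState`, `IndexAction`, and `planIndexAction` are made-up names, and the index state is passed in as plain data instead of being read from `pg_index`.

```typescript
// Hypothetical shape for the two pg_index flags that the migration reads.
interface IndexState {
  indisunique: boolean;
  indisvalid: boolean;
}

// Hypothetical labels for the three outcomes described in the doc comment.
type IndexAction = 'skip' | 'drop-and-recreate' | 'create';

// Decide what to do for a wanted index given its current state.
// `undefined` stands for "no such index exists".
function planIndexAction(
  state: IndexState | undefined,
  wantUnique: boolean,
): IndexAction {
  if (state === undefined) {
    return 'create'; // Missing: create from scratch
  }
  if (state.indisvalid && state.indisunique === wantUnique) {
    return 'skip'; // Already valid with the correct uniqueness
  }
  // INVALID (interrupted build) or wrong uniqueness
  return 'drop-and-recreate';
}

console.log(planIndexAction(undefined, true));
console.log(planIndexAction({ indisunique: true, indisvalid: true }, true));
console.log(planIndexAction({ indisunique: true, indisvalid: false }, true));
console.log(planIndexAction({ indisunique: false, indisvalid: true }, true));
```

Keeping the decision separate from the SQL makes the retry behaviour easy to see: an interrupted build leaves an INVALID index, which falls into the drop-and-recreate branch on the next startup.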
+3 -2
@@ -127,8 +127,9 @@
### Indices
- `search_entity_id_idx` (`entity_id`)
-- `search_key_original_value_idx` (`key`, `original_value`)
-- `search_key_value_idx` (`key`, `value`)
+- `search_entity_key_value_idx` (`entity_id`, `key`, `value`) unique
+- `search_facets_covering_idx` (`key`, `original_value`, `entity_id`)
+- `search_key_value_entity_idx` (`key`, `value`, `entity_id`)
## Table `stitch_queue`
@@ -255,6 +255,41 @@ describe('buildEntitySearch', () => {
]);
});
it('deduplicates rows from duplicate array values', () => {
const rows = buildEntitySearch('eid', {
apiVersion: 'a',
kind: 'b',
metadata: { name: 'n', tags: ['java', 'java', 'Java'] },
});
// 'java' and 'Java' both normalise to value 'java'; all three occurrences
// should collapse into a single row (first-wins for original_value).
const tagRows = rows.filter(r => r.key === 'metadata.tags');
expect(tagRows).toHaveLength(1);
expect(tagRows[0]).toEqual(
expect.objectContaining({ value: 'java', original_value: 'java' }),
);
// The synthetic boolean path key (metadata.tags.java) should also appear
// exactly once.
const boolRows = rows.filter(r => r.key === 'metadata.tags.java');
expect(boolRows).toHaveLength(1);
});
it('treats null and empty-string values as distinct for deduplication', () => {
// An array with both null and '' for the same key must produce two rows
// since value=null and value='' are distinct in the database.
const rows = buildEntitySearch('eid', {
apiVersion: 'a',
kind: 'b',
metadata: { name: 'n' },
spec: { foo: [null, ''] },
});
const fooRows = rows.filter(r => r.key === 'spec.foo');
expect(fooRows).toHaveLength(2);
const values = fooRows.map(r => r.value);
expect(values).toContain(null);
expect(values).toContain('');
});
it('rejects duplicate keys', () => {
expect(() =>
buildEntitySearch('eid', {
@@ -17,6 +17,7 @@
import { DEFAULT_NAMESPACE, Entity } from '@backstage/catalog-model';
import { InputError } from '@backstage/errors';
import { DbSearchRow } from '../../tables';
import { NULL_SENTINEL } from './util';
// These are excluded in the generic loop, either because they do not make sense
// to index, or because they are special-case always inserted whether they are
@@ -228,5 +229,16 @@ export function buildEntitySearch(
);
}
-return mapToRows(raw, entityId);
+const rows = mapToRows(raw, entityId);
+// Deduplicate by (key, value). Duplicate array values in the entity data
+// (e.g. tags: ['java', 'java']) produce identical search rows which would
+// violate the unique constraint on (entity_id, key, value).
+const seen = new Set<string>();
+return rows.filter(row => {
+const k = `${row.key}\0${row.value === null ? NULL_SENTINEL : row.value}`;
+if (seen.has(k)) return false;
+seen.add(k);
+return true;
+});
}
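The first-wins deduplication added to `buildEntitySearch` can be tried in isolation. The following is a minimal sketch under assumptions: `SearchRow` is a simplified, hypothetical stand-in for `DbSearchRow`, `dedupeFirstWins` is a made-up name, and the sentinel is redeclared locally so the snippet is self-contained.

```typescript
// Hypothetical, simplified row shape; the real DbSearchRow has more context.
interface SearchRow {
  key: string;
  value: string | null;
  original_value: string | null;
}

// Local copy of the sentinel idea from the diff: a stand-in so that a null
// value gets its own dedup key, distinct from the empty string.
const NULL_SENTINEL = '\x01';

// Keeps the first row for every (key, value) pair and drops later repeats.
function dedupeFirstWins(rows: SearchRow[]): SearchRow[] {
  const seen = new Set<string>();
  return rows.filter(row => {
    const k = `${row.key}\0${row.value === null ? NULL_SENTINEL : row.value}`;
    if (seen.has(k)) return false;
    seen.add(k);
    return true;
  });
}

const input: SearchRow[] = [
  { key: 'metadata.tags', value: 'java', original_value: 'java' },
  { key: 'metadata.tags', value: 'java', original_value: 'Java' }, // dropped
  { key: 'spec.foo', value: null, original_value: null },
  { key: 'spec.foo', value: '', original_value: '' }, // kept: '' is not null
];

const deduped = dedupeFirstWins(input);
console.log(deduped.length); // 3
console.log(deduped[0].original_value); // java
```

Because the filter keeps the first occurrence, the surviving `original_value` depends on input order, which is the first-wins behaviour the tests in this PR assert.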
@@ -190,20 +190,18 @@ describe.each(databases.eachSupportedId())('syncSearchRows, %p', databaseId => {
);
});
-it('inserts a row when only original_value casing differs from existing', async () => {
-// Two rows with the same (key, value) but different original_value
-// casing must coexist; the case-insensitive MySQL collation must not
-// cause the INSERT to skip the second row.
+it('keeps one row when original_value casing differs for same key+value', async () => {
+// Two entries with the same lowercased (key, value) but different
+// original_value casing are deduplicated: the UNIQUE constraint on
+// (entity_id, key, value) allows only one. The first occurrence wins,
+// matching the first-wins semantics of buildEntitySearch.
await syncSearchRows(knex, 'e1', [row('a', 'v', 'V')]);
await syncSearchRows(knex, 'e1', [row('a', 'v', 'V'), row('a', 'v', 'v')]);
const rows = await getSearchRows();
-expect(rows).toHaveLength(2);
-expect(rows).toEqual(
-expect.arrayContaining([
-expect.objectContaining({ key: 'a', value: 'v', original_value: 'V' }),
-expect.objectContaining({ key: 'a', value: 'v', original_value: 'v' }),
-]),
+expect(rows).toHaveLength(1);
+expect(rows[0]).toEqual(
+expect.objectContaining({ key: 'a', value: 'v', original_value: 'V' }),
);
});
@@ -227,6 +225,27 @@ describe.each(databases.eachSupportedId())('syncSearchRows, %p', databaseId => {
expect(rows.map(r => r.value).sort()).toEqual(['java', 'python', 'rust']);
});
it('restores original_value when re-syncing after it was corrupted', async () => {
await syncSearchRows(knex, 'e1', [row('a', 'x', 'X')]);
// Corrupt the stored original_value (simulates stale or wrong data left
// by a previous stitcher run) without changing the key or value.
await knex('search')
.where({ entity_id: 'e1', key: 'a', value: 'x' })
.update({ original_value: 'corrupted' });
// Re-syncing the same desired rows should overwrite original_value back to
// 'X' via the ON CONFLICT DO UPDATE SET original_value = EXCLUDED.original_value
// clause inside syncSearchRows.
await syncSearchRows(knex, 'e1', [row('a', 'x', 'X')]);
const rows = await getSearchRows();
expect(rows).toHaveLength(1);
expect(rows[0]).toEqual(
expect.objectContaining({ key: 'a', value: 'x', original_value: 'X' }),
);
});
it('simulates the typical steady-state case with one changed row', async () => {
// Build a realistic-ish set of search rows
const initial = [
@@ -16,15 +16,7 @@
import { Knex } from 'knex';
import { DbSearchRow } from '../../tables';
-import { BATCH_SIZE } from './util';
-// The Postgres sync uses COALESCE(x, NULL_SENTINEL) to allow Postgres to
-// include nullable columns in the Hash Cond of anti-joins (IS NOT DISTINCT
-// FROM prevents this). As a consequence, values that are exactly this
-// sentinel character are not searchable; they would be treated as NULL.
-// This is the SOH (Start of Heading) control character which does not
-// appear in real entity metadata.
-const NULL_SENTINEL = '\x01';
+import { BATCH_SIZE, NULL_SENTINEL } from './util';
function filterSentinelValues(entries: DbSearchRow[]): DbSearchRow[] {
return entries.filter(
@@ -48,14 +40,30 @@ export async function syncSearchRows(
entityId: string,
searchEntries: DbSearchRow[],
): Promise<void> {
+// Dedup by (key, value): the UNIQUE constraint on (entity_id, key, value)
+// rejects duplicates, and the same lowercased value with different original
+// casing is semantically a single entry. Keep the first occurrence, which
+// matches the first-wins semantics of buildEntitySearch so that both layers
+// consistently pick the same original_value for a given input order.
+const dedupMap = new Map<string, DbSearchRow>();
+for (const entry of searchEntries) {
+const k = `${entry.key}\0${
+entry.value === null ? NULL_SENTINEL : entry.value
+}`;
+if (!dedupMap.has(k)) {
+dedupMap.set(k, entry);
+}
+}
+const deduped = [...dedupMap.values()];
const client = knex.client.config.client;
if (client === 'pg') {
-await syncPostgres(knex, entityId, searchEntries);
+await syncPostgres(knex, entityId, deduped);
} else if (client.includes('mysql')) {
-await syncMysql(knex, entityId, searchEntries);
+await syncMysql(knex, entityId, deduped);
} else {
-await syncBulkReplace(knex, entityId, searchEntries);
+await syncBulkReplace(knex, entityId, deduped);
}
}
@@ -110,6 +118,8 @@ async function syncPostgres(
AND COALESCE(s.value, chr(1)) = COALESCE(d.value, chr(1))
AND COALESCE(s.original_value, chr(1)) = COALESCE(d.original_value, chr(1))
)
ON CONFLICT (entity_id, key, value)
DO UPDATE SET original_value = EXCLUDED.original_value
`,
[keys, values, originalValues, entityId, entityId, entityId],
);
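The effect of the `ON CONFLICT (entity_id, key, value) DO UPDATE` clause above can be pictured as an in-memory upsert keyed on the unique columns. This is a minimal sketch under stated assumptions, not the PR's implementation: `SearchRow`, `conflictKey`, and `upsertSearchRow` are hypothetical names, and the model covers only rows with a non-null `value`, because by default a PostgreSQL unique index treats NULLs as distinct and such rows never conflict.

```typescript
// Hypothetical, simplified row shape for illustration only.
interface SearchRow {
  entity_id: string;
  key: string;
  value: string; // non-null only; see the assumption stated above
  original_value: string | null;
}

// The conflict target: the three columns covered by the unique index.
const conflictKey = (r: SearchRow): string =>
  `${r.entity_id}\0${r.key}\0${r.value}`;

// Insert the row, or on conflict overwrite only original_value with the
// incoming one (the role EXCLUDED.original_value plays in the SQL).
function upsertSearchRow(
  table: Map<string, SearchRow>,
  incoming: SearchRow,
): void {
  const existing = table.get(conflictKey(incoming));
  if (existing) {
    existing.original_value = incoming.original_value;
  } else {
    table.set(conflictKey(incoming), { ...incoming });
  }
}

const table = new Map<string, SearchRow>();
upsertSearchRow(table, {
  entity_id: 'e1',
  key: 'a',
  value: 'x',
  original_value: 'corrupted',
});
upsertSearchRow(table, {
  entity_id: 'e1',
  key: 'a',
  value: 'x',
  original_value: 'X',
});

const stored = Array.from(table.values());
console.log(stored.length, stored[0].original_value); // 1 X
```

This mirrors the "restores original_value" test in this PR: a second write for the same `(entity_id, key, value)` neither fails nor adds a row; it replaces the stored `original_value`.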
@@ -24,6 +24,12 @@ import stableStringify from 'fast-json-stable-stringify';
// enough to get the speed benefits.
export const BATCH_SIZE = 50;
// The SOH (Start of Heading) control character, used as a stand-in for NULL
// in contexts where NULL cannot participate in equality comparisons (SQL
// COALESCE, JS dedup keys). It cannot appear in real entity metadata values
// since they are human-readable strings.
export const NULL_SENTINEL = '\x01';
export function generateStableHash(entity: Entity) {
return createHash('sha1')
.update(stableStringify({ ...entity }))
@@ -2013,22 +2013,26 @@ describe('DefaultEntitiesCatalog', () => {
await Promise.all(entities.map(e => addEntityToSearch(e)));
-// Manually insert duplicate search entries for the same entities
-// I'm not sure exactly how this happens but I have seen it in the real world
-await knex<DbSearchRow>('search').insert([
-{
-entity_id: 'uid-a',
-key: 'metadata.title',
-value: 'a test entity',
-original_value: 'A Test Entity',
-},
-{
-entity_id: 'uid-b',
-key: 'metadata.title',
-value: 'b test entity',
-original_value: 'B Test Entity',
-},
-]);
+// The UNIQUE constraint on (entity_id, key, value) prevents
+// duplicate search rows. Verify that duplicates are silently
+// rejected and the query still returns correct results.
+await knex<DbSearchRow>('search')
+.insert([
+{
+entity_id: 'uid-a',
+key: 'metadata.title',
+value: 'a test entity',
+original_value: 'A Test Entity',
+},
+{
+entity_id: 'uid-b',
+key: 'metadata.title',
+value: 'b test entity',
+original_value: 'B Test Entity',
+},
+])
+.onConflict()
+.ignore();
const catalog = new DefaultEntitiesCatalog({
database: knex,
@@ -2407,15 +2411,19 @@ describe('DefaultEntitiesCatalog', () => {
spec: {},
});
-// Manually insert a duplicate search entry, this shouldn't happen but does in reality
-await knex<DbSearchRow>('search').insert([
-{
-entity_id: 'uid-a',
-key: 'metadata.name',
-value: 'one',
-original_value: 'one',
-},
-]);
+// Attempt to insert a duplicate; it conflicts with the UNIQUE constraint
+// and is silently skipped by .onConflict().ignore().
+await knex<DbSearchRow>('search')
+.insert([
+{
+entity_id: 'uid-a',
+key: 'metadata.name',
+value: 'one',
+original_value: 'one',
+},
+])
+.onConflict()
+.ignore();
const catalog = new DefaultEntitiesCatalog({
database: knex,
@@ -1094,6 +1094,164 @@ describe('migrations', () => {
},
);
it.each(databases.eachSupportedId())(
'20260510000000_search_indices_and_dedup.js, %p',
async databaseId => {
const knex = await databases.init(databaseId);
await migrateUntilBefore(
knex,
'20260510000000_search_indices_and_dedup.js',
);
// Set up FK dependencies: search → final_entities → refresh_state
await knex('refresh_state').insert({
entity_id: 'e1',
entity_ref: 'k:ns/n1',
unprocessed_entity: '{}',
errors: '[]',
next_update_at: knex.fn.now(),
last_discovery_at: knex.fn.now(),
});
await knex('final_entities').insert({
entity_id: 'e1',
entity_ref: 'k:ns/n1',
hash: 'h1',
final_entity: '{}',
});
// Insert search rows with duplicates on (entity_id, key, value).
// Both copies of k1 share the same original_value so the surviving row
// is unambiguous across all database dedup strategies.
// The two null-value rows verify that null dedup does not conflate null
// with non-null or remove more rows than it should.
await knex('search').insert([
{ entity_id: 'e1', key: 'k1', value: 'v1', original_value: 'v1' }, // first copy
{ entity_id: 'e1', key: 'k1', value: 'v1', original_value: 'v1' }, // duplicate — removed
{ entity_id: 'e1', key: 'k2', value: null, original_value: null }, // null first — kept
{ entity_id: 'e1', key: 'k2', value: null, original_value: null }, // null duplicate — removed
{ entity_id: 'e1', key: 'k3', value: 'v3', original_value: 'v3' }, // unique — kept
]);
// Preconditions NOT met: dedup runs
await migrateUpOnce(knex);
// 5 rows collapsed to 3 unique (entity_id, key, value) combinations
const rows = await knex('search').orderBy('key');
expect(rows).toHaveLength(3);
expect(rows.map(r => r.key)).toEqual(['k1', 'k2', 'k3']);
expect(rows[0]).toEqual(
expect.objectContaining({
key: 'k1',
value: 'v1',
original_value: 'v1',
}),
);
// null-value row survives correctly
expect(rows[1]).toEqual(
expect.objectContaining({
key: 'k2',
value: null,
original_value: null,
}),
);
// Unique constraint is now in place
await expect(
knex('search').insert({
entity_id: 'e1',
key: 'k1',
value: 'v1',
original_value: 'extra',
}),
).rejects.toEqual(expect.anything());
await migrateDownOnce(knex);
// After rollback the unique constraint is gone — duplicates are allowed again
await expect(
knex('search').insert({
entity_id: 'e1',
key: 'k1',
value: 'v1',
original_value: 'extra',
}),
).resolves.not.toThrow();
await knex.destroy();
},
);
it.each(databases.eachSupportedId())(
'20260510000000_search_indices_and_dedup.js preconditions met (PG fast path), %p',
async databaseId => {
const knex = await databases.init(databaseId);
// The fast path (skip dedup when unique index pre-exists) is a
// PostgreSQL-only code path that checks pg_index.
if (!knex.client.config.client.includes('pg')) {
await knex.destroy();
return;
}
await migrateUntilBefore(
knex,
'20260510000000_search_indices_and_dedup.js',
);
await knex('refresh_state').insert({
entity_id: 'e1',
entity_ref: 'k:ns/n1',
unprocessed_entity: '{}',
errors: '[]',
next_update_at: knex.fn.now(),
last_discovery_at: knex.fn.now(),
});
await knex('final_entities').insert({
entity_id: 'e1',
entity_ref: 'k:ns/n1',
hash: 'h1',
final_entity: '{}',
});
// Insert clean rows (no duplicates) so the unique index can be created
await knex('search').insert([
{ entity_id: 'e1', key: 'k1', value: 'v1', original_value: 'V1' },
{ entity_id: 'e1', key: 'k2', value: null, original_value: null },
]);
// Simulate a user who manually deduped and created the unique index
// before deploying this version
await knex.raw(
`CREATE UNIQUE INDEX search_entity_key_value_idx ON search (entity_id, key, value)`,
);
// Migration detects the existing unique index and skips the dedup step
await migrateUpOnce(knex);
// Row count unchanged — no rows were removed by dedup
const rows = await knex('search').orderBy('key');
expect(rows).toHaveLength(2);
expect(rows[0]).toEqual(
expect.objectContaining({
key: 'k1',
value: 'v1',
original_value: 'V1',
}),
);
expect(rows[1]).toEqual(
expect.objectContaining({
key: 'k2',
value: null,
original_value: null,
}),
);
await migrateDownOnce(knex);
await knex.destroy();
},
);
it.each(databases.eachSupportedId())(
'20260403000000_add_location_entity_ref.js, %p',
async databaseId => {