From e2dd0952d16cd861669d18cc7d2885477017946f Mon Sep 17 00:00:00 2001 From: Chris Wyatt Cook Date: Mon, 23 Jun 2025 17:18:02 -0700 Subject: [PATCH] incremental burstLength check Signed-off-by: Chris Wyatt Cook --- .changeset/fancy-nails-call.md | 5 + .../engine/IncrementalIngestionEngine.test.ts | 173 ++++++++++++++++++ .../src/engine/IncrementalIngestionEngine.ts | 12 ++ .../src/types.ts | 1 + 4 files changed, 191 insertions(+) create mode 100644 .changeset/fancy-nails-call.md create mode 100644 plugins/catalog-backend-module-incremental-ingestion/src/engine/IncrementalIngestionEngine.test.ts diff --git a/.changeset/fancy-nails-call.md b/.changeset/fancy-nails-call.md new file mode 100644 index 0000000000..97980029af --- /dev/null +++ b/.changeset/fancy-nails-call.md @@ -0,0 +1,5 @@ +--- +'@backstage/plugin-catalog-backend-module-incremental-ingestion': patch +--- + +Fixed bug in IncrementalIngestionEngine by adding burstLength check when a burst completes diff --git a/plugins/catalog-backend-module-incremental-ingestion/src/engine/IncrementalIngestionEngine.test.ts b/plugins/catalog-backend-module-incremental-ingestion/src/engine/IncrementalIngestionEngine.test.ts new file mode 100644 index 0000000000..e6c7d67fd0 --- /dev/null +++ b/plugins/catalog-backend-module-incremental-ingestion/src/engine/IncrementalIngestionEngine.test.ts @@ -0,0 +1,173 @@ +/* + * Copyright 2025 The Backstage Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { IncrementalIngestionEngine } from './IncrementalIngestionEngine'; +import { IterationEngineOptions } from '../types'; +import { performance } from 'perf_hooks'; + +jest.setTimeout(60_000); + +describe('IncrementalIngestionEngine - Burst Length', () => { + const createMockProvider = () => ({ + getProviderName: jest.fn().mockReturnValue('test-provider'), + next: jest.fn(), + around: jest.fn(), + }); + + const createMockManager = () => + ({ + getLastMark: jest.fn().mockResolvedValue(null), + createMark: jest.fn().mockResolvedValue(undefined), + createMarkEntities: jest.fn().mockResolvedValue(undefined), + computeRemoved: jest.fn().mockResolvedValue({ total: 0, removed: [] }), + } as any); + + const createMockConnection = () => + ({ + applyMutation: jest.fn().mockResolvedValue(undefined), + refresh: jest.fn().mockResolvedValue(undefined), + } as any); + + const createMockLogger = () => + ({ + info: jest.fn(), + debug: jest.fn(), + error: jest.fn(), + warn: jest.fn(), + child: jest.fn().mockReturnThis(), + } as any); + + it('should respect burst length and stop burst when time limit exceeded', async () => { + const mockProvider = createMockProvider(); + const mockManager = createMockManager(); + const mockConnection = createMockConnection(); + const mockLogger = createMockLogger(); + + const options: IterationEngineOptions = { + provider: mockProvider, + manager: mockManager, + connection: mockConnection, + burstLength: { milliseconds: 100 }, // Short burst length for testing + restLength: { minutes: 1 }, + logger: mockLogger, + ready: Promise.resolve(), + }; + + const engine = new IncrementalIngestionEngine(options); + + let callCount = 0; + mockProvider.around.mockImplementation(async fn => { + await fn({}); + }); + + // Mock provider.next to return multiple batches that never complete + // Each call takes some time to simulate real processing + mockProvider.next.mockImplementation(async () => { + callCount++; + // Add a small delay to ensure we exceed burst length + await new Promise(resolve => setTimeout(resolve, 30)); + return { + done: false, // Never done - would continue forever without burst length + entities: [ + { + entity: { + kind: 'Component', + metadata: { name: `test-component-${callCount}` }, + }, + }, + ], + cursor: `cursor-${callCount}`, + }; + }); + + const signal = new AbortController().signal; + const start = performance.now(); + + const result = await engine.ingestOneBurst('test-ingestion', signal); + + const duration = performance.now() - start; + + // Verify that the burst was stopped due to time limit, not completion + expect(result).toBe(false); // Should return false since provider never returned done + expect(duration).toBeGreaterThanOrEqual(100); // Should have run for at least the burst length + expect(mockProvider.next).toHaveBeenCalledTimes(callCount); + expect(callCount).toBeGreaterThan(1); // Should have made multiple calls before stopping + + // Verify the correct log message was called + expect(mockLogger.info).toHaveBeenCalledWith( + expect.stringContaining('burst exceeded length of 100 milliseconds'), + ); + }); + + it('should complete burst normally when provider returns done before burst length', async () => { + const mockProvider = createMockProvider(); + const mockManager = createMockManager(); + const mockConnection = createMockConnection(); + const mockLogger = createMockLogger(); + + const options: IterationEngineOptions = { + provider: mockProvider, + manager: mockManager, + connection: mockConnection, + burstLength: { seconds: 10 }, // Long burst length + restLength: { minutes: 1 }, + logger: mockLogger, + ready: Promise.resolve(), + }; + + const engine = new IncrementalIngestionEngine(options); + + mockProvider.around.mockImplementation(async fn => { + await fn({}); + }); + + // Mock provider.next to return done after first call + mockProvider.next.mockResolvedValueOnce({ + done: true, + entities: [ + { + entity: { + kind: 'Component', + metadata: { name: 'test-component-1' }, + }, + }, + ], + cursor: 'final-cursor', + }); + + const signal = new AbortController().signal; + const result = await engine.ingestOneBurst('test-ingestion', signal); + + // Should return true when provider indicates done + expect(result).toBe(true); + expect(mockProvider.next).toHaveBeenCalledTimes(1); + + // Should NOT log the burst exceeded message + expect(mockLogger.info).not.toHaveBeenCalledWith( + expect.stringContaining('burst exceeded length of'), + ); + + // Should log burst initiation and completion + expect(mockLogger.info).toHaveBeenCalledWith( + "incremental-engine: Ingestion 'test-ingestion' burst initiated", + ); + expect(mockLogger.info).toHaveBeenCalledWith( + expect.stringContaining( + "incremental-engine: Ingestion 'test-ingestion' burst complete", + ), + ); + }); +}); diff --git a/plugins/catalog-backend-module-incremental-ingestion/src/engine/IncrementalIngestionEngine.ts b/plugins/catalog-backend-module-incremental-ingestion/src/engine/IncrementalIngestionEngine.ts index e005d29292..aeccaccf8d 100644 --- a/plugins/catalog-backend-module-incremental-ingestion/src/engine/IncrementalIngestionEngine.ts +++ b/plugins/catalog-backend-module-incremental-ingestion/src/engine/IncrementalIngestionEngine.ts @@ -27,6 +27,7 @@ import { HumanDuration } from '@backstage/types'; export class IncrementalIngestionEngine implements IterationEngine { private readonly restLength: Duration; + private readonly burstLength: HumanDuration; private readonly backoff: HumanDuration[]; private readonly lastStarted: Gauge; private readonly lastCompleted: Gauge; @@ -38,6 +39,7 @@ export class IncrementalIngestionEngine implements IterationEngine { this.manager = options.manager; this.restLength = Duration.fromObject(options.restLength); + this.burstLength = options.burstLength; this.backoff = options.backoff ?? [ { minutes: 1 }, { minutes: 5 }, @@ -247,6 +249,16 @@ export class IncrementalIngestionEngine implements IterationEngine { }); if (signal.aborted || next.done) { break; + } else if ( + performance.now() - start > + Duration.fromObject(this.burstLength).as('milliseconds') + ) { + this.options.logger.info( + `incremental-engine: Ingestion '${id}' burst exceeded length of ${Duration.fromObject( + this.burstLength, + ).toHuman()}.`, + ); + break; } else { next = await this.options.provider.next(context, next.cursor); count++; diff --git a/plugins/catalog-backend-module-incremental-ingestion/src/types.ts b/plugins/catalog-backend-module-incremental-ingestion/src/types.ts index 75aecf8fa5..04a73bd364 100644 --- a/plugins/catalog-backend-module-incremental-ingestion/src/types.ts +++ b/plugins/catalog-backend-module-incremental-ingestion/src/types.ts @@ -188,6 +188,7 @@ export interface IterationEngineOptions { manager: IncrementalIngestionDatabaseManager; provider: IncrementalEntityProvider; restLength: HumanDuration; + burstLength: HumanDuration; ready: Promise; backoff?: IncrementalEntityProviderOptions['backoff']; rejectRemovalsAbovePercentage?: number;