diff --git a/.changeset/crash-loop-yeah.md b/.changeset/crash-loop-yeah.md new file mode 100644 index 0000000000..098bc01bae --- /dev/null +++ b/.changeset/crash-loop-yeah.md @@ -0,0 +1,5 @@ +--- +'@backstage/backend-defaults': patch +--- + +The task scheduler now attempts to abort any tasks if it detects that Backstage is being shut down. diff --git a/packages/backend-defaults/report-database.api.md b/packages/backend-defaults/report-database.api.md index 9c07338264..05c5ab251c 100644 --- a/packages/backend-defaults/report-database.api.md +++ b/packages/backend-defaults/report-database.api.md @@ -3,6 +3,8 @@ > Do not edit this file. It is a report generated by [API Extractor](https://api-extractor.com/). ```ts +/// + import { DatabaseService } from '@backstage/backend-plugin-api'; import { LifecycleService } from '@backstage/backend-plugin-api'; import { LoggerService } from '@backstage/backend-plugin-api'; diff --git a/packages/backend-defaults/report-scheduler.api.md b/packages/backend-defaults/report-scheduler.api.md index 351873ad31..7d886e539c 100644 --- a/packages/backend-defaults/report-scheduler.api.md +++ b/packages/backend-defaults/report-scheduler.api.md @@ -5,6 +5,7 @@ ```ts import { DatabaseService } from '@backstage/backend-plugin-api'; import { LoggerService } from '@backstage/backend-plugin-api'; +import { RootLifecycleService } from '@backstage/backend-plugin-api'; import { SchedulerService } from '@backstage/backend-plugin-api'; import { ServiceFactory } from '@backstage/backend-plugin-api'; @@ -14,6 +15,7 @@ export class DefaultSchedulerService { static create(options: { database: DatabaseService; logger: LoggerService; + rootLifecycle?: RootLifecycleService; }): SchedulerService; } diff --git a/packages/backend-defaults/src/entrypoints/scheduler/lib/DefaultSchedulerService.ts b/packages/backend-defaults/src/entrypoints/scheduler/lib/DefaultSchedulerService.ts index 8e2ac5cd07..dcaf673753 100644 --- a/packages/backend-defaults/src/entrypoints/scheduler/lib/DefaultSchedulerService.ts +++ b/packages/backend-defaults/src/entrypoints/scheduler/lib/DefaultSchedulerService.ts @@ -17,6 +17,7 @@ import { DatabaseService, LoggerService, + RootLifecycleService, SchedulerService, } from '@backstage/backend-plugin-api'; import { once } from 'lodash'; @@ -34,6 +35,7 @@ export class DefaultSchedulerService { static create(options: { database: DatabaseService; logger: LoggerService; + rootLifecycle?: RootLifecycleService; }): SchedulerService { const databaseFactory = once(async () => { const knex = await options.database.getClient(); @@ -43,17 +45,24 @@ export class DefaultSchedulerService { } if (process.env.NODE_ENV !== 'test') { + const abortController = new AbortController(); const janitor = new PluginTaskSchedulerJanitor({ knex, waitBetweenRuns: Duration.fromObject({ minutes: 1 }), logger: options.logger, }); - janitor.start(); + + options.rootLifecycle?.addShutdownHook(() => abortController.abort()); + janitor.start(abortController.signal); } return knex; }); - return new PluginTaskSchedulerImpl(databaseFactory, options.logger); + return new PluginTaskSchedulerImpl( + databaseFactory, + options.logger, + options.rootLifecycle, + ); } } diff --git a/packages/backend-defaults/src/entrypoints/scheduler/lib/LocalTaskWorker.ts b/packages/backend-defaults/src/entrypoints/scheduler/lib/LocalTaskWorker.ts index 3510a855e9..278afae224 100644 --- a/packages/backend-defaults/src/entrypoints/scheduler/lib/LocalTaskWorker.ts +++ b/packages/backend-defaults/src/entrypoints/scheduler/lib/LocalTaskWorker.ts @@ -36,7 +36,7 @@ export class LocalTaskWorker { private readonly logger: LoggerService, ) {} - start(settings: TaskSettingsV2, options?: { signal?: AbortSignal }) { + start(settings: TaskSettingsV2, options: { signal: AbortSignal }) { this.logger.info( `Task worker starting: ${this.taskId}, ${JSON.stringify(settings)}`, ); @@ -48,18 +48,18 @@ export class LocalTaskWorker { if (settings.initialDelayDuration) { await this.sleep( Duration.fromISO(settings.initialDelayDuration), - options?.signal, + options.signal, ); } - while (!options?.signal?.aborted) { + while (!options.signal.aborted) { const startTime = process.hrtime(); - await this.runOnce(settings, options?.signal); + await this.runOnce(settings, options.signal); const timeTaken = process.hrtime(startTime); await this.waitUntilNext( settings, (timeTaken[0] + timeTaken[1] / 1e9) * 1000, - options?.signal, + options.signal, ); } @@ -89,7 +89,7 @@ export class LocalTaskWorker { */ private async runOnce( settings: TaskSettingsV2, - signal?: AbortSignal, + signal: AbortSignal, ): Promise { // Abort the task execution either if the worker is stopped, or if the // task timeout is hit @@ -115,9 +115,9 @@ export class LocalTaskWorker { private async waitUntilNext( settings: TaskSettingsV2, lastRunMillis: number, - signal?: AbortSignal, + signal: AbortSignal, ) { - if (signal?.aborted) { + if (signal.aborted) { return; } @@ -145,7 +145,7 @@ export class LocalTaskWorker { private async sleep( duration: Duration, - abortSignal?: AbortSignal, + abortSignal: AbortSignal, ): Promise { this.abortWait = delegateAbortController(abortSignal); await sleep(duration, this.abortWait.signal); diff --git a/packages/backend-defaults/src/entrypoints/scheduler/lib/PluginTaskSchedulerImpl.test.ts b/packages/backend-defaults/src/entrypoints/scheduler/lib/PluginTaskSchedulerImpl.test.ts index 3e0782d4a3..3cb41a0b48 100644 --- a/packages/backend-defaults/src/entrypoints/scheduler/lib/PluginTaskSchedulerImpl.test.ts +++ b/packages/backend-defaults/src/entrypoints/scheduler/lib/PluginTaskSchedulerImpl.test.ts @@ -38,6 +38,7 @@ function defer() { jest.setTimeout(60_000); describe('PluginTaskManagerImpl', () => { + const addShutdownHook = jest.fn(); const databases = TestDatabases.create({ ids: ['POSTGRES_16', 'POSTGRES_12', 'SQLITE_3'], }); @@ -51,12 +52,17 @@ describe('PluginTaskManagerImpl', () => { jest.useFakeTimers(); }, 60_000); + beforeEach(() => { + jest.clearAllMocks(); + }); + async function init(databaseId: TestDatabaseId) { const knex = await databases.init(databaseId); await migrateBackendTasks(knex); const manager = new PluginTaskSchedulerImpl( async () => knex, mockServices.logger.mock(), + { addShutdownHook, addStartupHook: jest.fn() }, ); return { knex, manager }; } @@ -103,6 +109,33 @@ describe('PluginTaskManagerImpl', () => { expect(fn).toHaveBeenCalledWith(expect.any(AbortSignal)); }, ); + + it.each(databases.eachSupportedId())( + 'aborts the task if shutdown hook is invoked, %p', + async databaseId => { + const { manager } = await init(databaseId); + + const fn = jest.fn(); + const promise = new Promise(resolve => + fn.mockImplementation(resolve), + ); + await manager.scheduleTask({ + id: 'task3', + timeout: Duration.fromMillis(5000), + frequency: { cron: '* * * * * *' }, + fn, + scope: 'global', + }); + + const shutdownHook = addShutdownHook.mock.calls[0][0]; + const abortSignal = await promise; + expect(abortSignal.aborted).toBe(false); + + // Should be aborted after the shutdown hook is invoked + await shutdownHook(); + expect(abortSignal.aborted).toBe(true); + }, + ); }); describe('triggerTask with global scope', () => { @@ -212,6 +245,30 @@ describe('PluginTaskManagerImpl', () => { await promise; expect(fn).toHaveBeenCalledWith(expect.any(AbortSignal)); }, 60_000); + + it('aborts the task if shutdown hook is invoked', async () => { + const { manager } = await init('SQLITE_3'); + + const fn = jest.fn(); + const promise = new Promise(resolve => + fn.mockImplementation(resolve), + ); + await manager.scheduleTask({ + id: 'task3', + timeout: Duration.fromMillis(5000), + frequency: { cron: '* * * * * *' }, + fn, + scope: 'local', + }); + + const shutdownHook = addShutdownHook.mock.calls[0][0]; + const abortSignal = await promise; + expect(abortSignal.aborted).toBe(false); + + // Should be aborted after the shutdown hook is invoked + await shutdownHook(); + expect(abortSignal.aborted).toBe(true); + }, 60_000); }); describe('triggerTask with local scope', () => { diff --git a/packages/backend-defaults/src/entrypoints/scheduler/lib/PluginTaskSchedulerImpl.ts b/packages/backend-defaults/src/entrypoints/scheduler/lib/PluginTaskSchedulerImpl.ts index 5c3af24ab1..5c04521149 100644 --- a/packages/backend-defaults/src/entrypoints/scheduler/lib/PluginTaskSchedulerImpl.ts +++ b/packages/backend-defaults/src/entrypoints/scheduler/lib/PluginTaskSchedulerImpl.ts @@ -16,6 +16,7 @@ import { LoggerService, + RootLifecycleService, SchedulerService, SchedulerServiceTaskDescriptor, SchedulerServiceTaskFunction, @@ -29,7 +30,7 @@ import { Duration } from 'luxon'; import { LocalTaskWorker } from './LocalTaskWorker'; import { TaskWorker } from './TaskWorker'; import { TaskSettingsV2 } from './types'; -import { TRACER_ID, validateId } from './util'; +import { delegateAbortController, TRACER_ID, validateId } from './util'; const tracer = trace.getTracer(TRACER_ID); @@ -39,6 +40,7 @@ const tracer = trace.getTracer(TRACER_ID); export class PluginTaskSchedulerImpl implements SchedulerService { private readonly localTasksById = new Map(); private readonly allScheduledTasks: SchedulerServiceTaskDescriptor[] = []; + private readonly shutdownInitiated: Promise; private readonly counter: Counter; private readonly duration: Histogram; @@ -46,6 +48,7 @@ export class PluginTaskSchedulerImpl implements SchedulerService { constructor( private readonly databaseFactory: () => Promise, private readonly logger: LoggerService, + rootLifecycle?: RootLifecycleService, ) { const meter = metrics.getMeter('default'); this.counter = meter.createCounter('backend_tasks.task.runs.count', { @@ -55,6 +58,9 @@ export class PluginTaskSchedulerImpl implements SchedulerService { description: 'Histogram of task run durations', unit: 'seconds', }); + this.shutdownInitiated = new Promise(shutdownInitiated => { + rootLifecycle?.addShutdownHook(() => shutdownInitiated(true)); + }); } async triggerTask(id: string): Promise { @@ -83,6 +89,11 @@ export class PluginTaskSchedulerImpl implements SchedulerService { timeoutAfterDuration: parseDuration(task.timeout), }; + // Delegated abort controller that will abort either when the provided + // controller aborts, or when a root lifecycle shutdown happens + const abortController = delegateAbortController(task.signal); + this.shutdownInitiated.then(() => abortController.abort()); + if (scope === 'global') { const knex = await this.databaseFactory(); const worker = new TaskWorker( @@ -91,14 +102,14 @@ export class PluginTaskSchedulerImpl implements SchedulerService { knex, this.logger.child({ task: task.id }), ); - await worker.start(settings, { signal: task.signal }); + await worker.start(settings, { signal: abortController.signal }); } else { const worker = new LocalTaskWorker( task.id, this.instrumentedFunction(task, scope), this.logger.child({ task: task.id }), ); - worker.start(settings, { signal: task.signal }); + worker.start(settings, { signal: abortController.signal }); this.localTasksById.set(task.id, worker); } diff --git a/packages/backend-defaults/src/entrypoints/scheduler/lib/TaskWorker.ts b/packages/backend-defaults/src/entrypoints/scheduler/lib/TaskWorker.ts index c5570a8e30..48bc81efbf 100644 --- a/packages/backend-defaults/src/entrypoints/scheduler/lib/TaskWorker.ts +++ b/packages/backend-defaults/src/entrypoints/scheduler/lib/TaskWorker.ts @@ -41,7 +41,7 @@ export class TaskWorker { private readonly workCheckFrequency: Duration = DEFAULT_WORK_CHECK_FREQUENCY, ) {} - async start(settings: TaskSettingsV2, options?: { signal?: AbortSignal }) { + async start(settings: TaskSettingsV2, options: { signal: AbortSignal }) { try { await this.persistTask(settings); } catch (e) { @@ -68,18 +68,18 @@ export class TaskWorker { if (settings.initialDelayDuration) { await sleep( Duration.fromISO(settings.initialDelayDuration), - options?.signal, + options.signal, ); } - while (!options?.signal?.aborted) { - const runResult = await this.runOnce(options?.signal); + while (!options.signal.aborted) { + const runResult = await this.runOnce(options.signal); if (runResult.result === 'abort') { break; } - await sleep(workCheckFrequency, options?.signal); + await sleep(workCheckFrequency, options.signal); } this.logger.info(`Task worker finished: ${this.taskId}`); @@ -122,7 +122,7 @@ export class TaskWorker { * @returns The outcome of the attempt */ private async runOnce( - signal?: AbortSignal, + signal: AbortSignal, ): Promise< | { result: 'not-ready-yet' } | { result: 'abort' } diff --git a/packages/backend-defaults/src/entrypoints/scheduler/schedulerServiceFactory.ts b/packages/backend-defaults/src/entrypoints/scheduler/schedulerServiceFactory.ts index effa5c5925..3668dbd17a 100644 --- a/packages/backend-defaults/src/entrypoints/scheduler/schedulerServiceFactory.ts +++ b/packages/backend-defaults/src/entrypoints/scheduler/schedulerServiceFactory.ts @@ -34,8 +34,9 @@ export const schedulerServiceFactory = createServiceFactory({ deps: { database: coreServices.database, logger: coreServices.logger, + rootLifecycle: coreServices.rootLifecycle, }, - async factory({ database, logger }) { - return DefaultSchedulerService.create({ database, logger }); + async factory({ database, logger, rootLifecycle }) { + return DefaultSchedulerService.create({ database, logger, rootLifecycle }); }, });