Attempt to abort tasks when root lifecycle shutdown hook is invoked

Signed-off-by: Eric Peterson <ericpeterson@spotify.com>
This commit is contained in:
Eric Peterson
2024-10-02 14:55:50 +02:00
parent 8b49cc2be9
commit e36d12f5ba
9 changed files with 109 additions and 22 deletions
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/backend-defaults': patch
---
The task scheduler now attempts to abort all of its tasks, including any that are currently running, when it detects that Backstage is being shut down.
@@ -3,6 +3,8 @@
> Do not edit this file. It is a report generated by [API Extractor](https://api-extractor.com/).
```ts
/// <reference types="node" />
import { DatabaseService } from '@backstage/backend-plugin-api';
import { LifecycleService } from '@backstage/backend-plugin-api';
import { LoggerService } from '@backstage/backend-plugin-api';
@@ -5,6 +5,7 @@
```ts
import { DatabaseService } from '@backstage/backend-plugin-api';
import { LoggerService } from '@backstage/backend-plugin-api';
import { RootLifecycleService } from '@backstage/backend-plugin-api';
import { SchedulerService } from '@backstage/backend-plugin-api';
import { ServiceFactory } from '@backstage/backend-plugin-api';
@@ -14,6 +15,7 @@ export class DefaultSchedulerService {
static create(options: {
database: DatabaseService;
logger: LoggerService;
rootLifecycle?: RootLifecycleService;
}): SchedulerService;
}
@@ -17,6 +17,7 @@
import {
DatabaseService,
LoggerService,
RootLifecycleService,
SchedulerService,
} from '@backstage/backend-plugin-api';
import { once } from 'lodash';
@@ -34,6 +35,7 @@ export class DefaultSchedulerService {
static create(options: {
database: DatabaseService;
logger: LoggerService;
rootLifecycle?: RootLifecycleService;
}): SchedulerService {
const databaseFactory = once(async () => {
const knex = await options.database.getClient();
@@ -43,17 +45,24 @@ export class DefaultSchedulerService {
}
if (process.env.NODE_ENV !== 'test') {
const abortController = new AbortController();
const janitor = new PluginTaskSchedulerJanitor({
knex,
waitBetweenRuns: Duration.fromObject({ minutes: 1 }),
logger: options.logger,
});
janitor.start();
options.rootLifecycle?.addShutdownHook(() => abortController.abort());
janitor.start(abortController.signal);
}
return knex;
});
return new PluginTaskSchedulerImpl(databaseFactory, options.logger);
return new PluginTaskSchedulerImpl(
databaseFactory,
options.logger,
options.rootLifecycle,
);
}
}
@@ -36,7 +36,7 @@ export class LocalTaskWorker {
private readonly logger: LoggerService,
) {}
start(settings: TaskSettingsV2, options?: { signal?: AbortSignal }) {
start(settings: TaskSettingsV2, options: { signal: AbortSignal }) {
this.logger.info(
`Task worker starting: ${this.taskId}, ${JSON.stringify(settings)}`,
);
@@ -48,18 +48,18 @@ export class LocalTaskWorker {
if (settings.initialDelayDuration) {
await this.sleep(
Duration.fromISO(settings.initialDelayDuration),
options?.signal,
options.signal,
);
}
while (!options?.signal?.aborted) {
while (!options.signal.aborted) {
const startTime = process.hrtime();
await this.runOnce(settings, options?.signal);
await this.runOnce(settings, options.signal);
const timeTaken = process.hrtime(startTime);
await this.waitUntilNext(
settings,
(timeTaken[0] + timeTaken[1] / 1e9) * 1000,
options?.signal,
options.signal,
);
}
@@ -89,7 +89,7 @@ export class LocalTaskWorker {
*/
private async runOnce(
settings: TaskSettingsV2,
signal?: AbortSignal,
signal: AbortSignal,
): Promise<void> {
// Abort the task execution either if the worker is stopped, or if the
// task timeout is hit
@@ -115,9 +115,9 @@ export class LocalTaskWorker {
private async waitUntilNext(
settings: TaskSettingsV2,
lastRunMillis: number,
signal?: AbortSignal,
signal: AbortSignal,
) {
if (signal?.aborted) {
if (signal.aborted) {
return;
}
@@ -145,7 +145,7 @@ export class LocalTaskWorker {
private async sleep(
duration: Duration,
abortSignal?: AbortSignal,
abortSignal: AbortSignal,
): Promise<void> {
this.abortWait = delegateAbortController(abortSignal);
await sleep(duration, this.abortWait.signal);
@@ -38,6 +38,7 @@ function defer() {
jest.setTimeout(60_000);
describe('PluginTaskManagerImpl', () => {
const addShutdownHook = jest.fn();
const databases = TestDatabases.create({
ids: ['POSTGRES_16', 'POSTGRES_12', 'SQLITE_3'],
});
@@ -51,12 +52,17 @@ describe('PluginTaskManagerImpl', () => {
jest.useFakeTimers();
}, 60_000);
// Clear recorded mock calls before every test, so that lookups such as
// addShutdownHook.mock.calls[0] only see calls made by the current test
beforeEach(function resetMocks() {
  jest.clearAllMocks();
});
/**
 * Builds a scheduler instance on top of a migrated test database.
 *
 * The scheduler receives a stubbed root lifecycle whose `addShutdownHook` is
 * the shared mock declared at the top of the suite, which lets a test read
 * back the hook that was registered and invoke it by hand.
 *
 * @param databaseId - Identifier of the test database engine to initialize.
 * @returns The knex client and the scheduler that was created for it.
 */
async function init(databaseId: TestDatabaseId) {
  const knex = await databases.init(databaseId);
  await migrateBackendTasks(knex);

  const databaseFactory = async () => knex;
  const logger = mockServices.logger.mock();
  const rootLifecycle = { addShutdownHook, addStartupHook: jest.fn() };

  const manager = new PluginTaskSchedulerImpl(
    databaseFactory,
    logger,
    rootLifecycle,
  );
  return { knex, manager };
}
@@ -103,6 +109,33 @@ describe('PluginTaskManagerImpl', () => {
expect(fn).toHaveBeenCalledWith(expect.any(AbortSignal));
},
);
// Verifies that a globally scoped task sees its AbortSignal flip to aborted
// once the shutdown hook registered with the root lifecycle has been run.
it.each(databases.eachSupportedId())(
  'aborts the task if shutdown hook is invoked, %p',
  async databaseId => {
    const { manager } = await init(databaseId);

    // The task function resolves this promise with the first argument it is
    // called with, which gives the test a handle on the task's AbortSignal
    const taskFn = jest.fn();
    const receivedSignal = new Promise<AbortSignal>(resolve => {
      taskFn.mockImplementation(resolve);
    });

    await manager.scheduleTask({
      id: 'task3',
      timeout: Duration.fromMillis(5000),
      frequency: { cron: '* * * * * *' },
      fn: taskFn,
      scope: 'global',
    });

    // First argument of the first addShutdownHook call is the hook itself
    const [registeredHook] = addShutdownHook.mock.calls[0];

    const signal = await receivedSignal;
    expect(signal.aborted).toBe(false);

    // Running the hook is expected to abort the signal the task received
    await registeredHook();
    expect(signal.aborted).toBe(true);
  },
);
});
describe('triggerTask with global scope', () => {
@@ -212,6 +245,30 @@ describe('PluginTaskManagerImpl', () => {
await promise;
expect(fn).toHaveBeenCalledWith(expect.any(AbortSignal));
}, 60_000);
// Verifies that a locally scoped task sees its AbortSignal flip to aborted
// once the shutdown hook registered with the root lifecycle has been run.
it(
  'aborts the task if shutdown hook is invoked',
  async () => {
    const { manager } = await init('SQLITE_3');

    // The task function resolves this promise with the first argument it is
    // called with, which gives the test a handle on the task's AbortSignal
    const taskFn = jest.fn();
    const receivedSignal = new Promise<AbortSignal>(resolve => {
      taskFn.mockImplementation(resolve);
    });

    await manager.scheduleTask({
      id: 'task3',
      timeout: Duration.fromMillis(5000),
      frequency: { cron: '* * * * * *' },
      fn: taskFn,
      scope: 'local',
    });

    // First argument of the first addShutdownHook call is the hook itself
    const [registeredHook] = addShutdownHook.mock.calls[0];

    const signal = await receivedSignal;
    expect(signal.aborted).toBe(false);

    // Running the hook is expected to abort the signal the task received
    await registeredHook();
    expect(signal.aborted).toBe(true);
  },
  60_000,
);
});
describe('triggerTask with local scope', () => {
@@ -16,6 +16,7 @@
import {
LoggerService,
RootLifecycleService,
SchedulerService,
SchedulerServiceTaskDescriptor,
SchedulerServiceTaskFunction,
@@ -29,7 +30,7 @@ import { Duration } from 'luxon';
import { LocalTaskWorker } from './LocalTaskWorker';
import { TaskWorker } from './TaskWorker';
import { TaskSettingsV2 } from './types';
import { TRACER_ID, validateId } from './util';
import { delegateAbortController, TRACER_ID, validateId } from './util';
const tracer = trace.getTracer(TRACER_ID);
@@ -39,6 +40,7 @@ const tracer = trace.getTracer(TRACER_ID);
export class PluginTaskSchedulerImpl implements SchedulerService {
private readonly localTasksById = new Map<string, LocalTaskWorker>();
private readonly allScheduledTasks: SchedulerServiceTaskDescriptor[] = [];
private readonly shutdownInitiated: Promise<boolean>;
private readonly counter: Counter;
private readonly duration: Histogram;
@@ -46,6 +48,7 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
constructor(
private readonly databaseFactory: () => Promise<Knex>,
private readonly logger: LoggerService,
rootLifecycle?: RootLifecycleService,
) {
const meter = metrics.getMeter('default');
this.counter = meter.createCounter('backend_tasks.task.runs.count', {
@@ -55,6 +58,9 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
description: 'Histogram of task run durations',
unit: 'seconds',
});
this.shutdownInitiated = new Promise(shutdownInitiated => {
rootLifecycle?.addShutdownHook(() => shutdownInitiated(true));
});
}
async triggerTask(id: string): Promise<void> {
@@ -83,6 +89,11 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
timeoutAfterDuration: parseDuration(task.timeout),
};
// Delegated abort controller that will abort either when the provided
// task signal is aborted, or when a root lifecycle shutdown happens
const abortController = delegateAbortController(task.signal);
this.shutdownInitiated.then(() => abortController.abort());
if (scope === 'global') {
const knex = await this.databaseFactory();
const worker = new TaskWorker(
@@ -91,14 +102,14 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
knex,
this.logger.child({ task: task.id }),
);
await worker.start(settings, { signal: task.signal });
await worker.start(settings, { signal: abortController.signal });
} else {
const worker = new LocalTaskWorker(
task.id,
this.instrumentedFunction(task, scope),
this.logger.child({ task: task.id }),
);
worker.start(settings, { signal: task.signal });
worker.start(settings, { signal: abortController.signal });
this.localTasksById.set(task.id, worker);
}
@@ -41,7 +41,7 @@ export class TaskWorker {
private readonly workCheckFrequency: Duration = DEFAULT_WORK_CHECK_FREQUENCY,
) {}
async start(settings: TaskSettingsV2, options?: { signal?: AbortSignal }) {
async start(settings: TaskSettingsV2, options: { signal: AbortSignal }) {
try {
await this.persistTask(settings);
} catch (e) {
@@ -68,18 +68,18 @@ export class TaskWorker {
if (settings.initialDelayDuration) {
await sleep(
Duration.fromISO(settings.initialDelayDuration),
options?.signal,
options.signal,
);
}
while (!options?.signal?.aborted) {
const runResult = await this.runOnce(options?.signal);
while (!options.signal.aborted) {
const runResult = await this.runOnce(options.signal);
if (runResult.result === 'abort') {
break;
}
await sleep(workCheckFrequency, options?.signal);
await sleep(workCheckFrequency, options.signal);
}
this.logger.info(`Task worker finished: ${this.taskId}`);
@@ -122,7 +122,7 @@ export class TaskWorker {
* @returns The outcome of the attempt
*/
private async runOnce(
signal?: AbortSignal,
signal: AbortSignal,
): Promise<
| { result: 'not-ready-yet' }
| { result: 'abort' }
@@ -34,8 +34,9 @@ export const schedulerServiceFactory = createServiceFactory({
deps: {
database: coreServices.database,
logger: coreServices.logger,
rootLifecycle: coreServices.rootLifecycle,
},
async factory({ database, logger }) {
return DefaultSchedulerService.create({ database, logger });
async factory({ database, logger, rootLifecycle }) {
return DefaultSchedulerService.create({ database, logger, rootLifecycle });
},
});