Attempt to abort tasks when root lifecycle shutdown hook is invoked
Signed-off-by: Eric Peterson <ericpeterson@spotify.com>
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/backend-defaults': patch
|
||||
---
|
||||
|
||||
The task scheduler now registers a root lifecycle shutdown hook and, when Backstage begins shutting down, attempts to abort its scheduled tasks through the `AbortSignal` passed to each task.
|
||||
@@ -3,6 +3,8 @@
|
||||
> Do not edit this file. It is a report generated by [API Extractor](https://api-extractor.com/).
|
||||
|
||||
```ts
|
||||
/// <reference types="node" />
|
||||
|
||||
import { DatabaseService } from '@backstage/backend-plugin-api';
|
||||
import { LifecycleService } from '@backstage/backend-plugin-api';
|
||||
import { LoggerService } from '@backstage/backend-plugin-api';
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
```ts
|
||||
import { DatabaseService } from '@backstage/backend-plugin-api';
|
||||
import { LoggerService } from '@backstage/backend-plugin-api';
|
||||
import { RootLifecycleService } from '@backstage/backend-plugin-api';
|
||||
import { SchedulerService } from '@backstage/backend-plugin-api';
|
||||
import { ServiceFactory } from '@backstage/backend-plugin-api';
|
||||
|
||||
@@ -14,6 +15,7 @@ export class DefaultSchedulerService {
|
||||
static create(options: {
|
||||
database: DatabaseService;
|
||||
logger: LoggerService;
|
||||
rootLifecycle?: RootLifecycleService;
|
||||
}): SchedulerService;
|
||||
}
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
import {
|
||||
DatabaseService,
|
||||
LoggerService,
|
||||
RootLifecycleService,
|
||||
SchedulerService,
|
||||
} from '@backstage/backend-plugin-api';
|
||||
import { once } from 'lodash';
|
||||
@@ -34,6 +35,7 @@ export class DefaultSchedulerService {
|
||||
static create(options: {
|
||||
database: DatabaseService;
|
||||
logger: LoggerService;
|
||||
rootLifecycle?: RootLifecycleService;
|
||||
}): SchedulerService {
|
||||
const databaseFactory = once(async () => {
|
||||
const knex = await options.database.getClient();
|
||||
@@ -43,17 +45,24 @@ export class DefaultSchedulerService {
|
||||
}
|
||||
|
||||
if (process.env.NODE_ENV !== 'test') {
|
||||
const abortController = new AbortController();
|
||||
const janitor = new PluginTaskSchedulerJanitor({
|
||||
knex,
|
||||
waitBetweenRuns: Duration.fromObject({ minutes: 1 }),
|
||||
logger: options.logger,
|
||||
});
|
||||
janitor.start();
|
||||
|
||||
options.rootLifecycle?.addShutdownHook(() => abortController.abort());
|
||||
janitor.start(abortController.signal);
|
||||
}
|
||||
|
||||
return knex;
|
||||
});
|
||||
|
||||
return new PluginTaskSchedulerImpl(databaseFactory, options.logger);
|
||||
return new PluginTaskSchedulerImpl(
|
||||
databaseFactory,
|
||||
options.logger,
|
||||
options.rootLifecycle,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,7 +36,7 @@ export class LocalTaskWorker {
|
||||
private readonly logger: LoggerService,
|
||||
) {}
|
||||
|
||||
start(settings: TaskSettingsV2, options?: { signal?: AbortSignal }) {
|
||||
start(settings: TaskSettingsV2, options: { signal: AbortSignal }) {
|
||||
this.logger.info(
|
||||
`Task worker starting: ${this.taskId}, ${JSON.stringify(settings)}`,
|
||||
);
|
||||
@@ -48,18 +48,18 @@ export class LocalTaskWorker {
|
||||
if (settings.initialDelayDuration) {
|
||||
await this.sleep(
|
||||
Duration.fromISO(settings.initialDelayDuration),
|
||||
options?.signal,
|
||||
options.signal,
|
||||
);
|
||||
}
|
||||
|
||||
while (!options?.signal?.aborted) {
|
||||
while (!options.signal.aborted) {
|
||||
const startTime = process.hrtime();
|
||||
await this.runOnce(settings, options?.signal);
|
||||
await this.runOnce(settings, options.signal);
|
||||
const timeTaken = process.hrtime(startTime);
|
||||
await this.waitUntilNext(
|
||||
settings,
|
||||
(timeTaken[0] + timeTaken[1] / 1e9) * 1000,
|
||||
options?.signal,
|
||||
options.signal,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -89,7 +89,7 @@ export class LocalTaskWorker {
|
||||
*/
|
||||
private async runOnce(
|
||||
settings: TaskSettingsV2,
|
||||
signal?: AbortSignal,
|
||||
signal: AbortSignal,
|
||||
): Promise<void> {
|
||||
// Abort the task execution either if the worker is stopped, or if the
|
||||
// task timeout is hit
|
||||
@@ -115,9 +115,9 @@ export class LocalTaskWorker {
|
||||
private async waitUntilNext(
|
||||
settings: TaskSettingsV2,
|
||||
lastRunMillis: number,
|
||||
signal?: AbortSignal,
|
||||
signal: AbortSignal,
|
||||
) {
|
||||
if (signal?.aborted) {
|
||||
if (signal.aborted) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -145,7 +145,7 @@ export class LocalTaskWorker {
|
||||
|
||||
private async sleep(
|
||||
duration: Duration,
|
||||
abortSignal?: AbortSignal,
|
||||
abortSignal: AbortSignal,
|
||||
): Promise<void> {
|
||||
this.abortWait = delegateAbortController(abortSignal);
|
||||
await sleep(duration, this.abortWait.signal);
|
||||
|
||||
+57
@@ -38,6 +38,7 @@ function defer() {
|
||||
jest.setTimeout(60_000);
|
||||
|
||||
describe('PluginTaskManagerImpl', () => {
|
||||
const addShutdownHook = jest.fn();
|
||||
const databases = TestDatabases.create({
|
||||
ids: ['POSTGRES_16', 'POSTGRES_12', 'SQLITE_3'],
|
||||
});
|
||||
@@ -51,12 +52,17 @@ describe('PluginTaskManagerImpl', () => {
|
||||
jest.useFakeTimers();
|
||||
}, 60_000);
|
||||
|
||||
// Clear recorded calls on all jest mocks before every test, so that
// `addShutdownHook.mock.calls[0]` in a test refers to a registration made
// during that test rather than one left over from an earlier test.
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
});
|
||||
|
||||
// Test helper: initializes the requested test database, runs
// `migrateBackendTasks` against it, and constructs a PluginTaskSchedulerImpl
// that uses that knex instance, a mock logger, and a lifecycle stub whose
// `addShutdownHook` is the shared jest mock (so tests can retrieve the hook
// the scheduler registered). Returns both the knex handle and the manager.
async function init(databaseId: TestDatabaseId) {
|
||||
const knex = await databases.init(databaseId);
|
||||
await migrateBackendTasks(knex);
|
||||
const manager = new PluginTaskSchedulerImpl(
|
||||
async () => knex,
|
||||
mockServices.logger.mock(),
|
||||
{ addShutdownHook, addStartupHook: jest.fn() },
|
||||
);
|
||||
return { knex, manager };
|
||||
}
|
||||
@@ -103,6 +109,33 @@ describe('PluginTaskManagerImpl', () => {
|
||||
expect(fn).toHaveBeenCalledWith(expect.any(AbortSignal));
|
||||
},
|
||||
);
|
||||
|
||||
// Global-scope variant, run once per supported test database.
// Flow: schedule a task whose function resolves `promise` with the first
// argument it is called with (the AbortSignal handed to the task), grab the
// first callback the scheduler passed to the mocked `addShutdownHook`, wait
// for the task to run once, verify the signal is not yet aborted, then invoke
// the shutdown hook and verify the same signal is now aborted.
// NOTE(review): the six-field cron `* * * * * *` presumably fires every
// second — confirm against the scheduler's cron parser.
it.each(databases.eachSupportedId())(
|
||||
'aborts the task if shutdown hook is invoked, %p',
|
||||
async databaseId => {
|
||||
const { manager } = await init(databaseId);
|
||||
|
||||
const fn = jest.fn();
|
||||
const promise = new Promise<AbortSignal>(resolve =>
|
||||
fn.mockImplementation(resolve),
|
||||
);
|
||||
await manager.scheduleTask({
|
||||
id: 'task3',
|
||||
timeout: Duration.fromMillis(5000),
|
||||
frequency: { cron: '* * * * * *' },
|
||||
fn,
|
||||
scope: 'global',
|
||||
});
|
||||
|
||||
const shutdownHook = addShutdownHook.mock.calls[0][0];
|
||||
const abortSignal = await promise;
|
||||
expect(abortSignal.aborted).toBe(false);
|
||||
|
||||
// Should be aborted after the shutdown hook is invoked
|
||||
await shutdownHook();
|
||||
expect(abortSignal.aborted).toBe(true);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
describe('triggerTask with global scope', () => {
|
||||
@@ -212,6 +245,30 @@ describe('PluginTaskManagerImpl', () => {
|
||||
await promise;
|
||||
expect(fn).toHaveBeenCalledWith(expect.any(AbortSignal));
|
||||
}, 60_000);
|
||||
|
||||
// Local-scope variant, run against SQLITE_3 only.
// Flow: schedule a local task whose function resolves `promise` with the
// first argument it is called with (the AbortSignal handed to the task), grab
// the first callback the scheduler passed to the mocked `addShutdownHook`,
// wait for the task to run once, verify the signal is not yet aborted, then
// invoke the shutdown hook and verify the same signal is now aborted.
// The trailing 60_000 is the per-test timeout in milliseconds.
it('aborts the task if shutdown hook is invoked', async () => {
|
||||
const { manager } = await init('SQLITE_3');
|
||||
|
||||
const fn = jest.fn();
|
||||
const promise = new Promise<AbortSignal>(resolve =>
|
||||
fn.mockImplementation(resolve),
|
||||
);
|
||||
await manager.scheduleTask({
|
||||
id: 'task3',
|
||||
timeout: Duration.fromMillis(5000),
|
||||
frequency: { cron: '* * * * * *' },
|
||||
fn,
|
||||
scope: 'local',
|
||||
});
|
||||
|
||||
const shutdownHook = addShutdownHook.mock.calls[0][0];
|
||||
const abortSignal = await promise;
|
||||
expect(abortSignal.aborted).toBe(false);
|
||||
|
||||
// Should be aborted after the shutdown hook is invoked
|
||||
await shutdownHook();
|
||||
expect(abortSignal.aborted).toBe(true);
|
||||
}, 60_000);
|
||||
});
|
||||
|
||||
describe('triggerTask with local scope', () => {
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
import {
|
||||
LoggerService,
|
||||
RootLifecycleService,
|
||||
SchedulerService,
|
||||
SchedulerServiceTaskDescriptor,
|
||||
SchedulerServiceTaskFunction,
|
||||
@@ -29,7 +30,7 @@ import { Duration } from 'luxon';
|
||||
import { LocalTaskWorker } from './LocalTaskWorker';
|
||||
import { TaskWorker } from './TaskWorker';
|
||||
import { TaskSettingsV2 } from './types';
|
||||
import { TRACER_ID, validateId } from './util';
|
||||
import { delegateAbortController, TRACER_ID, validateId } from './util';
|
||||
|
||||
const tracer = trace.getTracer(TRACER_ID);
|
||||
|
||||
@@ -39,6 +40,7 @@ const tracer = trace.getTracer(TRACER_ID);
|
||||
export class PluginTaskSchedulerImpl implements SchedulerService {
|
||||
private readonly localTasksById = new Map<string, LocalTaskWorker>();
|
||||
private readonly allScheduledTasks: SchedulerServiceTaskDescriptor[] = [];
|
||||
private readonly shutdownInitiated: Promise<boolean>;
|
||||
|
||||
private readonly counter: Counter;
|
||||
private readonly duration: Histogram;
|
||||
@@ -46,6 +48,7 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
|
||||
constructor(
|
||||
private readonly databaseFactory: () => Promise<Knex>,
|
||||
private readonly logger: LoggerService,
|
||||
rootLifecycle?: RootLifecycleService,
|
||||
) {
|
||||
const meter = metrics.getMeter('default');
|
||||
this.counter = meter.createCounter('backend_tasks.task.runs.count', {
|
||||
@@ -55,6 +58,9 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
|
||||
description: 'Histogram of task run durations',
|
||||
unit: 'seconds',
|
||||
});
|
||||
this.shutdownInitiated = new Promise(shutdownInitiated => {
|
||||
rootLifecycle?.addShutdownHook(() => shutdownInitiated(true));
|
||||
});
|
||||
}
|
||||
|
||||
async triggerTask(id: string): Promise<void> {
|
||||
@@ -83,6 +89,11 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
|
||||
timeoutAfterDuration: parseDuration(task.timeout),
|
||||
};
|
||||
|
||||
// Delegated abort controller that will abort either when the provided
|
||||
// controller aborts, or when a root lifecycle shutdown happens
|
||||
const abortController = delegateAbortController(task.signal);
|
||||
this.shutdownInitiated.then(() => abortController.abort());
|
||||
|
||||
if (scope === 'global') {
|
||||
const knex = await this.databaseFactory();
|
||||
const worker = new TaskWorker(
|
||||
@@ -91,14 +102,14 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
|
||||
knex,
|
||||
this.logger.child({ task: task.id }),
|
||||
);
|
||||
await worker.start(settings, { signal: task.signal });
|
||||
await worker.start(settings, { signal: abortController.signal });
|
||||
} else {
|
||||
const worker = new LocalTaskWorker(
|
||||
task.id,
|
||||
this.instrumentedFunction(task, scope),
|
||||
this.logger.child({ task: task.id }),
|
||||
);
|
||||
worker.start(settings, { signal: task.signal });
|
||||
worker.start(settings, { signal: abortController.signal });
|
||||
this.localTasksById.set(task.id, worker);
|
||||
}
|
||||
|
||||
|
||||
@@ -41,7 +41,7 @@ export class TaskWorker {
|
||||
private readonly workCheckFrequency: Duration = DEFAULT_WORK_CHECK_FREQUENCY,
|
||||
) {}
|
||||
|
||||
async start(settings: TaskSettingsV2, options?: { signal?: AbortSignal }) {
|
||||
async start(settings: TaskSettingsV2, options: { signal: AbortSignal }) {
|
||||
try {
|
||||
await this.persistTask(settings);
|
||||
} catch (e) {
|
||||
@@ -68,18 +68,18 @@ export class TaskWorker {
|
||||
if (settings.initialDelayDuration) {
|
||||
await sleep(
|
||||
Duration.fromISO(settings.initialDelayDuration),
|
||||
options?.signal,
|
||||
options.signal,
|
||||
);
|
||||
}
|
||||
|
||||
while (!options?.signal?.aborted) {
|
||||
const runResult = await this.runOnce(options?.signal);
|
||||
while (!options.signal.aborted) {
|
||||
const runResult = await this.runOnce(options.signal);
|
||||
|
||||
if (runResult.result === 'abort') {
|
||||
break;
|
||||
}
|
||||
|
||||
await sleep(workCheckFrequency, options?.signal);
|
||||
await sleep(workCheckFrequency, options.signal);
|
||||
}
|
||||
|
||||
this.logger.info(`Task worker finished: ${this.taskId}`);
|
||||
@@ -122,7 +122,7 @@ export class TaskWorker {
|
||||
* @returns The outcome of the attempt
|
||||
*/
|
||||
private async runOnce(
|
||||
signal?: AbortSignal,
|
||||
signal: AbortSignal,
|
||||
): Promise<
|
||||
| { result: 'not-ready-yet' }
|
||||
| { result: 'abort' }
|
||||
|
||||
@@ -34,8 +34,9 @@ export const schedulerServiceFactory = createServiceFactory({
|
||||
deps: {
|
||||
database: coreServices.database,
|
||||
logger: coreServices.logger,
|
||||
rootLifecycle: coreServices.rootLifecycle,
|
||||
},
|
||||
async factory({ database, logger }) {
|
||||
return DefaultSchedulerService.create({ database, logger });
|
||||
async factory({ database, logger, rootLifecycle }) {
|
||||
return DefaultSchedulerService.create({ database, logger, rootLifecycle });
|
||||
},
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user