@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/backend-defaults': patch
|
||||
---
|
||||
|
||||
The `DefaultSchedulerService` now accepts `HttpRouterService` and `PluginMetadataService` arguments. If you supply a router, the scheduler will register a REST API for listing and triggering tasks.
|
||||
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/plugin-catalog-backend-module-backstage-openapi': patch
|
||||
---
|
||||
|
||||
Do not swallow errors; instead allow them to bubble up to the task scheduler for better tracking and logging.
|
||||
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/backend-plugin-api': patch
|
||||
---
|
||||
|
||||
Minor doc comment update
|
||||
@@ -0,0 +1,47 @@
|
||||
/*
|
||||
* Copyright 2024 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
// @ts-check
|
||||
|
||||
/**
|
||||
* @param { import("knex").Knex } knex
|
||||
* @returns { Promise<void> }
|
||||
*/
|
||||
exports.up = async function up(knex) {
|
||||
await knex.schema.alterTable('backstage_backend_tasks__tasks', table => {
|
||||
table
|
||||
.text('last_run_error_json', 'longtext')
|
||||
.nullable()
|
||||
.comment(
|
||||
'JSON serialized error object from the last task run, if it failed',
|
||||
);
|
||||
table
|
||||
.dateTime('last_run_ended_at')
|
||||
.nullable()
|
||||
.comment('The last time that the task ended');
|
||||
});
|
||||
};
|
||||
|
||||
/**
|
||||
* @param { import("knex").Knex } knex
|
||||
* @returns { Promise<void> }
|
||||
*/
|
||||
exports.down = async function down(knex) {
|
||||
await knex.schema.alterTable('backstage_backend_tasks__tasks', table => {
|
||||
table.dropColumn('last_run_error_json');
|
||||
table.dropColumn('last_run_ended_at');
|
||||
});
|
||||
};
|
||||
@@ -4,7 +4,9 @@
|
||||
|
||||
```ts
|
||||
import { DatabaseService } from '@backstage/backend-plugin-api';
|
||||
import { HttpRouterService } from '@backstage/backend-plugin-api';
|
||||
import { LoggerService } from '@backstage/backend-plugin-api';
|
||||
import { PluginMetadataService } from '@backstage/backend-plugin-api';
|
||||
import { RootLifecycleService } from '@backstage/backend-plugin-api';
|
||||
import { SchedulerService } from '@backstage/backend-plugin-api';
|
||||
import { ServiceFactory } from '@backstage/backend-plugin-api';
|
||||
@@ -16,6 +18,8 @@ export class DefaultSchedulerService {
|
||||
database: DatabaseService;
|
||||
logger: LoggerService;
|
||||
rootLifecycle?: RootLifecycleService;
|
||||
httpRouter?: HttpRouterService;
|
||||
pluginMetadata?: PluginMetadataService;
|
||||
}): SchedulerService;
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,8 @@
|
||||
| `current_run_started_at` | `timestamp with time zone` | true | - | - |
|
||||
| `current_run_ticket` | `text` | true | - | - |
|
||||
| `id` | `character varying` | false | 255 | - |
|
||||
| `last_run_ended_at` | `timestamp with time zone` | true | - | - |
|
||||
| `last_run_error_json` | `text` | true | - | - |
|
||||
| `next_run_start_at` | `timestamp with time zone` | true | - | - |
|
||||
| `settings_json` | `text` | false | - | - |
|
||||
|
||||
|
||||
@@ -18,12 +18,12 @@ import { resolvePackagePath } from '@backstage/backend-plugin-api';
|
||||
import { Knex } from 'knex';
|
||||
import { DB_MIGRATIONS_TABLE } from './tables';
|
||||
|
||||
export async function migrateBackendTasks(knex: Knex): Promise<void> {
|
||||
const migrationsDir = resolvePackagePath(
|
||||
'@backstage/backend-defaults',
|
||||
'migrations/scheduler',
|
||||
);
|
||||
export const migrationsDir = resolvePackagePath(
|
||||
'@backstage/backend-defaults',
|
||||
'migrations/scheduler',
|
||||
);
|
||||
|
||||
export async function migrateBackendTasks(knex: Knex): Promise<void> {
|
||||
await knex.migrate.latest({
|
||||
directory: migrationsDir,
|
||||
tableName: DB_MIGRATIONS_TABLE,
|
||||
|
||||
@@ -0,0 +1,108 @@
|
||||
/*
|
||||
* Copyright 2022 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { Knex } from 'knex';
|
||||
import { TestDatabases } from '@backstage/backend-test-utils';
|
||||
import fs from 'fs';
|
||||
import { migrationsDir } from './migrateBackendTasks';
|
||||
|
||||
const migrationsFiles = fs.readdirSync(migrationsDir).sort();
|
||||
|
||||
async function migrateUpOnce(knex: Knex): Promise<void> {
|
||||
await knex.migrate.up({ directory: migrationsDir });
|
||||
}
|
||||
|
||||
async function migrateDownOnce(knex: Knex): Promise<void> {
|
||||
await knex.migrate.down({ directory: migrationsDir });
|
||||
}
|
||||
|
||||
async function migrateUntilBefore(knex: Knex, target: string): Promise<void> {
|
||||
const index = migrationsFiles.indexOf(target);
|
||||
if (index === -1) {
|
||||
throw new Error(`Migration ${target} not found`);
|
||||
}
|
||||
for (let i = 0; i < index; i++) {
|
||||
await migrateUpOnce(knex);
|
||||
}
|
||||
}
|
||||
|
||||
jest.setTimeout(60_000);
|
||||
|
||||
describe('migrations', () => {
|
||||
const databases = TestDatabases.create();
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'20250411000000_last_run.js, %p',
|
||||
async databaseId => {
|
||||
const knex = await databases.init(databaseId);
|
||||
|
||||
await migrateUntilBefore(knex, '20250411000000_last_run.js');
|
||||
|
||||
await knex
|
||||
.insert({
|
||||
id: 'i',
|
||||
settings_json: '{}',
|
||||
})
|
||||
.into('backstage_backend_tasks__tasks');
|
||||
|
||||
await expect(knex('backstage_backend_tasks__tasks')).resolves.toEqual([
|
||||
{
|
||||
id: 'i',
|
||||
settings_json: '{}',
|
||||
next_run_start_at: null,
|
||||
current_run_ticket: null,
|
||||
current_run_started_at: null,
|
||||
current_run_expires_at: null,
|
||||
},
|
||||
]);
|
||||
|
||||
await migrateUpOnce(knex);
|
||||
|
||||
await knex
|
||||
.table('backstage_backend_tasks__tasks')
|
||||
.update({ last_run_error_json: 'error' })
|
||||
.where({ id: 'i' });
|
||||
|
||||
await expect(knex('backstage_backend_tasks__tasks')).resolves.toEqual([
|
||||
{
|
||||
id: 'i',
|
||||
settings_json: '{}',
|
||||
next_run_start_at: null,
|
||||
current_run_ticket: null,
|
||||
current_run_started_at: null,
|
||||
current_run_expires_at: null,
|
||||
last_run_ended_at: null,
|
||||
last_run_error_json: 'error',
|
||||
},
|
||||
]);
|
||||
|
||||
await migrateDownOnce(knex);
|
||||
|
||||
await expect(knex('backstage_backend_tasks__tasks')).resolves.toEqual([
|
||||
{
|
||||
id: 'i',
|
||||
settings_json: '{}',
|
||||
next_run_start_at: null,
|
||||
current_run_ticket: null,
|
||||
current_run_started_at: null,
|
||||
current_run_expires_at: null,
|
||||
},
|
||||
]);
|
||||
|
||||
await knex.destroy();
|
||||
},
|
||||
);
|
||||
});
|
||||
@@ -20,8 +20,10 @@ export const DB_TASKS_TABLE = 'backstage_backend_tasks__tasks';
|
||||
export type DbTasksRow = {
|
||||
id: string;
|
||||
settings_json: string;
|
||||
next_run_start_at: Date;
|
||||
next_run_start_at?: Date | string; // This can be null when in manual trigger mode
|
||||
current_run_ticket?: string;
|
||||
current_run_started_at?: Date | string;
|
||||
current_run_expires_at?: Date | string;
|
||||
last_run_error_json?: string;
|
||||
last_run_ended_at?: Date | string;
|
||||
};
|
||||
|
||||
@@ -16,7 +16,9 @@
|
||||
|
||||
import {
|
||||
DatabaseService,
|
||||
HttpRouterService,
|
||||
LoggerService,
|
||||
PluginMetadataService,
|
||||
RootLifecycleService,
|
||||
SchedulerService,
|
||||
} from '@backstage/backend-plugin-api';
|
||||
@@ -36,6 +38,8 @@ export class DefaultSchedulerService {
|
||||
database: DatabaseService;
|
||||
logger: LoggerService;
|
||||
rootLifecycle?: RootLifecycleService;
|
||||
httpRouter?: HttpRouterService;
|
||||
pluginMetadata?: PluginMetadataService;
|
||||
}): SchedulerService {
|
||||
const databaseFactory = once(async () => {
|
||||
const knex = await options.database.getClient();
|
||||
@@ -59,10 +63,15 @@ export class DefaultSchedulerService {
|
||||
return knex;
|
||||
});
|
||||
|
||||
return new PluginTaskSchedulerImpl(
|
||||
const scheduler = new PluginTaskSchedulerImpl(
|
||||
options.pluginMetadata?.getId() ?? 'unknown',
|
||||
databaseFactory,
|
||||
options.logger,
|
||||
options.rootLifecycle,
|
||||
);
|
||||
|
||||
options.httpRouter?.use(scheduler.getRouter());
|
||||
|
||||
return scheduler;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,9 @@
|
||||
|
||||
import { LocalTaskWorker } from './LocalTaskWorker';
|
||||
import { mockServices } from '@backstage/backend-test-utils';
|
||||
import waitFor from 'wait-for-expect';
|
||||
|
||||
jest.setTimeout(10_000);
|
||||
|
||||
describe('LocalTaskWorker', () => {
|
||||
const logger = mockServices.logger.mock();
|
||||
@@ -106,4 +109,108 @@ describe('LocalTaskWorker', () => {
|
||||
expect(fn).toHaveBeenCalledTimes(2);
|
||||
controller.abort();
|
||||
});
|
||||
|
||||
it('goes through the expected states', async () => {
|
||||
const fn = jest
|
||||
.fn()
|
||||
.mockImplementationOnce(() => new Promise<void>(r => setTimeout(r, 100)))
|
||||
.mockImplementationOnce(
|
||||
() =>
|
||||
new Promise<void>((_, r) =>
|
||||
setTimeout(() => r(new Error('boo')), 100),
|
||||
),
|
||||
)
|
||||
.mockImplementation(() => new Promise<void>(r => setTimeout(r, 100)));
|
||||
const controller = new AbortController();
|
||||
|
||||
const worker = new LocalTaskWorker('a', fn, logger);
|
||||
worker.start(
|
||||
{
|
||||
version: 2,
|
||||
initialDelayDuration: 'PT0.5S',
|
||||
cadence: 'PT0.5S',
|
||||
timeoutAfterDuration: 'PT1S',
|
||||
},
|
||||
{ signal: controller.signal },
|
||||
);
|
||||
|
||||
await waitFor(() => {
|
||||
expect(worker.taskState()).toEqual({
|
||||
status: 'idle',
|
||||
startsAt: expect.any(String),
|
||||
});
|
||||
expect(worker.workerState()).toEqual({
|
||||
status: 'initial-wait',
|
||||
});
|
||||
});
|
||||
|
||||
// Start, complete successfully
|
||||
await waitFor(() => {
|
||||
expect(worker.taskState()).toEqual({
|
||||
status: 'running',
|
||||
startedAt: expect.any(String),
|
||||
timesOutAt: expect.any(String),
|
||||
});
|
||||
expect(worker.workerState()).toEqual({
|
||||
status: 'running',
|
||||
});
|
||||
});
|
||||
await waitFor(() => {
|
||||
expect(worker.taskState()).toEqual({
|
||||
status: 'idle',
|
||||
startsAt: expect.any(String),
|
||||
lastRunEndedAt: expect.any(String),
|
||||
});
|
||||
expect(worker.workerState()).toEqual({
|
||||
status: 'idle',
|
||||
});
|
||||
});
|
||||
|
||||
// Start, complete with error
|
||||
await waitFor(() => {
|
||||
expect(worker.taskState()).toEqual({
|
||||
status: 'running',
|
||||
startedAt: expect.any(String),
|
||||
timesOutAt: expect.any(String),
|
||||
lastRunEndedAt: expect.any(String),
|
||||
});
|
||||
expect(worker.workerState()).toEqual({
|
||||
status: 'running',
|
||||
});
|
||||
});
|
||||
await waitFor(() => {
|
||||
expect(worker.taskState()).toEqual({
|
||||
status: 'idle',
|
||||
startsAt: expect.any(String),
|
||||
lastRunEndedAt: expect.any(String),
|
||||
lastRunError: expect.any(String),
|
||||
});
|
||||
expect(worker.workerState()).toEqual({
|
||||
status: 'idle',
|
||||
});
|
||||
});
|
||||
|
||||
// Start, complete successfully
|
||||
await waitFor(() => {
|
||||
expect(worker.taskState()).toEqual({
|
||||
status: 'running',
|
||||
startedAt: expect.any(String),
|
||||
timesOutAt: expect.any(String),
|
||||
lastRunEndedAt: expect.any(String),
|
||||
});
|
||||
expect(worker.workerState()).toEqual({
|
||||
status: 'running',
|
||||
});
|
||||
});
|
||||
await waitFor(() => {
|
||||
expect(worker.taskState()).toEqual({
|
||||
status: 'idle',
|
||||
startsAt: expect.any(String),
|
||||
lastRunEndedAt: expect.any(String),
|
||||
});
|
||||
expect(worker.workerState()).toEqual({
|
||||
status: 'idle',
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -19,8 +19,8 @@ import { SchedulerServiceTaskFunction } from '@backstage/backend-plugin-api';
|
||||
import { ConflictError } from '@backstage/errors';
|
||||
import { CronTime } from 'cron';
|
||||
import { DateTime, Duration } from 'luxon';
|
||||
import { TaskSettingsV2 } from './types';
|
||||
import { delegateAbortController, sleep } from './util';
|
||||
import { TaskSettingsV2, TaskApiTasksResponse } from './types';
|
||||
import { delegateAbortController, serializeError, sleep } from './util';
|
||||
|
||||
/**
|
||||
* Implements tasks that run locally without cross-host collaboration.
|
||||
@@ -29,6 +29,12 @@ import { delegateAbortController, sleep } from './util';
|
||||
*/
|
||||
export class LocalTaskWorker {
|
||||
private abortWait: AbortController | undefined;
|
||||
#taskState: Exclude<TaskApiTasksResponse['taskState'], null> = {
|
||||
status: 'idle',
|
||||
};
|
||||
#workerState: TaskApiTasksResponse['workerState'] = {
|
||||
status: 'idle',
|
||||
};
|
||||
|
||||
constructor(
|
||||
private readonly taskId: string,
|
||||
@@ -45,12 +51,7 @@ export class LocalTaskWorker {
|
||||
let attemptNum = 1;
|
||||
for (;;) {
|
||||
try {
|
||||
if (settings.initialDelayDuration) {
|
||||
await this.sleep(
|
||||
Duration.fromISO(settings.initialDelayDuration),
|
||||
options.signal,
|
||||
);
|
||||
}
|
||||
await this.performInitialWait(settings, options.signal);
|
||||
|
||||
while (!options.signal.aborted) {
|
||||
const startTime = process.hrtime();
|
||||
@@ -84,6 +85,38 @@ export class LocalTaskWorker {
|
||||
this.abortWait.abort();
|
||||
}
|
||||
|
||||
taskState(): TaskApiTasksResponse['taskState'] {
|
||||
return this.#taskState;
|
||||
}
|
||||
|
||||
workerState(): TaskApiTasksResponse['workerState'] {
|
||||
return this.#workerState;
|
||||
}
|
||||
|
||||
/**
|
||||
* Does the once-at-startup initial wait, if configured.
|
||||
*/
|
||||
private async performInitialWait(
|
||||
settings: TaskSettingsV2,
|
||||
signal: AbortSignal,
|
||||
): Promise<void> {
|
||||
if (settings.initialDelayDuration) {
|
||||
const parsedDuration = Duration.fromISO(settings.initialDelayDuration);
|
||||
|
||||
this.#taskState = {
|
||||
status: 'idle',
|
||||
startsAt: DateTime.utc().plus(parsedDuration).toISO()!,
|
||||
lastRunEndedAt: this.#taskState.lastRunEndedAt,
|
||||
lastRunError: this.#taskState.lastRunError,
|
||||
};
|
||||
this.#workerState = {
|
||||
status: 'initial-wait',
|
||||
};
|
||||
|
||||
await this.sleep(parsedDuration, signal);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Makes a single attempt at running the task to completion.
|
||||
*/
|
||||
@@ -94,14 +127,29 @@ export class LocalTaskWorker {
|
||||
// Abort the task execution either if the worker is stopped, or if the
|
||||
// task timeout is hit
|
||||
const taskAbortController = delegateAbortController(signal);
|
||||
const timeoutDuration = Duration.fromISO(settings.timeoutAfterDuration);
|
||||
const timeoutHandle = setTimeout(() => {
|
||||
taskAbortController.abort();
|
||||
}, Duration.fromISO(settings.timeoutAfterDuration).as('milliseconds'));
|
||||
}, timeoutDuration.as('milliseconds'));
|
||||
|
||||
this.#taskState = {
|
||||
status: 'running',
|
||||
startedAt: DateTime.utc().toISO()!,
|
||||
timesOutAt: DateTime.utc().plus(timeoutDuration).toISO()!,
|
||||
lastRunEndedAt: this.#taskState.lastRunEndedAt,
|
||||
lastRunError: this.#taskState.lastRunError,
|
||||
};
|
||||
this.#workerState = {
|
||||
status: 'running',
|
||||
};
|
||||
|
||||
try {
|
||||
await this.fn(taskAbortController.signal);
|
||||
this.#taskState.lastRunEndedAt = DateTime.utc().toISO()!;
|
||||
this.#taskState.lastRunError = undefined;
|
||||
} catch (e) {
|
||||
// ignore intentionally
|
||||
this.#taskState.lastRunEndedAt = DateTime.utc().toISO()!;
|
||||
this.#taskState.lastRunError = serializeError(e);
|
||||
}
|
||||
|
||||
// release resources
|
||||
@@ -133,11 +181,20 @@ export class LocalTaskWorker {
|
||||
}
|
||||
|
||||
dt = Math.max(dt, 0);
|
||||
const startsAt = DateTime.now().plus(Duration.fromMillis(dt));
|
||||
|
||||
this.#taskState = {
|
||||
status: 'idle',
|
||||
startsAt: startsAt.toISO()!,
|
||||
lastRunEndedAt: this.#taskState.lastRunEndedAt,
|
||||
lastRunError: this.#taskState.lastRunError,
|
||||
};
|
||||
this.#workerState = {
|
||||
status: 'idle',
|
||||
};
|
||||
|
||||
this.logger.debug(
|
||||
`task: ${this.taskId} will next occur around ${DateTime.now().plus(
|
||||
Duration.fromMillis(dt),
|
||||
)}`,
|
||||
`task: ${this.taskId} will next occur around ${startsAt}`,
|
||||
);
|
||||
|
||||
await this.sleep(Duration.fromMillis(dt), signal);
|
||||
|
||||
+1
@@ -53,6 +53,7 @@ describe('PluginTaskManagerImpl', () => {
|
||||
const knex = await databases.init(databaseId);
|
||||
await migrateBackendTasks(knex);
|
||||
const manager = new PluginTaskSchedulerImpl(
|
||||
'myplugin',
|
||||
async () => knex,
|
||||
mockServices.logger.mock(),
|
||||
{
|
||||
|
||||
@@ -27,9 +27,11 @@ import {
|
||||
import { Counter, Histogram, Gauge, metrics, trace } from '@opentelemetry/api';
|
||||
import { Knex } from 'knex';
|
||||
import { Duration } from 'luxon';
|
||||
import express from 'express';
|
||||
import Router from 'express-promise-router';
|
||||
import { LocalTaskWorker } from './LocalTaskWorker';
|
||||
import { TaskWorker } from './TaskWorker';
|
||||
import { TaskSettingsV2 } from './types';
|
||||
import { TaskSettingsV2, TaskApiTasksResponse } from './types';
|
||||
import { delegateAbortController, TRACER_ID, validateId } from './util';
|
||||
|
||||
const tracer = trace.getTracer(TRACER_ID);
|
||||
@@ -38,7 +40,8 @@ const tracer = trace.getTracer(TRACER_ID);
|
||||
* Implements the actual task management.
|
||||
*/
|
||||
export class PluginTaskSchedulerImpl implements SchedulerService {
|
||||
private readonly localTasksById = new Map<string, LocalTaskWorker>();
|
||||
private readonly localWorkersById = new Map<string, LocalTaskWorker>();
|
||||
private readonly globalWorkersById = new Map<string, TaskWorker>();
|
||||
private readonly allScheduledTasks: SchedulerServiceTaskDescriptor[] = [];
|
||||
private readonly shutdownInitiated: Promise<boolean>;
|
||||
|
||||
@@ -48,6 +51,7 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
|
||||
private readonly lastCompleted: Gauge;
|
||||
|
||||
constructor(
|
||||
private readonly pluginId: string,
|
||||
private readonly databaseFactory: () => Promise<Knex>,
|
||||
private readonly logger: LoggerService,
|
||||
rootLifecycle?: RootLifecycleService,
|
||||
@@ -77,7 +81,7 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
|
||||
}
|
||||
|
||||
async triggerTask(id: string): Promise<void> {
|
||||
const localTask = this.localTasksById.get(id);
|
||||
const localTask = this.localWorkersById.get(id);
|
||||
if (localTask) {
|
||||
localTask.trigger();
|
||||
return;
|
||||
@@ -116,6 +120,7 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
|
||||
this.logger.child({ task: task.id }),
|
||||
);
|
||||
await worker.start(settings, { signal: abortController.signal });
|
||||
this.globalWorkersById.set(task.id, worker);
|
||||
} else {
|
||||
const worker = new LocalTaskWorker(
|
||||
task.id,
|
||||
@@ -123,7 +128,7 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
|
||||
this.logger.child({ task: task.id }),
|
||||
);
|
||||
worker.start(settings, { signal: abortController.signal });
|
||||
this.localTasksById.set(task.id, worker);
|
||||
this.localWorkersById.set(task.id, worker);
|
||||
}
|
||||
|
||||
this.allScheduledTasks.push({
|
||||
@@ -147,6 +152,47 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
|
||||
return this.allScheduledTasks;
|
||||
}
|
||||
|
||||
getRouter(): express.Router {
|
||||
const router = Router();
|
||||
|
||||
router.get('/.backstage/scheduler/v1/tasks', async (_, res) => {
|
||||
const globalState = await TaskWorker.taskStates(
|
||||
await this.databaseFactory(),
|
||||
);
|
||||
|
||||
const tasks = new Array<TaskApiTasksResponse>();
|
||||
for (const task of this.allScheduledTasks) {
|
||||
tasks.push({
|
||||
taskId: task.id,
|
||||
pluginId: this.pluginId,
|
||||
scope: task.scope,
|
||||
settings: task.settings,
|
||||
taskState:
|
||||
this.localWorkersById.get(task.id)?.taskState() ??
|
||||
globalState.get(task.id) ??
|
||||
null,
|
||||
workerState:
|
||||
this.localWorkersById.get(task.id)?.workerState() ??
|
||||
this.globalWorkersById.get(task.id)?.workerState() ??
|
||||
null,
|
||||
});
|
||||
}
|
||||
|
||||
res.json({ tasks });
|
||||
});
|
||||
|
||||
router.post(
|
||||
'/.backstage/scheduler/v1/tasks/:id/trigger',
|
||||
async (req, res) => {
|
||||
const { id } = req.params;
|
||||
await this.triggerTask(id);
|
||||
res.status(200).end();
|
||||
},
|
||||
);
|
||||
|
||||
return router;
|
||||
}
|
||||
|
||||
private instrumentedFunction(
|
||||
task: SchedulerServiceTaskInvocationDefinition,
|
||||
scope: string,
|
||||
|
||||
+2
@@ -87,6 +87,8 @@ describe('PluginTaskSchedulerJanitor', () => {
|
||||
current_run_ticket: null,
|
||||
current_run_started_at: null,
|
||||
current_run_expires_at: null,
|
||||
last_run_ended_at: expect.anything(),
|
||||
last_run_error_json: expect.stringContaining('Task timed out'),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
+5
-1
@@ -18,7 +18,7 @@ import { LoggerService } from '@backstage/backend-plugin-api';
|
||||
import { Knex } from 'knex';
|
||||
import { Duration } from 'luxon';
|
||||
import { DB_TASKS_TABLE, DbTasksRow } from '../database/tables';
|
||||
import { sleep } from './util';
|
||||
import { serializeError, sleep } from './util';
|
||||
|
||||
/**
|
||||
* Makes sure to auto-expire and clean up things that time out or for other
|
||||
@@ -69,6 +69,8 @@ export class PluginTaskSchedulerJanitor {
|
||||
current_run_ticket: dbNull,
|
||||
current_run_started_at: dbNull,
|
||||
current_run_expires_at: dbNull,
|
||||
last_run_ended_at: this.knex.fn.now(),
|
||||
last_run_error_json: serializeError(new Error('Task timed out')),
|
||||
});
|
||||
} else {
|
||||
tasks = await this.knex<DbTasksRow>(DB_TASKS_TABLE)
|
||||
@@ -77,6 +79,8 @@ export class PluginTaskSchedulerJanitor {
|
||||
current_run_ticket: dbNull,
|
||||
current_run_started_at: dbNull,
|
||||
current_run_expires_at: dbNull,
|
||||
last_run_ended_at: this.knex.fn.now(),
|
||||
last_run_error_json: serializeError(new Error('Task timed out')),
|
||||
})
|
||||
.returning(['id']);
|
||||
}
|
||||
|
||||
@@ -68,6 +68,17 @@ describe('TaskWorker', () => {
|
||||
initialDelayDuration: 'PT1S',
|
||||
timeoutAfterDuration: 'PT1M',
|
||||
});
|
||||
await expect(TaskWorker.taskStates(knex)).resolves.toEqual(
|
||||
new Map([
|
||||
[
|
||||
'task1',
|
||||
{
|
||||
status: 'idle',
|
||||
startsAt: expect.anything(),
|
||||
},
|
||||
],
|
||||
]),
|
||||
);
|
||||
|
||||
await expect(worker.findReadyTask()).resolves.toEqual({
|
||||
result: 'not-ready-yet',
|
||||
@@ -101,6 +112,18 @@ describe('TaskWorker', () => {
|
||||
current_run_expires_at: expect.anything(),
|
||||
}),
|
||||
);
|
||||
await expect(TaskWorker.taskStates(knex)).resolves.toEqual(
|
||||
new Map([
|
||||
[
|
||||
'task1',
|
||||
{
|
||||
status: 'running',
|
||||
startedAt: expect.anything(),
|
||||
timesOutAt: expect.anything(),
|
||||
},
|
||||
],
|
||||
]),
|
||||
);
|
||||
|
||||
await expect(worker.tryReleaseTask('ticket', settings)).resolves.toBe(
|
||||
true,
|
||||
@@ -115,6 +138,18 @@ describe('TaskWorker', () => {
|
||||
current_run_expires_at: null,
|
||||
}),
|
||||
);
|
||||
await expect(TaskWorker.taskStates(knex)).resolves.toEqual(
|
||||
new Map([
|
||||
[
|
||||
'task1',
|
||||
{
|
||||
status: 'idle',
|
||||
startsAt: expect.anything(),
|
||||
lastRunEndedAt: expect.anything(),
|
||||
},
|
||||
],
|
||||
]),
|
||||
);
|
||||
},
|
||||
);
|
||||
|
||||
@@ -136,8 +171,21 @@ describe('TaskWorker', () => {
|
||||
const worker = new TaskWorker('task1', fn, knex, logger, checkFrequency);
|
||||
worker.start(settings, { signal: testScopedSignal() });
|
||||
|
||||
await waitForExpect(() => {
|
||||
await waitForExpect(async () => {
|
||||
expect(logger.error).toHaveBeenCalled();
|
||||
await expect(TaskWorker.taskStates(knex)).resolves.toEqual(
|
||||
new Map([
|
||||
[
|
||||
'task1',
|
||||
{
|
||||
status: 'idle',
|
||||
startsAt: expect.anything(),
|
||||
lastRunEndedAt: expect.anything(),
|
||||
lastRunError: expect.stringContaining('failed'),
|
||||
},
|
||||
],
|
||||
]),
|
||||
);
|
||||
});
|
||||
},
|
||||
);
|
||||
@@ -373,7 +421,8 @@ describe('TaskWorker', () => {
|
||||
// contrived check removes a test flakiness based on wall clock time.
|
||||
expect(
|
||||
Math.abs(
|
||||
+new Date(row3.next_run_start_at) - +new Date(row2.next_run_start_at),
|
||||
+new Date(row3.next_run_start_at!) -
|
||||
+new Date(row2.next_run_start_at!),
|
||||
),
|
||||
).toBeLessThanOrEqual(60_000);
|
||||
|
||||
@@ -416,10 +465,10 @@ describe('TaskWorker', () => {
|
||||
const row1 = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
|
||||
|
||||
const rowAfterClaimAndReleaseNextStartAt = DateTime.fromJSDate(
|
||||
new Date(rowAfterClaimAndRelease.next_run_start_at),
|
||||
new Date(rowAfterClaimAndRelease.next_run_start_at!),
|
||||
);
|
||||
const row1NextStartAt = DateTime.fromJSDate(
|
||||
new Date(row1.next_run_start_at),
|
||||
new Date(row1.next_run_start_at!),
|
||||
);
|
||||
const now = DateTime.now();
|
||||
expect(
|
||||
@@ -478,10 +527,10 @@ describe('TaskWorker', () => {
|
||||
const row1 = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
|
||||
|
||||
const rowAfterClaimAndReleaseNextStartAt = DateTime.fromJSDate(
|
||||
new Date(rowAfterClaimAndRelease.next_run_start_at),
|
||||
new Date(rowAfterClaimAndRelease.next_run_start_at!),
|
||||
);
|
||||
const row1NextStartAt = DateTime.fromJSDate(
|
||||
new Date(row1.next_run_start_at),
|
||||
new Date(row1.next_run_start_at!),
|
||||
);
|
||||
const now = DateTime.now();
|
||||
expect(
|
||||
|
||||
@@ -21,8 +21,18 @@ import { Knex } from 'knex';
|
||||
import { DateTime, Duration } from 'luxon';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
import { DB_TASKS_TABLE, DbTasksRow } from '../database/tables';
|
||||
import { TaskSettingsV2, taskSettingsV2Schema } from './types';
|
||||
import { delegateAbortController, nowPlus, sleep } from './util';
|
||||
import {
|
||||
TaskSettingsV2,
|
||||
taskSettingsV2Schema,
|
||||
TaskApiTasksResponse,
|
||||
} from './types';
|
||||
import {
|
||||
delegateAbortController,
|
||||
nowPlus,
|
||||
sleep,
|
||||
dbTime,
|
||||
serializeError,
|
||||
} from './util';
|
||||
import { SchedulerServiceTaskFunction } from '@backstage/backend-plugin-api';
|
||||
|
||||
const DEFAULT_WORK_CHECK_FREQUENCY = Duration.fromObject({ seconds: 5 });
|
||||
@@ -33,6 +43,10 @@ const DEFAULT_WORK_CHECK_FREQUENCY = Duration.fromObject({ seconds: 5 });
|
||||
* @private
|
||||
*/
|
||||
export class TaskWorker {
|
||||
#workerState: TaskApiTasksResponse['workerState'] = {
|
||||
status: 'idle',
|
||||
};
|
||||
|
||||
constructor(
|
||||
private readonly taskId: string,
|
||||
private readonly fn: SchedulerServiceTaskFunction,
|
||||
@@ -61,24 +75,17 @@ export class TaskWorker {
|
||||
}
|
||||
}
|
||||
|
||||
let attemptNum = 1;
|
||||
(async () => {
|
||||
let attemptNum = 1;
|
||||
for (;;) {
|
||||
try {
|
||||
if (settings.initialDelayDuration) {
|
||||
await sleep(
|
||||
Duration.fromISO(settings.initialDelayDuration),
|
||||
options.signal,
|
||||
);
|
||||
}
|
||||
await this.performInitialWait(settings, options.signal);
|
||||
|
||||
while (!options.signal.aborted) {
|
||||
const runResult = await this.runOnce(options.signal);
|
||||
|
||||
if (runResult.result === 'abort') {
|
||||
break;
|
||||
}
|
||||
|
||||
await sleep(workCheckFrequency, options.signal);
|
||||
}
|
||||
|
||||
@@ -96,6 +103,24 @@ export class TaskWorker {
|
||||
})();
|
||||
}
|
||||
|
||||
/**
|
||||
* Does the once-at-startup initial wait, if configured.
|
||||
*/
|
||||
private async performInitialWait(
|
||||
settings: TaskSettingsV2,
|
||||
signal: AbortSignal,
|
||||
): Promise<void> {
|
||||
if (settings.initialDelayDuration) {
|
||||
this.#workerState = {
|
||||
status: 'initial-wait',
|
||||
};
|
||||
await sleep(Duration.fromISO(settings.initialDelayDuration), signal);
|
||||
}
|
||||
this.#workerState = {
|
||||
status: 'idle',
|
||||
};
|
||||
}
|
||||
|
||||
static async trigger(knex: Knex, taskId: string): Promise<void> {
|
||||
// check if task exists
|
||||
const rows = await knex<DbTasksRow>(DB_TASKS_TABLE)
|
||||
@@ -116,6 +141,51 @@ export class TaskWorker {
|
||||
}
|
||||
}
|
||||
|
||||
static async taskStates(
|
||||
knex: Knex,
|
||||
): Promise<Map<string, TaskApiTasksResponse['taskState']>> {
|
||||
const rows = await knex<DbTasksRow>(DB_TASKS_TABLE);
|
||||
return new Map(
|
||||
rows.map(row => {
|
||||
const startedAt = row.current_run_started_at
|
||||
? dbTime(row.current_run_started_at).toISO()!
|
||||
: undefined;
|
||||
const timesOutAt = row.current_run_expires_at
|
||||
? dbTime(row.current_run_expires_at).toISO()!
|
||||
: undefined;
|
||||
const startsAt = row.next_run_start_at
|
||||
? dbTime(row.next_run_start_at).toISO()!
|
||||
: undefined;
|
||||
const lastRunEndedAt = row.last_run_ended_at
|
||||
? dbTime(row.last_run_ended_at).toISO()!
|
||||
: undefined;
|
||||
const lastRunError = row.last_run_error_json || undefined;
|
||||
|
||||
return [
|
||||
row.id,
|
||||
startedAt
|
||||
? {
|
||||
status: 'running',
|
||||
startedAt,
|
||||
timesOutAt,
|
||||
lastRunEndedAt,
|
||||
lastRunError,
|
||||
}
|
||||
: {
|
||||
status: 'idle',
|
||||
startsAt,
|
||||
lastRunEndedAt,
|
||||
lastRunError,
|
||||
},
|
||||
];
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
workerState(): TaskApiTasksResponse['workerState'] {
|
||||
return this.#workerState;
|
||||
}
|
||||
|
||||
/**
|
||||
* Makes a single attempt at running the task to completion, if ready.
|
||||
*
|
||||
@@ -153,13 +223,19 @@ export class TaskWorker {
|
||||
}, Duration.fromISO(taskSettings.timeoutAfterDuration).as('milliseconds'));
|
||||
|
||||
try {
|
||||
this.#workerState = {
|
||||
status: 'running',
|
||||
};
|
||||
await this.fn(taskAbortController.signal);
|
||||
taskAbortController.abort(); // releases resources
|
||||
} catch (e) {
|
||||
this.logger.error(e);
|
||||
await this.tryReleaseTask(ticket, taskSettings);
|
||||
await this.tryReleaseTask(ticket, taskSettings, e);
|
||||
return { result: 'failed' };
|
||||
} finally {
|
||||
this.#workerState = {
|
||||
status: 'idle',
|
||||
};
|
||||
clearTimeout(timeoutHandle);
|
||||
}
|
||||
|
||||
@@ -321,6 +397,7 @@ export class TaskWorker {
|
||||
async tryReleaseTask(
|
||||
ticket: string,
|
||||
settings: TaskSettingsV2,
|
||||
error?: Error,
|
||||
): Promise<boolean> {
|
||||
const isManual = settings?.cadence === 'manual';
|
||||
const isDuration = settings?.cadence.startsWith('P');
|
||||
@@ -366,6 +443,10 @@ export class TaskWorker {
|
||||
current_run_ticket: this.knex.raw('null'),
|
||||
current_run_started_at: this.knex.raw('null'),
|
||||
current_run_expires_at: this.knex.raw('null'),
|
||||
last_run_ended_at: this.knex.fn.now(),
|
||||
last_run_error_json: error
|
||||
? serializeError(error)
|
||||
: this.knex.raw('null'),
|
||||
});
|
||||
|
||||
return rows === 1;
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { JsonObject } from '@backstage/types';
|
||||
import { CronTime } from 'cron';
|
||||
import { Duration } from 'luxon';
|
||||
import { z } from 'zod';
|
||||
@@ -97,3 +98,39 @@ export const taskSettingsV2Schema = z.object({
|
||||
* The properties that control a scheduled task (version 2).
|
||||
*/
|
||||
export type TaskSettingsV2 = z.infer<typeof taskSettingsV2Schema>;
|
||||
|
||||
/**
|
||||
* The shape of a task definition as returned by the service's REST API.
|
||||
*/
|
||||
export interface TaskApiTasksResponse {
|
||||
taskId: string;
|
||||
pluginId: string;
|
||||
scope: 'global' | 'local';
|
||||
settings: { version: number } & JsonObject;
|
||||
taskState:
|
||||
| {
|
||||
status: 'running';
|
||||
startedAt: string;
|
||||
timesOutAt?: string;
|
||||
lastRunError?: string;
|
||||
lastRunEndedAt?: string;
|
||||
}
|
||||
| {
|
||||
status: 'idle';
|
||||
startsAt?: string;
|
||||
lastRunError?: string;
|
||||
lastRunEndedAt?: string;
|
||||
}
|
||||
| null;
|
||||
workerState:
|
||||
| {
|
||||
status: 'initial-wait';
|
||||
}
|
||||
| {
|
||||
status: 'idle';
|
||||
}
|
||||
| {
|
||||
status: 'running';
|
||||
}
|
||||
| null;
|
||||
}
|
||||
|
||||
@@ -14,7 +14,10 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { InputError } from '@backstage/errors';
|
||||
import {
|
||||
InputError,
|
||||
serializeError as internalSerializeError,
|
||||
} from '@backstage/errors';
|
||||
import { Knex } from 'knex';
|
||||
import { DateTime, Duration } from 'luxon';
|
||||
|
||||
@@ -111,3 +114,11 @@ export function delegateAbortController(parent?: AbortSignal): AbortController {
|
||||
|
||||
return delegate;
|
||||
}
|
||||
|
||||
export function serializeError(error: Error): string {
|
||||
return JSON.stringify(
|
||||
internalSerializeError(error, {
|
||||
includeStack: process.env.NODE_ENV === 'development',
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -15,12 +15,17 @@
|
||||
*/
|
||||
|
||||
import { coreServices } from '@backstage/backend-plugin-api';
|
||||
import { ServiceFactoryTester } from '@backstage/backend-test-utils';
|
||||
import {
|
||||
mockServices,
|
||||
ServiceFactoryTester,
|
||||
} from '@backstage/backend-test-utils';
|
||||
import { schedulerServiceFactory } from './schedulerServiceFactory';
|
||||
|
||||
describe('schedulerFactory', () => {
|
||||
it('creates sidecar database features', async () => {
|
||||
const tester = ServiceFactoryTester.from(schedulerServiceFactory);
|
||||
const tester = ServiceFactoryTester.from(schedulerServiceFactory, {
|
||||
dependencies: [mockServices.rootHttpRouter.factory()],
|
||||
});
|
||||
|
||||
const scheduler = await tester.getSubject();
|
||||
await scheduler.scheduleTask({
|
||||
|
||||
@@ -35,8 +35,22 @@ export const schedulerServiceFactory = createServiceFactory({
|
||||
database: coreServices.database,
|
||||
logger: coreServices.logger,
|
||||
rootLifecycle: coreServices.rootLifecycle,
|
||||
httpRouter: coreServices.httpRouter,
|
||||
pluginMetadata: coreServices.pluginMetadata,
|
||||
},
|
||||
async factory({ database, logger, rootLifecycle }) {
|
||||
return DefaultSchedulerService.create({ database, logger, rootLifecycle });
|
||||
async factory({
|
||||
database,
|
||||
logger,
|
||||
rootLifecycle,
|
||||
httpRouter,
|
||||
pluginMetadata,
|
||||
}) {
|
||||
return DefaultSchedulerService.create({
|
||||
database,
|
||||
logger,
|
||||
rootLifecycle,
|
||||
httpRouter,
|
||||
pluginMetadata,
|
||||
});
|
||||
},
|
||||
});
|
||||
|
||||
@@ -82,13 +82,12 @@ export interface SchedulerServiceTaskScheduleDefinition {
|
||||
* Overview:
|
||||
*
|
||||
* ```
|
||||
* ┌────────────── second (optional)
|
||||
* │ ┌──────────── minute
|
||||
* │ │ ┌────────── hour
|
||||
* │ │ │ ┌──────── day of month
|
||||
* │ │ │ │ ┌────── month
|
||||
* │ │ │ │ │ ┌──── day of week
|
||||
* │ │ │ │ │ │
|
||||
* ┌────────────── second (0-60, optional)
|
||||
* │ ┌──────────── minute (0-59)
|
||||
* │ │ ┌────────── hour (0-23)
|
||||
* │ │ │ ┌──────── day of month (1-31)
|
||||
* │ │ │ │ ┌────── month (1-12)
|
||||
* │ │ │ │ │ ┌──── day of week (0-6, 0 is Sunday)
|
||||
* │ │ │ │ │ │
|
||||
* * * * * * *
|
||||
* ```
|
||||
@@ -180,13 +179,12 @@ export interface SchedulerServiceTaskScheduleDefinitionConfig {
|
||||
* Overview:
|
||||
*
|
||||
* ```
|
||||
* ┌────────────── second (optional)
|
||||
* │ ┌──────────── minute
|
||||
* │ │ ┌────────── hour
|
||||
* │ │ │ ┌──────── day of month
|
||||
* │ │ │ │ ┌────── month
|
||||
* │ │ │ │ │ ┌──── day of week
|
||||
* │ │ │ │ │ │
|
||||
* ┌────────────── second (0-60, optional)
|
||||
* │ ┌──────────── minute (0-59)
|
||||
* │ │ ┌────────── hour (0-23)
|
||||
* │ │ │ ┌──────── day of month (1-31)
|
||||
* │ │ │ │ ┌────── month (1-12)
|
||||
* │ │ │ │ │ ┌──── day of week (0-6, 0 is Sunday)
|
||||
* │ │ │ │ │ │
|
||||
* * * * * * *
|
||||
* ```
|
||||
|
||||
+1
-5
@@ -232,11 +232,7 @@ export class InternalOpenApiDocumentationProvider implements EntityProvider {
|
||||
taskId,
|
||||
taskInstanceId: uuid.v4(),
|
||||
});
|
||||
try {
|
||||
await this.refresh(logger);
|
||||
} catch (error) {
|
||||
logger.error(`${this.getProviderName()} refresh failed`, error);
|
||||
}
|
||||
await this.refresh(logger);
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
-1
@@ -72,6 +72,5 @@ describe('catalogModuleIncrementalIngestionEntityProvider', () => {
|
||||
expect(addEntityProvider.mock.calls[0][0].getProviderName()).toBe(
|
||||
'provider1',
|
||||
);
|
||||
expect(httpRouterMock.use).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user