Add cancelTask to SchedulerService for cancelling running tasks

Adds the ability to cancel currently running scheduled tasks via a new
cancelTask method on the SchedulerService interface. For global (distributed)
tasks, the database lock is released and a periodic liveness check detects
the lost ticket and aborts the task function's AbortSignal. For local tasks,
the abort signal is triggered directly. Also adds a REST endpoint at
POST /.backstage/scheduler/v1/tasks/:id/cancel.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Fredrik Adelöw <freben@spotify.com>
This commit is contained in:
Fredrik
2026-03-09 21:54:22 +01:00
committed by Fredrik Adelöw
parent f3f99b811e
commit 015668c5d2
8 changed files with 329 additions and 5 deletions
+6
View File
@@ -0,0 +1,6 @@
---
'@backstage/backend-plugin-api': minor
'@backstage/backend-defaults': patch
---
Added `cancelTask` method to the `SchedulerService` interface and implementation, allowing cancellation of currently running scheduled tasks. For global tasks, the database lock is released and a periodic liveness check aborts the running task function. For local tasks, the task's abort signal is triggered directly. A new `POST /.backstage/scheduler/v1/tasks/:id/cancel` endpoint is also available.
@@ -16,6 +16,7 @@
import { LocalTaskWorker } from './LocalTaskWorker';
import { mockServices } from '@backstage/backend-test-utils';
import { ConflictError } from '@backstage/errors';
import waitFor from 'wait-for-expect';
jest.setTimeout(10_000);
@@ -110,6 +111,54 @@ describe('LocalTaskWorker', () => {
controller.abort();
});
it('can cancel a running task', async () => {
let receivedSignal: AbortSignal | undefined;
const fn = jest.fn(async (signal: AbortSignal) => {
receivedSignal = signal;
await new Promise(r => setTimeout(r, 5000));
});
const controller = new AbortController();
const worker = new LocalTaskWorker('a', fn, logger);
worker.start(
{
version: 2,
cadence: 'PT10S',
timeoutAfterDuration: 'PT10S',
},
{ signal: controller.signal },
);
await waitFor(() => {
expect(fn).toHaveBeenCalledTimes(1);
});
expect(receivedSignal?.aborted).toBe(false);
worker.cancel();
expect(receivedSignal?.aborted).toBe(true);
controller.abort();
});
it('cannot cancel a task that is not running', async () => {
const fn = jest.fn();
const controller = new AbortController();
const worker = new LocalTaskWorker('a', fn, logger);
worker.start(
{
version: 2,
initialDelayDuration: 'PT1000S',
cadence: 'PT10S',
timeoutAfterDuration: 'PT10S',
},
{ signal: controller.signal },
);
expect(() => worker.cancel()).toThrow(ConflictError);
controller.abort();
});
it('goes through the expected states', async () => {
const fn = jest
.fn()
@@ -29,6 +29,7 @@ import { delegateAbortController, serializeError, sleep } from './util';
*/
export class LocalTaskWorker {
private abortWait: AbortController | undefined;
private taskAbortController: AbortController | undefined;
#taskState: Exclude<TaskApiTasksResponse['taskState'], null> = {
status: 'idle',
};
@@ -93,6 +94,13 @@ export class LocalTaskWorker {
this.abortWait.abort();
}
cancel(): void {
if (!this.taskAbortController) {
throw new ConflictError(`Task ${this.taskId} is not running`);
}
this.taskAbortController.abort();
}
taskState(): TaskApiTasksResponse['taskState'] {
return this.#taskState;
}
@@ -134,10 +142,10 @@ export class LocalTaskWorker {
): Promise<void> {
// Abort the task execution either if the worker is stopped, or if the
// task timeout is hit
const taskAbortController = delegateAbortController(signal);
this.taskAbortController = delegateAbortController(signal);
const timeoutDuration = Duration.fromISO(settings.timeoutAfterDuration);
const timeoutHandle = setTimeout(() => {
taskAbortController.abort();
this.taskAbortController?.abort();
}, timeoutDuration.as('milliseconds'));
this.#taskState = {
@@ -152,7 +160,7 @@ export class LocalTaskWorker {
};
try {
await this.fn(taskAbortController.signal);
await this.fn(this.taskAbortController.signal);
this.#taskState.lastRunEndedAt = DateTime.utc().toISO()!;
this.#taskState.lastRunError = undefined;
} catch (e) {
@@ -162,7 +170,8 @@ export class LocalTaskWorker {
// release resources
clearTimeout(timeoutHandle);
taskAbortController.abort();
this.taskAbortController.abort();
this.taskAbortController = undefined;
}
/**
@@ -399,6 +399,102 @@ describe('PluginTaskManagerImpl', () => {
);
});
describe('cancelTask with local scope', () => {
it('can cancel a running task', async () => {
const { manager } = await init('SQLITE_3');
const promise = createDeferred();
await manager.scheduleTask({
id: 'task1',
timeout: Duration.fromMillis(5000),
frequency: Duration.fromObject({ years: 1 }),
fn: async () => {
promise.resolve();
await new Promise(r => setTimeout(r, 20000));
},
scope: 'local',
});
await promise;
await expect(manager.cancelTask('task1')).resolves.toBeUndefined();
}, 60_000);
it('cannot cancel a task that is not running', async () => {
const { manager } = await init('SQLITE_3');
const fn = jest.fn();
await manager.scheduleTask({
id: 'task1',
timeout: Duration.fromMillis(5000),
frequency: Duration.fromObject({ years: 1 }),
initialDelay: Duration.fromObject({ years: 1 }),
fn,
scope: 'local',
});
await expect(manager.cancelTask('task1')).rejects.toThrow(ConflictError);
}, 60_000);
});
describe('cancelTask with global scope', () => {
it.each(databases.eachSupportedId())(
'can cancel a running task, %p',
async databaseId => {
const { manager } = await init(databaseId);
const promise = createDeferred();
await manager.scheduleTask({
id: 'task1',
timeout: Duration.fromMillis(5000),
frequency: Duration.fromObject({ years: 1 }),
fn: async () => {
promise.resolve();
await new Promise(r => setTimeout(r, 20000));
},
scope: 'global',
});
await promise;
await expect(manager.cancelTask('task1')).resolves.toBeUndefined();
},
);
it.each(databases.eachSupportedId())(
'cannot cancel a non-existent task, %p',
async databaseId => {
const { manager } = await init(databaseId);
await expect(manager.cancelTask('nonexistent')).rejects.toThrow(
NotFoundError,
);
},
);
it.each(databases.eachSupportedId())(
'cannot cancel a task that is not running, %p',
async databaseId => {
const { manager } = await init(databaseId);
const fn = jest.fn();
const promise = new Promise(resolve => fn.mockImplementation(resolve));
await manager.scheduleTask({
id: 'task1',
timeout: Duration.fromMillis(5000),
frequency: Duration.fromObject({ years: 1 }),
initialDelay: Duration.fromObject({ years: 1 }),
fn,
scope: 'global',
});
await expect(manager.cancelTask('task1')).rejects.toThrow(
ConflictError,
);
},
);
});
describe('parseDuration', () => {
it('should parse durations', () => {
expect(parseDuration({ milliseconds: 5000 })).toEqual('PT5S');
@@ -107,6 +107,17 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
await TaskWorker.trigger(knex, id);
}
async cancelTask(id: string): Promise<void> {
const localTask = this.localWorkersById.get(id);
if (localTask) {
localTask.cancel();
return;
}
const knex = await this.databaseFactory();
await TaskWorker.cancel(knex, id);
}
async scheduleTask(
task: SchedulerServiceTaskScheduleDefinition &
SchedulerServiceTaskInvocationDefinition,
@@ -206,6 +217,15 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
},
);
router.post(
'/.backstage/scheduler/v1/tasks/:id/cancel',
async (req, res) => {
const { id } = req.params;
await this.cancelTask(id);
res.status(200).end();
},
);
return router;
}
@@ -15,6 +15,7 @@
*/
import { TestDatabases, mockServices } from '@backstage/backend-test-utils';
import { ConflictError, NotFoundError } from '@backstage/errors';
import { DateTime, Duration } from 'luxon';
import waitForExpect from 'wait-for-expect';
import { migrateBackendTasks } from '../database/migrateBackendTasks';
@@ -584,4 +585,79 @@ describe('TaskWorker', () => {
await knex.destroy();
},
);
it.each(databases.eachSupportedId())(
'can cancel a running task, %p',
async databaseId => {
const knex = await databases.init(databaseId);
await migrateBackendTasks(knex);
const fn = jest.fn(async () => {});
const settings: TaskSettingsV2 = {
version: 2,
cadence: '* * * * * *',
initialDelayDuration: undefined,
timeoutAfterDuration: Duration.fromObject({ minutes: 1 }).toISO()!,
};
const worker = new TaskWorker('task1', fn, knex, logger);
await worker.persistTask(settings);
await worker.tryClaimTask('ticket', settings);
// Verify the task is running
let row = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
expect(row.current_run_ticket).toBe('ticket');
await TaskWorker.cancel(knex, 'task1');
// Verify the task is now idle with a cancellation error recorded
row = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
expect(row.current_run_ticket).toBeNull();
expect(row.current_run_started_at).toBeNull();
expect(row.current_run_expires_at).toBeNull();
expect(row.last_run_ended_at).not.toBeNull();
expect(row.last_run_error_json).toContain('Task was cancelled');
await knex.destroy();
},
);
it.each(databases.eachSupportedId())(
'cannot cancel a non-existent task, %p',
async databaseId => {
const knex = await databases.init(databaseId);
await migrateBackendTasks(knex);
await expect(TaskWorker.cancel(knex, 'nonexistent')).rejects.toThrow(
NotFoundError,
);
await knex.destroy();
},
);
it.each(databases.eachSupportedId())(
'cannot cancel a task that is not running, %p',
async databaseId => {
const knex = await databases.init(databaseId);
await migrateBackendTasks(knex);
const fn = jest.fn(async () => {});
const settings: TaskSettingsV2 = {
version: 2,
cadence: '* * * * * *',
initialDelayDuration: undefined,
timeoutAfterDuration: Duration.fromObject({ minutes: 1 }).toISO()!,
};
const worker = new TaskWorker('task1', fn, knex, logger);
await worker.persistTask(settings);
await expect(TaskWorker.cancel(knex, 'task1')).rejects.toThrow(
ConflictError,
);
await knex.destroy();
},
);
});
@@ -152,6 +152,31 @@ export class TaskWorker {
}
}
static async cancel(knex: Knex, taskId: string): Promise<void> {
// check if task exists
const rows = await knex<DbTasksRow>(DB_TASKS_TABLE)
.select(knex.raw(1))
.where('id', '=', taskId);
if (rows.length !== 1) {
throw new NotFoundError(`Task ${taskId} does not exist`);
}
const dbNull = knex.raw('null');
const updatedRows = await knex<DbTasksRow>(DB_TASKS_TABLE)
.where('id', '=', taskId)
.whereNotNull('current_run_ticket')
.update({
current_run_ticket: dbNull,
current_run_started_at: dbNull,
current_run_expires_at: dbNull,
last_run_ended_at: knex.fn.now(),
last_run_error_json: serializeError(new Error('Task was cancelled')),
});
if (updatedRows < 1) {
throw new ConflictError(`Task ${taskId} is not running`);
}
}
static async taskStates(
knex: Knex,
): Promise<Map<string, TaskApiTasksResponse['taskState']>> {
@@ -227,11 +252,16 @@ export class TaskWorker {
}
// Abort the task execution either if the worker is stopped, or if the
// task timeout is hit
// task timeout is hit, or if the task ticket was lost (e.g. due to
// cancellation from another host)
const taskAbortController = delegateAbortController(signal);
const timeoutHandle = setTimeout(() => {
taskAbortController.abort();
}, Duration.fromISO(taskSettings.timeoutAfterDuration).as('milliseconds'));
const livenessHandle = setInterval(
() => this.checkLiveness(ticket, taskAbortController),
this.workCheckFrequency.as('milliseconds'),
);
try {
this.#workerState = {
@@ -248,6 +278,7 @@ export class TaskWorker {
status: 'idle',
};
clearTimeout(timeoutHandle);
clearInterval(livenessHandle);
}
await this.tryReleaseTask(ticket, taskSettings);
@@ -334,6 +365,33 @@ export class TaskWorker {
);
}
/**
* Checks whether the current task ticket is still valid in the database.
* If the ticket has been cleared (e.g. by cancellation or janitor cleanup),
* aborts the task execution.
*/
private async checkLiveness(
ticket: string,
taskAbortController: AbortController,
): Promise<void> {
try {
const [row] = await this.knex<DbTasksRow>(DB_TASKS_TABLE)
.where('id', '=', this.taskId)
.select('current_run_ticket');
if (!row || row.current_run_ticket !== ticket) {
this.logger.info(
`Task ticket for "${this.taskId}" is no longer valid; aborting execution`,
);
taskAbortController.abort();
}
} catch (e) {
this.logger.warn(
`Failed to check liveness for task "${this.taskId}", ${e}`,
);
}
}
/**
* Check if the task is ready to run
*/
@@ -304,6 +304,16 @@ export interface SchedulerService {
*/
triggerTask(id: string): Promise<void>;
/**
* Cancels a currently running task by ID, marking it as idle.
*
* If the task doesn't exist, a NotFoundError is thrown. If the task is
* not currently running, a ConflictError is thrown.
*
* @param id - The task ID
*/
cancelTask(id: string): Promise<void>;
/**
* Schedules a task function for recurring runs.
*