Recoverable tasks [version 1]

Signed-off-by: Bogdan Nechyporenko <bnechyporenko@bol.com>
Signed-off-by: bnechyporenko <bnechyporenko@bol.com>
This commit is contained in:
bnechyporenko
2024-01-14 16:18:42 +01:00
parent 56d3373c82
commit 11b9a08e92
31 changed files with 473 additions and 62 deletions
+9
View File
@@ -0,0 +1,9 @@
---
'@backstage/plugin-scaffolder-backend': minor
'@backstage/plugin-scaffolder-common': minor
'@backstage/plugin-scaffolder-react': minor
'@backstage/plugin-scaffolder-node': minor
'@backstage/plugin-scaffolder': patch
---
Introduced the first version of recoverable tasks.
@@ -41,6 +41,23 @@ default, we highly recommend you to set that up. Follow our how-to guide
[How to add documentation setup to your software templates](./how-to-guides.md#how-to-add-the-documentation-setup-to-your-software-templates)
to get started.
### Use the documentation template
There could be _some_ situations where you don't want to keep your docs close to
your code, but still want to publish documentation - for example, an onboarding
tutorial. For this use case, we have put together a documentation template. Your
Backstage instance should by default have a documentation template added. If
not, copy the catalog locations from the
[create-app template](https://github.com/backstage/backstage/blob/master/packages/create-app/templates/default-app/app-config.yaml.hbs)
to add the documentation template. The template creates a component with
**only** TechDocs configuration and default markdown files, and is otherwise
empty.
![Documentation Template](../../assets/techdocs/documentation-template.png)
Create an entity from the documentation template and you will get the needed
setup for free.
### Enable documentation for an already existing entity
Prerequisites:
@@ -54,6 +54,7 @@ export default async function createPlugin(
config: env.config,
database: env.database,
catalogClient: catalogClient,
eventBroker: env.eventBroker,
reader: env.reader,
identity: env.identity,
scheduler: env.scheduler,
+19
View File
@@ -9,6 +9,7 @@ import * as bitbucket from '@backstage/plugin-scaffolder-backend-module-bitbucke
import { CatalogApi } from '@backstage/catalog-client';
import { Config } from '@backstage/config';
import { Duration } from 'luxon';
import { EventBroker } from '@backstage/plugin-events-node';
import { executeShellCommand as executeShellCommand_2 } from '@backstage/plugin-scaffolder-node';
import { ExecuteShellCommandOptions } from '@backstage/plugin-scaffolder-node';
import express from 'express';
@@ -20,6 +21,7 @@ import { HumanDuration } from '@backstage/types';
import { IdentityApi } from '@backstage/plugin-auth-node';
import { JsonObject } from '@backstage/types';
import { Knex } from 'knex';
import { LifecycleService } from '@backstage/backend-plugin-api';
import { Logger } from 'winston';
import { PermissionEvaluator } from '@backstage/plugin-permission-common';
import { PermissionRule } from '@backstage/plugin-permission-node';
@@ -40,6 +42,7 @@ import { TaskBrokerDispatchResult as TaskBrokerDispatchResult_2 } from '@backsta
import { TaskCompletionState as TaskCompletionState_2 } from '@backstage/plugin-scaffolder-node';
import { TaskContext as TaskContext_2 } from '@backstage/plugin-scaffolder-node';
import { TaskEventType as TaskEventType_2 } from '@backstage/plugin-scaffolder-node';
import { TaskRecovery } from '@backstage/plugin-scaffolder-common';
import { TaskSecrets as TaskSecrets_2 } from '@backstage/plugin-scaffolder-node';
import { TaskSpec } from '@backstage/plugin-scaffolder-common';
import { TaskSpecV1beta3 } from '@backstage/plugin-scaffolder-common';
@@ -400,9 +403,12 @@ export class DatabaseTaskStore implements TaskStore {
listStaleTasks(options: { timeoutS: number }): Promise<{
tasks: {
taskId: string;
recovery?: TaskRecovery;
}[];
}>;
// (undocumented)
recoverTasks(options: TaskStoreRecoverTaskOptions): Promise<string[]>;
// (undocumented)
shutdownTask(options: TaskStoreShutDownTaskOptions): Promise<void>;
}
@@ -433,8 +439,12 @@ export interface RouterOptions {
// (undocumented)
database: PluginDatabaseManager;
// (undocumented)
eventBroker?: EventBroker;
// (undocumented)
identity?: IdentityApi;
// (undocumented)
lifecycle?: LifecycleService;
// (undocumented)
logger: Logger;
// (undocumented)
permissionRules?: Array<
@@ -552,6 +562,8 @@ export interface TaskStore {
}[];
}>;
// (undocumented)
recoverTasks?(options: TaskStoreRecoverTaskOptions): Promise<string[]>;
// (undocumented)
shutdownTask?(options: TaskStoreShutDownTaskOptions): Promise<void>;
}
@@ -579,6 +591,11 @@ export type TaskStoreListEventsOptions = {
after?: number | undefined;
};
// @public
export type TaskStoreRecoverTaskOptions = {
timeoutS: HumanDuration;
};
// @public
export type TaskStoreShutDownTaskOptions = {
taskId: string;
@@ -591,6 +608,8 @@ export class TaskWorker {
// (undocumented)
protected onReadyToClaimTask(): Promise<void>;
// (undocumented)
recoverTasks(): Promise<void>;
// (undocumented)
runOneTask(task: TaskContext): Promise<void>;
// (undocumented)
start(): void;
+16
View File
@@ -40,6 +40,22 @@ export interface Config {
*/
concurrentTasksLimit?: number;
/**
* Sets the tasks recoverability on system start up.
*
* If not specified, the default value is false.
*/
EXPERIMENTAL_recoverTasks?: boolean;
/**
* An attempt is made to recover every task that is in the processing state and whose last heartbeat is older
* than the specified timeout.
*
* If not specified, the default value is 30 seconds.
*
*/
EXPERIMENTAL_recoverTasksTimeout?: HumanDuration;
/**
* Makes sure to auto-expire and clean up things that time out or for other reasons should not be left lingering.
*
+1
View File
@@ -57,6 +57,7 @@
"@backstage/plugin-auth-node": "workspace:^",
"@backstage/plugin-catalog-backend-module-scaffolder-entity-model": "workspace:^",
"@backstage/plugin-catalog-node": "workspace:^",
"@backstage/plugin-events-node": "workspace:^",
"@backstage/plugin-permission-common": "workspace:^",
"@backstage/plugin-permission-node": "workspace:^",
"@backstage/plugin-scaffolder-backend-module-azure": "workspace:^",
@@ -75,6 +75,7 @@ export const scaffolderPlugin = createBackendPlugin({
deps: {
logger: coreServices.logger,
config: coreServices.rootConfig,
lifecycle: coreServices.rootLifecycle,
reader: coreServices.urlReader,
permissions: coreServices.permissions,
database: coreServices.database,
@@ -84,6 +85,7 @@ export const scaffolderPlugin = createBackendPlugin({
async init({
logger,
config,
lifecycle,
reader,
database,
httpRouter,
@@ -115,6 +117,7 @@ export const scaffolderPlugin = createBackendPlugin({
database,
catalogClient,
reader,
lifecycle,
actions,
taskBroker,
additionalTemplateFilters,
@@ -29,6 +29,7 @@ import {
TaskStoreCreateTaskOptions,
TaskStoreCreateTaskResult,
TaskStoreShutDownTaskOptions,
TaskStoreRecoverTaskOptions,
} from './types';
import {
SerializedTaskEvent,
@@ -36,7 +37,9 @@ import {
TaskStatus,
TaskEventType,
} from '@backstage/plugin-scaffolder-node';
import { DateTime } from 'luxon';
import { DateTime, Duration } from 'luxon';
import { TaskRecovery, TaskSpec } from '@backstage/plugin-scaffolder-common';
import { compactEvents } from './taskRecoveryHelper';
const migrationsDir = resolvePackagePath(
'@backstage/plugin-scaffolder-backend',
@@ -114,6 +117,20 @@ export class DatabaseTaskStore implements TaskStore {
return new DatabaseTaskStore(client);
}
/**
 * Tells whether the given task spec opted in to recovery.
 *
 * A missing recovery section or strategy is treated as 'none'; only the
 * 'startOver' strategy counts as recoverable.
 */
private isRecoverableTask(spec: TaskSpec): boolean {
  const strategy = spec.EXPERIMENTAL_recovery?.EXPERIMENTAL_strategy ?? 'none';
  return strategy === 'startOver';
}
/**
 * Parses the JSON-serialized spec of a task row.
 *
 * @throws Error mentioning the task id when the stored spec is not valid JSON.
 */
private parseSpec({ spec, id }: { spec: string; id: string }): TaskSpec {
  let parsedSpec: TaskSpec;
  try {
    parsedSpec = JSON.parse(spec);
  } catch (error) {
    throw new Error(`Failed to parse spec of task '${id}', ${error}`);
  }
  return parsedSpec;
}
private static async getClient(
database: PluginDatabaseManager | Knex,
): Promise<Knex> {
@@ -223,34 +240,31 @@ export class DatabaseTaskStore implements TaskStore {
return undefined;
}
const spec = this.parseSpec(task);
const updateCount = await tx<RawDbTaskRow>('tasks')
.where({ id: task.id, status: 'open' })
.update({
status: 'processing',
last_heartbeat_at: this.db.fn.now(),
// remove the secrets when moving to processing state.
secrets: null,
// remove the secrets for non-recoverable tasks when moving to processing state.
secrets: this.isRecoverableTask(spec) ? task.secrets : null,
});
if (updateCount < 1) {
return undefined;
}
try {
const spec = JSON.parse(task.spec);
const secrets = task.secrets ? JSON.parse(task.secrets) : undefined;
return {
id: task.id,
spec,
status: 'processing',
lastHeartbeatAt: task.last_heartbeat_at,
createdAt: task.created_at,
createdBy: task.created_by ?? undefined,
secrets,
};
} catch (error) {
throw new Error(`Failed to parse spec of task '${task.id}', ${error}`);
}
const secrets = task.secrets ? JSON.parse(task.secrets) : undefined;
return {
id: task.id,
spec,
status: 'processing',
lastHeartbeatAt: task.last_heartbeat_at,
createdAt: task.created_at,
createdBy: task.created_by ?? undefined,
secrets,
} as SerializedTask;
});
}
@@ -266,7 +280,7 @@ export class DatabaseTaskStore implements TaskStore {
}
async listStaleTasks(options: { timeoutS: number }): Promise<{
tasks: { taskId: string }[];
tasks: { taskId: string; recovery?: TaskRecovery }[];
}> {
const { timeoutS } = options;
let heartbeatInterval = this.db.raw(`? - interval '${timeoutS} seconds'`, [
@@ -285,6 +299,7 @@ export class DatabaseTaskStore implements TaskStore {
.where('status', 'processing')
.andWhere('last_heartbeat_at', '<=', heartbeatInterval);
const tasks = rawRows.map(row => ({
recovery: (JSON.parse(row.spec) as TaskSpec).EXPERIMENTAL_recovery,
taskId: row.id,
}));
return { tasks };
@@ -297,7 +312,7 @@ export class DatabaseTaskStore implements TaskStore {
}): Promise<void> {
const { taskId, status, eventBody } = options;
let oldStatus: string;
let oldStatus: TaskStatus;
if (['failed', 'completed', 'cancelled'].includes(status)) {
oldStatus = 'processing';
} else {
@@ -322,6 +337,7 @@ export class DatabaseTaskStore implements TaskStore {
.where(criteria)
.update({
status,
secrets: null,
});
if (updateCount !== 1) {
@@ -409,7 +425,8 @@ export class DatabaseTaskStore implements TaskStore {
);
}
});
return { events };
return compactEvents(events);
}
async shutdownTask(options: TaskStoreShutDownTaskOptions): Promise<void> {
@@ -462,4 +479,52 @@ export class DatabaseTaskStore implements TaskStore {
body: serializedBody,
});
}
/**
 * Re-opens tasks that are stuck in the 'processing' state.
 *
 * Every task whose status is 'processing' and whose last heartbeat is older
 * than `options.timeoutS` is moved back to 'open' with a refreshed heartbeat,
 * and a 'recovered' event carrying the task's recovery strategy is appended
 * to its event log.
 *
 * @param options - `timeoutS` is a HumanDuration; it is converted to seconds here.
 * @returns the ids of the tasks that were re-opened.
 */
async recoverTasks(options: TaskStoreRecoverTaskOptions): Promise<string[]> {
  const taskIdsToRecover: string[] = [];
  const timeoutS = Duration.fromObject(options.timeoutS).as('seconds');

  await this.db.transaction(async tx => {
    // Default (PostgreSQL-style) interval arithmetic; overridden per dialect below.
    let heartbeatInterval = this.db.raw(
      `? - interval '${timeoutS} seconds'`,
      [this.db.fn.now()],
    );
    if (this.db.client.config.client.includes('mysql')) {
      heartbeatInterval = this.db.raw(
        `date_sub(now(), interval ${timeoutS} second)`,
      );
    } else if (this.db.client.config.client.includes('sqlite3')) {
      heartbeatInterval = this.db.raw(`datetime('now', ?)`, [
        `-${timeoutS} seconds`,
      ]);
    }

    // NOTE(review): the returning-columns argument of update() is not
    // supported by every dialect (e.g. MySQL) — confirm `result` is an array
    // of rows on all databases this store is expected to run on.
    const result = await tx<RawDbTaskRow>('tasks')
      .where('status', 'processing')
      .andWhere('last_heartbeat_at', '<=', heartbeatInterval)
      .update(
        {
          status: 'open',
          last_heartbeat_at: this.db.fn.now(),
        },
        ['id', 'spec'],
      );

    taskIdsToRecover.push(...result.map(i => i.id));

    for (const { id, spec } of result) {
      const taskSpec = JSON.parse(spec as string) as TaskSpec;
      // Write through the transaction handle so the event is committed or
      // rolled back together with the status change above, and so the insert
      // does not need a second connection while the transaction is open.
      await tx<RawDbTaskEventRow>('task_events').insert({
        task_id: id,
        event_type: 'recovered',
        body: JSON.stringify({
          recoverStrategy:
            taskSpec.EXPERIMENTAL_recovery?.EXPERIMENTAL_strategy ?? 'none',
        }),
      });
    }
  });

  return taskIdsToRecover;
}
}
@@ -55,6 +55,7 @@ import {
} from '@backstage/plugin-permission-common';
import { scaffolderActionRules } from '../../service/rules';
import { actionExecutePermission } from '@backstage/plugin-scaffolder-common/alpha';
import { TaskRecovery } from '@backstage/plugin-scaffolder-common';
type NunjucksWorkflowRunnerOptions = {
workingDirectory: string;
@@ -68,6 +69,7 @@ type NunjucksWorkflowRunnerOptions = {
type TemplateContext = {
parameters: JsonObject;
EXPERIMENTAL_recovery?: TaskRecovery;
steps: {
[stepName: string]: { output: { [outputName: string]: JsonValue } };
};
@@ -119,6 +121,7 @@ const isActionAuthorized = createConditionAuthorizer(
export class NunjucksWorkflowRunner implements WorkflowRunner {
private readonly defaultTemplateFilters: Record<string, TemplateFilter>;
constructor(private readonly options: NunjucksWorkflowRunnerOptions) {
this.defaultTemplateFilters = createDefaultFilters({
integrations: this.options.integrations,
@@ -415,6 +418,7 @@ export class NunjucksWorkflowRunner implements WorkflowRunner {
const context: TemplateContext = {
parameters: task.spec.parameters,
EXPERIMENTAL_recovery: task.spec.EXPERIMENTAL_recovery,
steps: {},
user: task.spec.user,
};
@@ -47,10 +47,16 @@ describe('StorageTaskBroker', () => {
storage = await createStore();
});
const emptyTaskSpec = { spec: { steps: [] } as unknown as TaskSpec };
const emptyTaskWithFakeSecretsSpec = {
spec: { steps: [] } as unknown as TaskSpec,
secrets: fakeSecrets,
};
const logger = getVoidLogger();
it('should claim a dispatched work item', async () => {
const broker = new StorageTaskBroker(storage, logger);
await broker.dispatch({ spec: {} as TaskSpec });
await broker.dispatch(emptyTaskSpec);
await expect(broker.claim()).resolves.toEqual(
expect.any(TaskManager as any),
);
@@ -62,7 +68,7 @@ describe('StorageTaskBroker', () => {
await expect(Promise.race([promise, 'waiting'])).resolves.toBe('waiting');
await broker.dispatch({ spec: {} as TaskSpec });
await broker.dispatch(emptyTaskSpec);
await expect(promise).resolves.toEqual(expect.any(TaskManager as any));
});
@@ -85,14 +91,14 @@ describe('StorageTaskBroker', () => {
it('should store secrets', async () => {
const broker = new StorageTaskBroker(storage, logger);
await broker.dispatch({ spec: {} as TaskSpec, secrets: fakeSecrets });
await broker.dispatch(emptyTaskWithFakeSecretsSpec);
const task = await broker.claim();
expect(task.secrets).toEqual(fakeSecrets);
}, 10000);
it('should complete a task', async () => {
const broker = new StorageTaskBroker(storage, logger);
const dispatchResult = await broker.dispatch({ spec: {} as TaskSpec });
const dispatchResult = await broker.dispatch(emptyTaskSpec);
const task = await broker.claim();
await task.complete('completed');
const taskRow = await storage.getTask(dispatchResult.taskId);
@@ -101,10 +107,7 @@ describe('StorageTaskBroker', () => {
it('should remove secrets after picking up a task', async () => {
const broker = new StorageTaskBroker(storage, logger);
const dispatchResult = await broker.dispatch({
spec: {} as TaskSpec,
secrets: fakeSecrets,
});
const dispatchResult = await broker.dispatch(emptyTaskWithFakeSecretsSpec);
await broker.claim();
const taskRow = await storage.getTask(dispatchResult.taskId);
@@ -113,7 +116,7 @@ describe('StorageTaskBroker', () => {
it('should fail a task', async () => {
const broker = new StorageTaskBroker(storage, logger);
const dispatchResult = await broker.dispatch({ spec: {} as TaskSpec });
const dispatchResult = await broker.dispatch(emptyTaskSpec);
const task = await broker.claim();
await task.complete('failed');
const taskRow = await storage.getTask(dispatchResult.taskId);
@@ -124,7 +127,7 @@ describe('StorageTaskBroker', () => {
const broker1 = new StorageTaskBroker(storage, logger);
const broker2 = new StorageTaskBroker(storage, logger);
const { taskId } = await broker1.dispatch({ spec: {} as TaskSpec });
const { taskId } = await broker1.dispatch(emptyTaskSpec);
const logPromise = new Promise<SerializedTaskEvent[]>(resolve => {
const observedEvents = new Array<SerializedTaskEvent>();
@@ -169,7 +172,7 @@ describe('StorageTaskBroker', () => {
it('should heartbeat', async () => {
const broker = new StorageTaskBroker(storage, logger);
const { taskId } = await broker.dispatch({ spec: {} as TaskSpec });
const { taskId } = await broker.dispatch(emptyTaskSpec);
const task = await broker.claim();
const initialTask = await storage.getTask(taskId);
@@ -187,7 +190,7 @@ describe('StorageTaskBroker', () => {
it('should be update the status to failed if heartbeat fails', async () => {
const broker = new StorageTaskBroker(storage, logger);
const { taskId } = await broker.dispatch({ spec: {} as TaskSpec });
const { taskId } = await broker.dispatch(emptyTaskSpec);
const task = await broker.claim();
jest
@@ -213,7 +216,7 @@ describe('StorageTaskBroker', () => {
it('should list all tasks', async () => {
const broker = new StorageTaskBroker(storage, logger);
const { taskId } = await broker.dispatch({ spec: {} as TaskSpec });
const { taskId } = await broker.dispatch(emptyTaskSpec);
const promise = broker.list();
await expect(promise).resolves.toEqual({
@@ -228,7 +231,7 @@ describe('StorageTaskBroker', () => {
it('should list only tasks createdBy a specific user', async () => {
const broker = new StorageTaskBroker(storage, logger);
const { taskId } = await broker.dispatch({
spec: {} as TaskSpec,
spec: { steps: [] } as unknown as TaskSpec,
createdBy: 'user:default/foo',
});
@@ -14,6 +14,7 @@
* limitations under the License.
*/
import { Config } from '@backstage/config';
import { TaskSpec } from '@backstage/plugin-scaffolder-common';
import { TaskSecrets } from '@backstage/plugin-scaffolder-node';
import { JsonObject, Observable } from '@backstage/types';
@@ -28,6 +29,7 @@ import {
TaskContext,
TaskStore,
} from './types';
import { readDuration } from './helper';
/**
* TaskManager
@@ -160,6 +162,7 @@ export class StorageTaskBroker implements TaskBroker {
constructor(
private readonly storage: TaskStore,
private readonly logger: Logger,
private readonly config?: Config,
) {}
async list(options?: {
@@ -202,6 +205,32 @@ export class StorageTaskBroker implements TaskBroker {
});
}
/**
 * Recovers stale tasks when `scaffolder.EXPERIMENTAL_recoverTasks` is enabled.
 *
 * When enabled and the underlying store implements `recoverTasks`, the store
 * is asked to recover tasks using the timeout read from
 * `scaffolder.EXPERIMENTAL_recoverTasksTimeout` (falling back to 30 seconds),
 * and one dispatch signal is emitted per recovered task id.
 *
 * @returns whether task recovery is enabled in the configuration.
 */
public async recoverTasks(): Promise<boolean> {
  const isRecoveryEnabled =
    this.config?.getOptionalBoolean('scaffolder.EXPERIMENTAL_recoverTasks') ??
    false;

  if (!isRecoveryEnabled) {
    return isRecoveryEnabled;
  }

  // The timeout is only resolved when the store actually supports recovery,
  // because the optional call short-circuits its arguments.
  const recovered = await this.storage.recoverTasks?.({
    timeoutS: readDuration(
      this.config,
      'scaffolder.EXPERIMENTAL_recoverTasksTimeout',
      {
        seconds: 30,
      },
    ),
  });

  // Wake up one waiting claimer per recovered task.
  (recovered ?? []).forEach(() => {
    this.signalDispatch();
  });

  return isRecoveryEnabled;
}
/**
* {@inheritdoc TaskBroker.claim}
*/
@@ -23,6 +23,7 @@ import { ScmIntegrations } from '@backstage/integration';
import { assertError } from '@backstage/errors';
import { TemplateFilter, TemplateGlobal } from '../../lib';
import { PermissionEvaluator } from '@backstage/plugin-permission-common';
/**
* TaskWorkerOptions
*
@@ -111,7 +112,21 @@ export class TaskWorker {
});
}
/**
 * Asks the task broker to recover tasks, if the broker implements recovery.
 *
 * Any error raised while doing so is swallowed, so a failed recovery attempt
 * never propagates to the caller.
 */
async recoverTasks() {
  try {
    const { taskBroker } = this.options;
    await taskBroker.recoverTasks?.();
  } catch {
    // ignore
  }
}
start() {
(async () => {
for (;;) {
await new Promise(resolve => setTimeout(resolve, 10000));
await this.recoverTasks();
}
})();
(async () => {
for (;;) {
await this.onReadyToClaimTask();
@@ -14,6 +14,9 @@
* limitations under the License.
*/
import { Config, readDurationFromConfig } from '@backstage/config';
import { HumanDuration } from '@backstage/types';
import { isArray } from 'lodash';
import { Schema } from 'jsonschema';
@@ -53,3 +56,14 @@ export function generateExampleOutput(schema: Schema): unknown {
}
return '<unknown>';
}
/**
 * Reads a duration from the configuration.
 *
 * @param config - the configuration to read from; may be undefined.
 * @param key - the configuration key holding the duration.
 * @param defaultValue - returned when `config` is undefined or has no value at `key`.
 * @returns the configured duration, or `defaultValue`.
 */
export const readDuration = (
  config: Config | undefined,
  key: string,
  defaultValue: HumanDuration,
) => {
  if (!config?.has(key)) {
    return defaultValue;
  }
  return readDurationFromConfig(config, { key });
};
@@ -35,5 +35,6 @@ export type {
TaskBrokerDispatchResult,
TaskBrokerDispatchOptions,
TaskStoreCreateTaskOptions,
TaskStoreRecoverTaskOptions,
TaskStoreCreateTaskResult,
} from './types';
@@ -0,0 +1,47 @@
/*
* Copyright 2021 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { compactEvents } from './taskRecoveryHelper';
import { SerializedTaskEvent } from './types';
/** Builds a minimal 'log' event fixture that belongs to the given step. */
const toLogEvent = (stepId: string) => {
  const logEvent = { type: 'log', body: { stepId } };
  return logEvent as unknown as SerializedTaskEvent;
};
/** Builds a minimal 'recovered' event fixture carrying the given strategy. */
const toRecoveredEvent = (recoverStrategy: string) => {
  const recoveredEvent = { type: 'recovered', body: { recoverStrategy } };
  return recoveredEvent as unknown as SerializedTaskEvent;
};
describe('taskRecoveryHelper', () => {
describe('compactEvents', () => {
// When the most recent event is a 'recovered' event with the 'startOver'
// strategy, everything logged before it belongs to the abandoned run, so
// compactEvents is expected to return no events at all.
it('should return only events related to a restarted task. Recover strategy: "startOver"', () => {
const logEvents = [
'fetch',
'mock-step-1',
'mock-step-2',
'mock-step-3',
].map(toLogEvent);
// The 'recovered' event is appended last, i.e. nothing has run since the recovery.
const events = [...logEvents, toRecoveredEvent('startOver')];
expect(compactEvents(events)).toEqual({ events: [] });
});
});
});
@@ -0,0 +1,41 @@
/*
* Copyright 2021 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { SerializedTaskEvent } from '@backstage/plugin-scaffolder-node';
import { TaskRecoverStrategy } from '@backstage/plugin-scaffolder-common';
/**
 * Trims a task's event list according to its most recent 'recovered' event.
 *
 * If the latest 'recovered' event uses the 'startOver' strategy, the events
 * that precede it are dropped: the result starts at that 'recovered' event,
 * or is empty when the 'recovered' event is the very last one. In every other
 * case (no 'recovered' event, or a different strategy) the events are
 * returned unchanged.
 *
 * @param events - the task events, oldest first.
 * @returns the compacted events wrapped in an object.
 */
export const compactEvents = (
  events: SerializedTaskEvent[],
): { events: SerializedTaskEvent[] } => {
  // Walk backwards to locate the most recent 'recovered' event.
  let lastRecoveredAt = -1;
  for (let i = events.length - 1; i >= 0; i--) {
    if (events[i].type === 'recovered') {
      lastRecoveredAt = i;
      break;
    }
  }

  if (lastRecoveredAt < 0) {
    return { events };
  }

  const { recoverStrategy } = events[lastRecoveredAt].body as {
    recoverStrategy: TaskRecoverStrategy;
  };
  if (recoverStrategy !== 'startOver') {
    return { events };
  }

  // Nothing has happened since the recovery when it is the final event.
  const recoveredIsLast = lastRecoveredAt === events.length - 1;
  return {
    events: recoveredIsLast ? [] : events.slice(lastRecoveredAt),
  };
};
@@ -14,7 +14,7 @@
* limitations under the License.
*/
import { JsonValue, JsonObject } from '@backstage/types';
import { JsonValue, JsonObject, HumanDuration } from '@backstage/types';
import { TaskSpec, TaskStep } from '@backstage/plugin-scaffolder-common';
import { TaskSecrets } from '@backstage/plugin-scaffolder-node';
import {
@@ -142,6 +142,14 @@ export type TaskStoreCreateTaskOptions = {
secrets?: TaskSecrets;
};
/**
* The options passed to {@link TaskStore.recoverTasks}
* @public
*/
export type TaskStoreRecoverTaskOptions = {
// Despite the `S` suffix this is a duration object, not a number of seconds.
// DatabaseTaskStore converts it to seconds and recovers tasks in the
// 'processing' state whose last heartbeat is at least this old.
timeoutS: HumanDuration;
};
/**
* The response from {@link TaskStore.createTask}
* @public
@@ -162,6 +170,8 @@ export interface TaskStore {
options: TaskStoreCreateTaskOptions,
): Promise<TaskStoreCreateTaskResult>;
recoverTasks?(options: TaskStoreRecoverTaskOptions): Promise<string[]>;
getTask(taskId: string): Promise<SerializedTask>;
claimTask(): Promise<SerializedTask | undefined>;
@@ -85,6 +85,8 @@ const mockUrlReader = UrlReaders.default({
const getIdentity = jest.fn();
const config = new ConfigReader({});
describe('createRouter', () => {
let app: express.Express;
let loggerSpy: jest.SpyInstance;
@@ -181,7 +183,7 @@ describe('createRouter', () => {
const databaseTaskStore = await DatabaseTaskStore.create({
database: createDatabase(),
});
taskBroker = new StorageTaskBroker(databaseTaskStore, logger);
taskBroker = new StorageTaskBroker(databaseTaskStore, logger, config);
jest.spyOn(taskBroker, 'dispatch');
jest.spyOn(taskBroker, 'get');
@@ -787,7 +789,7 @@ data: {"id":1,"taskId":"a-random-id","type":"completion","createdAt":"","body":{
const databaseTaskStore = await DatabaseTaskStore.create({
database: createDatabase(),
});
taskBroker = new StorageTaskBroker(databaseTaskStore, logger);
taskBroker = new StorageTaskBroker(databaseTaskStore, logger, config);
jest.spyOn(taskBroker, 'dispatch');
jest.spyOn(taskBroker, 'get');
@@ -77,7 +77,9 @@ import {
PermissionRule,
} from '@backstage/plugin-permission-node';
import { scaffolderActionRules, scaffolderTemplateRules } from './rules';
import { EventBroker } from '@backstage/plugin-events-node';
import { Duration } from 'luxon';
import { LifecycleService } from '@backstage/backend-plugin-api';
/**
*
@@ -124,9 +126,11 @@ export interface RouterOptions {
logger: Logger;
config: Config;
reader: UrlReader;
lifecycle?: LifecycleService;
database: PluginDatabaseManager;
catalogClient: CatalogApi;
scheduler?: PluginTaskScheduler;
eventBroker?: EventBroker;
actions?: TemplateAction<any, any>[];
/**
* @deprecated taskWorkers is deprecated in favor of concurrentTasksLimit option with a single TaskWorker
@@ -266,7 +270,7 @@ export async function createRouter(
let taskBroker: TaskBroker;
if (!options.taskBroker) {
const databaseTaskStore = await DatabaseTaskStore.create({ database });
taskBroker = new StorageTaskBroker(databaseTaskStore, logger);
taskBroker = new StorageTaskBroker(databaseTaskStore, logger, config);
if (scheduler && databaseTaskStore.listStaleTasks) {
await scheduler.scheduleTask({
@@ -301,7 +305,7 @@ export async function createRouter(
const actionRegistry = new TemplateActionRegistry();
const workers = [];
const workers: TaskWorker[] = [];
if (concurrentTasksLimit !== 0) {
for (let i = 0; i < (taskWorkers || 1); i++) {
const worker = await TaskWorker.create({
@@ -331,7 +335,14 @@ export async function createRouter(
});
actionsToRegister.forEach(action => actionRegistry.register(action));
workers.forEach(worker => worker.start());
const launchWorkers = () => workers.forEach(worker => worker.start());
if (options.lifecycle) {
options.lifecycle.addStartupHook(launchWorkers);
} else {
launchWorkers();
}
const dryRunner = createDryRunner({
actionRegistry,
@@ -462,6 +473,7 @@ export async function createRouter(
id: step.id ?? `step-${index + 1}`,
name: step.name ?? step.action,
})),
EXPERIMENTAL_recovery: template.spec.EXPERIMENTAL_recovery,
output: template.spec.output ?? {},
parameters: values,
user: {
+15
View File
@@ -16,12 +16,21 @@ export const isTemplateEntityV1beta3: (
entity: Entity,
) => entity is TemplateEntityV1beta3;
// @public
export type TaskRecoverStrategy = 'none' | 'startOver';
// @public
export interface TaskRecovery {
EXPERIMENTAL_strategy?: TaskRecoverStrategy;
}
// @public
export type TaskSpec = TaskSpecV1beta3;
// @public
export interface TaskSpecV1beta3 {
apiVersion: 'scaffolder.backstage.io/v1beta3';
EXPERIMENTAL_recovery?: TaskRecovery;
output: {
[name: string]: JsonValue;
};
@@ -67,6 +76,7 @@ export interface TemplateEntityV1beta3 extends Entity {
spec: {
type: string;
presentation?: TemplatePresentationV1beta3;
EXPERIMENTAL_recovery?: TemplateRecoveryV1beta3;
parameters?: TemplateParametersV1beta3 | TemplateParametersV1beta3[];
steps: Array<TemplateEntityStepV1beta3>;
output?: {
@@ -108,4 +118,9 @@ export interface TemplatePresentationV1beta3 extends JsonObject {
reviewButtonText?: string;
};
}
// @public
export interface TemplateRecoveryV1beta3 extends JsonObject {
EXPERIMENTAL_strategy?: 'none' | 'startOver';
}
```
+28
View File
@@ -44,6 +44,30 @@ export type TemplateInfo = {
};
};
/**
* The strategy used for a task that did not get to complete.
*
* - `none`: do not recover; let the task be marked as failed.
* - `startOver`: recover the task by starting its execution again from the first step.
*
* @public
*/
export type TaskRecoverStrategy = 'none' | 'startOver';
/**
* Defines what to do with a task that did not get a chance to complete due to a system restart.
*
* By default the strategy is `none`, which means the task is not recovered and its status is updated
* from 'processing' to 'failed'.
*
* @public
*/
export interface TaskRecovery {
/**
* The recovery strategy to apply. Depending on how a task is designed, the behaviour can be tailored per task.
*/
EXPERIMENTAL_strategy?: TaskRecoverStrategy;
}
/**
* An individual step of a scaffolder task, as stored in the database.
*
@@ -119,6 +143,10 @@ export interface TaskSpecV1beta3 {
*/
ref?: string;
};
/**
* How to recover the task after system restart or system crash.
*/
EXPERIMENTAL_recovery?: TaskRecovery;
}
/**
@@ -139,6 +139,16 @@
}
]
},
"EXPERIMENTAL_recovery": {
"type": "object",
"description": "A task recovery section.",
"properties": {
"EXPERIMENTAL_strategy": {
"type": "string",
"description": "Recovery strategy for your task (none or startOver). By default none"
}
}
},
"steps": {
"type": "array",
"description": "A list of steps to execute.",
@@ -51,6 +51,11 @@ export interface TemplateEntityV1beta3 extends Entity {
*/
presentation?: TemplatePresentationV1beta3;
/**
* Recovery strategy for the template
*/
EXPERIMENTAL_recovery?: TemplateRecoveryV1beta3;
/**
* This is a JSONSchema or an array of JSONSchema's which is used to render a form in the frontend
* to collect user input and validate it against that schema. This can then be used in the `steps` part below to template
@@ -73,6 +78,22 @@ export interface TemplateEntityV1beta3 extends Entity {
};
}
/**
* The recovery settings of a template. Depending on how a task is designed, the recovery behaviour
* can be tailored per template.
*
* @public
*/
export interface TemplateRecoveryV1beta3 extends JsonObject {
/**
* The recovery strategy:
*
* - `none`: do not recover; let the task be marked as failed.
* - `startOver`: recover the task by starting its execution again from the first step.
*
* @public
*/
EXPERIMENTAL_strategy?: 'none' | 'startOver';
}
/**
* The presentation of the template.
*
+1
View File
@@ -32,4 +32,5 @@ export type {
TemplateEntityStepV1beta3,
TemplateParametersV1beta3,
TemplatePermissionsV1beta3,
TemplateRecoveryV1beta3,
} from './TemplateEntityV1beta3';
+3 -1
View File
@@ -238,6 +238,8 @@ export interface TaskBroker {
tasks: SerializedTask[];
}>;
// (undocumented)
recoverTasks?(): Promise<boolean>;
// (undocumented)
vacuumTasks(options: { timeoutS: number }): Promise<void>;
}
@@ -279,7 +281,7 @@ export interface TaskContext {
}
// @public
export type TaskEventType = 'completion' | 'log' | 'cancelled';
export type TaskEventType = 'completion' | 'log' | 'cancelled' | 'recovered';
// @public
export type TaskSecrets = Record<string, string> & {
+3 -1
View File
@@ -65,7 +65,7 @@ export type SerializedTask = {
*
* @public
*/
export type TaskEventType = 'completion' | 'log' | 'cancelled';
export type TaskEventType = 'completion' | 'log' | 'cancelled' | 'recovered';
/**
* SerializedTaskEvent
@@ -131,6 +131,8 @@ export interface TaskBroker {
claim(): Promise<TaskContext>;
recoverTasks?(): Promise<boolean>;
dispatch(
options: TaskBrokerDispatchOptions,
): Promise<TaskBrokerDispatchResult>;
+1 -1
View File
@@ -152,7 +152,7 @@ export type ListActionsResponse = Array<Action>;
// @public
export type LogEvent = {
type: 'log' | 'completion' | 'cancelled';
type: 'log' | 'completion' | 'cancelled' | 'recovered';
body: {
message: string;
stepId?: string;
+1 -1
View File
@@ -105,7 +105,7 @@ export type ScaffolderTaskOutput = {
* @public
*/
export type LogEvent = {
type: 'log' | 'completion' | 'cancelled';
type: 'log' | 'completion' | 'cancelled' | 'recovered';
body: {
message: string;
stepId?: string;
@@ -62,12 +62,14 @@ type ReducerLogEntry = {
message: string;
output?: ScaffolderTaskOutput;
error?: Error;
recoverStrategy?: 'none' | 'startOver';
};
};
type ReducerAction =
| { type: 'INIT'; data: ScaffolderTask }
| { type: 'CANCELLED' }
| { type: 'RECOVERED'; data: ReducerLogEntry }
| { type: 'LOGS'; data: ReducerLogEntry[] }
| { type: 'COMPLETED'; data: ReducerLogEntry }
| { type: 'ERROR'; data: Error };
@@ -105,17 +107,19 @@ function reducer(draft: TaskStream, action: ReducerAction) {
const currentStepLog = draft.stepLogs?.[entry.body.stepId];
const currentStep = draft.steps?.[entry.body.stepId];
if (entry.body.status && entry.body.status !== currentStep.status) {
currentStep.status = entry.body.status;
if (currentStep) {
if (entry.body.status && entry.body.status !== currentStep.status) {
currentStep.status = entry.body.status;
if (currentStep.status === 'processing') {
currentStep.startedAt = entry.createdAt;
}
if (currentStep.status === 'processing') {
currentStep.startedAt = entry.createdAt;
}
if (
['cancelled', 'completed', 'failed'].includes(currentStep.status)
) {
currentStep.endedAt = entry.createdAt;
if (
['cancelled', 'completed', 'failed'].includes(currentStep.status)
) {
currentStep.endedAt = entry.createdAt;
}
}
}
@@ -138,6 +142,17 @@ function reducer(draft: TaskStream, action: ReducerAction) {
return;
}
case 'RECOVERED': {
for (const stepId in draft.steps) {
if (draft.steps.hasOwnProperty(stepId)) {
draft.steps[stepId].startedAt = undefined;
draft.steps[stepId].endedAt = undefined;
draft.steps[stepId].status = 'open';
}
}
return;
}
case 'ERROR': {
draft.error = action.data;
draft.loading = false;
@@ -202,6 +217,7 @@ export const useTaskEventStream = (taskId: string): TaskStream => {
subscription = observable.subscribe({
next: event => {
retryCount = 1;
switch (event.type) {
case 'log':
return collectedLogEvents.push(event);
@@ -212,6 +228,9 @@ export const useTaskEventStream = (taskId: string): TaskStream => {
emitLogs();
dispatch({ type: 'COMPLETED', data: event });
return undefined;
case 'recovered':
dispatch({ type: 'RECOVERED', data: event });
return undefined;
default:
throw new Error(
`Unhandled event type ${event.type} in observer`,
@@ -226,16 +245,18 @@ export const useTaskEventStream = (taskId: string): TaskStream => {
// just to restart the fetch process
// details here https://github.com/backstage/backstage/issues/15002
const maxRetries = 3;
if (!error.message) {
error.message = `We cannot connect at the moment, trying again in some seconds... Retrying (${retryCount}/3 retries)`;
error.message = `We cannot connect at the moment, trying again in some seconds... Retrying (${
retryCount > maxRetries ? maxRetries : retryCount
}/${maxRetries} retries)`;
}
if (retryCount <= 3) {
setTimeout(() => {
retryCount += 1;
startStreamLogProcess();
}, 15000);
}
setTimeout(() => {
retryCount += 1;
void startStreamLogProcess();
}, 15000);
dispatch({ type: 'ERROR', data: error });
},
@@ -247,7 +268,7 @@ export const useTaskEventStream = (taskId: string): TaskStream => {
}
},
);
startStreamLogProcess();
void startStreamLogProcess();
return () => {
didCancel = true;
if (subscription) {
+1
View File
@@ -252,6 +252,7 @@ export class ScaffolderClient implements ScaffolderApi {
: {},
});
eventSource.addEventListener('log', processEvent);
eventSource.addEventListener('recovered', processEvent);
eventSource.addEventListener('cancelled', processEvent);
eventSource.addEventListener('completion', (event: any) => {
processEvent(event);
+1
View File
@@ -8383,6 +8383,7 @@ __metadata:
"@backstage/plugin-auth-node": "workspace:^"
"@backstage/plugin-catalog-backend-module-scaffolder-entity-model": "workspace:^"
"@backstage/plugin-catalog-node": "workspace:^"
"@backstage/plugin-events-node": "workspace:^"
"@backstage/plugin-permission-common": "workspace:^"
"@backstage/plugin-permission-node": "workspace:^"
"@backstage/plugin-scaffolder-backend-module-azure": "workspace:^"