Recoverable tasks [version 1]
Signed-off-by: Bogdan Nechyporenko <bnechyporenko@bol.com> Signed-off-by: bnechyporenko <bnechyporenko@bol.com>
This commit is contained in:
@@ -0,0 +1,9 @@
|
||||
---
|
||||
'@backstage/plugin-scaffolder-backend': minor
|
||||
'@backstage/plugin-scaffolder-common': minor
|
||||
'@backstage/plugin-scaffolder-react': minor
|
||||
'@backstage/plugin-scaffolder-node': minor
|
||||
'@backstage/plugin-scaffolder': patch
|
||||
---
|
||||
|
||||
Introduced the first version of recoverable tasks.
|
||||
@@ -41,6 +41,23 @@ default, we highly recommend you to set that up. Follow our how-to guide
|
||||
[How to add documentation setup to your software templates](./how-to-guides.md#how-to-add-the-documentation-setup-to-your-software-templates)
|
||||
to get started.
|
||||
|
||||
### Use the documentation template
|
||||
|
||||
There could be _some_ situations where you don't want to keep your docs close to
|
||||
your code, but still want to publish documentation - for example, an onboarding
|
||||
tutorial. For this use case, we have put together a documentation template. Your
|
||||
Backstage instance should by default have a documentation template added. If
|
||||
not, copy the catalog locations from the
|
||||
[create-app template](https://github.com/backstage/backstage/blob/master/packages/create-app/templates/default-app/app-config.yaml.hbs)
|
||||
to add the documentation template. The template creates a component with
|
||||
**only** TechDocs configuration and default markdown files, and is otherwise
|
||||
empty.
|
||||
|
||||

|
||||
|
||||
Create an entity from the documentation template and you will get the needed
|
||||
setup for free.
|
||||
|
||||
### Enable documentation for an already existing entity
|
||||
|
||||
Prerequisites:
|
||||
|
||||
@@ -54,6 +54,7 @@ export default async function createPlugin(
|
||||
config: env.config,
|
||||
database: env.database,
|
||||
catalogClient: catalogClient,
|
||||
eventBroker: env.eventBroker,
|
||||
reader: env.reader,
|
||||
identity: env.identity,
|
||||
scheduler: env.scheduler,
|
||||
|
||||
@@ -9,6 +9,7 @@ import * as bitbucket from '@backstage/plugin-scaffolder-backend-module-bitbucke
|
||||
import { CatalogApi } from '@backstage/catalog-client';
|
||||
import { Config } from '@backstage/config';
|
||||
import { Duration } from 'luxon';
|
||||
import { EventBroker } from '@backstage/plugin-events-node';
|
||||
import { executeShellCommand as executeShellCommand_2 } from '@backstage/plugin-scaffolder-node';
|
||||
import { ExecuteShellCommandOptions } from '@backstage/plugin-scaffolder-node';
|
||||
import express from 'express';
|
||||
@@ -20,6 +21,7 @@ import { HumanDuration } from '@backstage/types';
|
||||
import { IdentityApi } from '@backstage/plugin-auth-node';
|
||||
import { JsonObject } from '@backstage/types';
|
||||
import { Knex } from 'knex';
|
||||
import { LifecycleService } from '@backstage/backend-plugin-api';
|
||||
import { Logger } from 'winston';
|
||||
import { PermissionEvaluator } from '@backstage/plugin-permission-common';
|
||||
import { PermissionRule } from '@backstage/plugin-permission-node';
|
||||
@@ -40,6 +42,7 @@ import { TaskBrokerDispatchResult as TaskBrokerDispatchResult_2 } from '@backsta
|
||||
import { TaskCompletionState as TaskCompletionState_2 } from '@backstage/plugin-scaffolder-node';
|
||||
import { TaskContext as TaskContext_2 } from '@backstage/plugin-scaffolder-node';
|
||||
import { TaskEventType as TaskEventType_2 } from '@backstage/plugin-scaffolder-node';
|
||||
import { TaskRecovery } from '@backstage/plugin-scaffolder-common';
|
||||
import { TaskSecrets as TaskSecrets_2 } from '@backstage/plugin-scaffolder-node';
|
||||
import { TaskSpec } from '@backstage/plugin-scaffolder-common';
|
||||
import { TaskSpecV1beta3 } from '@backstage/plugin-scaffolder-common';
|
||||
@@ -400,9 +403,12 @@ export class DatabaseTaskStore implements TaskStore {
|
||||
listStaleTasks(options: { timeoutS: number }): Promise<{
|
||||
tasks: {
|
||||
taskId: string;
|
||||
recovery?: TaskRecovery;
|
||||
}[];
|
||||
}>;
|
||||
// (undocumented)
|
||||
recoverTasks(options: TaskStoreRecoverTaskOptions): Promise<string[]>;
|
||||
// (undocumented)
|
||||
shutdownTask(options: TaskStoreShutDownTaskOptions): Promise<void>;
|
||||
}
|
||||
|
||||
@@ -433,8 +439,12 @@ export interface RouterOptions {
|
||||
// (undocumented)
|
||||
database: PluginDatabaseManager;
|
||||
// (undocumented)
|
||||
eventBroker?: EventBroker;
|
||||
// (undocumented)
|
||||
identity?: IdentityApi;
|
||||
// (undocumented)
|
||||
lifecycle?: LifecycleService;
|
||||
// (undocumented)
|
||||
logger: Logger;
|
||||
// (undocumented)
|
||||
permissionRules?: Array<
|
||||
@@ -552,6 +562,8 @@ export interface TaskStore {
|
||||
}[];
|
||||
}>;
|
||||
// (undocumented)
|
||||
recoverTasks?(options: TaskStoreRecoverTaskOptions): Promise<string[]>;
|
||||
// (undocumented)
|
||||
shutdownTask?(options: TaskStoreShutDownTaskOptions): Promise<void>;
|
||||
}
|
||||
|
||||
@@ -579,6 +591,11 @@ export type TaskStoreListEventsOptions = {
|
||||
after?: number | undefined;
|
||||
};
|
||||
|
||||
// @public
|
||||
export type TaskStoreRecoverTaskOptions = {
|
||||
timeoutS: HumanDuration;
|
||||
};
|
||||
|
||||
// @public
|
||||
export type TaskStoreShutDownTaskOptions = {
|
||||
taskId: string;
|
||||
@@ -591,6 +608,8 @@ export class TaskWorker {
|
||||
// (undocumented)
|
||||
protected onReadyToClaimTask(): Promise<void>;
|
||||
// (undocumented)
|
||||
recoverTasks(): Promise<void>;
|
||||
// (undocumented)
|
||||
runOneTask(task: TaskContext): Promise<void>;
|
||||
// (undocumented)
|
||||
start(): void;
|
||||
|
||||
+16
@@ -40,6 +40,22 @@ export interface Config {
|
||||
*/
|
||||
concurrentTasksLimit?: number;
|
||||
|
||||
/**
|
||||
* Sets the tasks recoverability on system start up.
|
||||
*
|
||||
* If not specified, the default value is false.
|
||||
*/
|
||||
EXPERIMENTAL_recoverTasks?: boolean;
|
||||
|
||||
/**
|
||||
* Every task which is in progress state and having a last heartbeat longer than a specified timeout is going to
|
||||
* be attempted to recover.
|
||||
*
|
||||
* If not specified, the default value is 5 seconds.
|
||||
*
|
||||
*/
|
||||
EXPERIMENTAL_recoverTasksTimeout?: HumanDuration;
|
||||
|
||||
/**
|
||||
* Makes sure to auto-expire and clean up things that time out or for other reasons should not be left lingering.
|
||||
*
|
||||
|
||||
@@ -57,6 +57,7 @@
|
||||
"@backstage/plugin-auth-node": "workspace:^",
|
||||
"@backstage/plugin-catalog-backend-module-scaffolder-entity-model": "workspace:^",
|
||||
"@backstage/plugin-catalog-node": "workspace:^",
|
||||
"@backstage/plugin-events-node": "workspace:^",
|
||||
"@backstage/plugin-permission-common": "workspace:^",
|
||||
"@backstage/plugin-permission-node": "workspace:^",
|
||||
"@backstage/plugin-scaffolder-backend-module-azure": "workspace:^",
|
||||
|
||||
@@ -75,6 +75,7 @@ export const scaffolderPlugin = createBackendPlugin({
|
||||
deps: {
|
||||
logger: coreServices.logger,
|
||||
config: coreServices.rootConfig,
|
||||
lifecycle: coreServices.rootLifecycle,
|
||||
reader: coreServices.urlReader,
|
||||
permissions: coreServices.permissions,
|
||||
database: coreServices.database,
|
||||
@@ -84,6 +85,7 @@ export const scaffolderPlugin = createBackendPlugin({
|
||||
async init({
|
||||
logger,
|
||||
config,
|
||||
lifecycle,
|
||||
reader,
|
||||
database,
|
||||
httpRouter,
|
||||
@@ -115,6 +117,7 @@ export const scaffolderPlugin = createBackendPlugin({
|
||||
database,
|
||||
catalogClient,
|
||||
reader,
|
||||
lifecycle,
|
||||
actions,
|
||||
taskBroker,
|
||||
additionalTemplateFilters,
|
||||
|
||||
@@ -29,6 +29,7 @@ import {
|
||||
TaskStoreCreateTaskOptions,
|
||||
TaskStoreCreateTaskResult,
|
||||
TaskStoreShutDownTaskOptions,
|
||||
TaskStoreRecoverTaskOptions,
|
||||
} from './types';
|
||||
import {
|
||||
SerializedTaskEvent,
|
||||
@@ -36,7 +37,9 @@ import {
|
||||
TaskStatus,
|
||||
TaskEventType,
|
||||
} from '@backstage/plugin-scaffolder-node';
|
||||
import { DateTime } from 'luxon';
|
||||
import { DateTime, Duration } from 'luxon';
|
||||
import { TaskRecovery, TaskSpec } from '@backstage/plugin-scaffolder-common';
|
||||
import { compactEvents } from './taskRecoveryHelper';
|
||||
|
||||
const migrationsDir = resolvePackagePath(
|
||||
'@backstage/plugin-scaffolder-backend',
|
||||
@@ -114,6 +117,20 @@ export class DatabaseTaskStore implements TaskStore {
|
||||
return new DatabaseTaskStore(client);
|
||||
}
|
||||
|
||||
private isRecoverableTask(spec: TaskSpec): boolean {
|
||||
return ['startOver'].includes(
|
||||
spec.EXPERIMENTAL_recovery?.EXPERIMENTAL_strategy ?? 'none',
|
||||
);
|
||||
}
|
||||
|
||||
private parseSpec({ spec, id }: { spec: string; id: string }): TaskSpec {
|
||||
try {
|
||||
return JSON.parse(spec);
|
||||
} catch (error) {
|
||||
throw new Error(`Failed to parse spec of task '${id}', ${error}`);
|
||||
}
|
||||
}
|
||||
|
||||
private static async getClient(
|
||||
database: PluginDatabaseManager | Knex,
|
||||
): Promise<Knex> {
|
||||
@@ -223,34 +240,31 @@ export class DatabaseTaskStore implements TaskStore {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const spec = this.parseSpec(task);
|
||||
|
||||
const updateCount = await tx<RawDbTaskRow>('tasks')
|
||||
.where({ id: task.id, status: 'open' })
|
||||
.update({
|
||||
status: 'processing',
|
||||
last_heartbeat_at: this.db.fn.now(),
|
||||
// remove the secrets when moving to processing state.
|
||||
secrets: null,
|
||||
// remove the secrets for non-recoverable tasks when moving to processing state.
|
||||
secrets: this.isRecoverableTask(spec) ? task.secrets : null,
|
||||
});
|
||||
|
||||
if (updateCount < 1) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
try {
|
||||
const spec = JSON.parse(task.spec);
|
||||
const secrets = task.secrets ? JSON.parse(task.secrets) : undefined;
|
||||
return {
|
||||
id: task.id,
|
||||
spec,
|
||||
status: 'processing',
|
||||
lastHeartbeatAt: task.last_heartbeat_at,
|
||||
createdAt: task.created_at,
|
||||
createdBy: task.created_by ?? undefined,
|
||||
secrets,
|
||||
};
|
||||
} catch (error) {
|
||||
throw new Error(`Failed to parse spec of task '${task.id}', ${error}`);
|
||||
}
|
||||
const secrets = task.secrets ? JSON.parse(task.secrets) : undefined;
|
||||
return {
|
||||
id: task.id,
|
||||
spec,
|
||||
status: 'processing',
|
||||
lastHeartbeatAt: task.last_heartbeat_at,
|
||||
createdAt: task.created_at,
|
||||
createdBy: task.created_by ?? undefined,
|
||||
secrets,
|
||||
} as SerializedTask;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -266,7 +280,7 @@ export class DatabaseTaskStore implements TaskStore {
|
||||
}
|
||||
|
||||
async listStaleTasks(options: { timeoutS: number }): Promise<{
|
||||
tasks: { taskId: string }[];
|
||||
tasks: { taskId: string; recovery?: TaskRecovery }[];
|
||||
}> {
|
||||
const { timeoutS } = options;
|
||||
let heartbeatInterval = this.db.raw(`? - interval '${timeoutS} seconds'`, [
|
||||
@@ -285,6 +299,7 @@ export class DatabaseTaskStore implements TaskStore {
|
||||
.where('status', 'processing')
|
||||
.andWhere('last_heartbeat_at', '<=', heartbeatInterval);
|
||||
const tasks = rawRows.map(row => ({
|
||||
recovery: (JSON.parse(row.spec) as TaskSpec).EXPERIMENTAL_recovery,
|
||||
taskId: row.id,
|
||||
}));
|
||||
return { tasks };
|
||||
@@ -297,7 +312,7 @@ export class DatabaseTaskStore implements TaskStore {
|
||||
}): Promise<void> {
|
||||
const { taskId, status, eventBody } = options;
|
||||
|
||||
let oldStatus: string;
|
||||
let oldStatus: TaskStatus;
|
||||
if (['failed', 'completed', 'cancelled'].includes(status)) {
|
||||
oldStatus = 'processing';
|
||||
} else {
|
||||
@@ -322,6 +337,7 @@ export class DatabaseTaskStore implements TaskStore {
|
||||
.where(criteria)
|
||||
.update({
|
||||
status,
|
||||
secrets: null,
|
||||
});
|
||||
|
||||
if (updateCount !== 1) {
|
||||
@@ -409,7 +425,8 @@ export class DatabaseTaskStore implements TaskStore {
|
||||
);
|
||||
}
|
||||
});
|
||||
return { events };
|
||||
|
||||
return compactEvents(events);
|
||||
}
|
||||
|
||||
async shutdownTask(options: TaskStoreShutDownTaskOptions): Promise<void> {
|
||||
@@ -462,4 +479,52 @@ export class DatabaseTaskStore implements TaskStore {
|
||||
body: serializedBody,
|
||||
});
|
||||
}
|
||||
|
||||
async recoverTasks(options: TaskStoreRecoverTaskOptions): Promise<string[]> {
|
||||
const taskIdsToRecover: string[] = [];
|
||||
const timeoutS = Duration.fromObject(options.timeoutS).as('seconds');
|
||||
|
||||
await this.db.transaction(async tx => {
|
||||
let heartbeatInterval = this.db.raw(
|
||||
`? - interval '${timeoutS} seconds'`,
|
||||
[this.db.fn.now()],
|
||||
);
|
||||
if (this.db.client.config.client.includes('mysql')) {
|
||||
heartbeatInterval = this.db.raw(
|
||||
`date_sub(now(), interval ${timeoutS} second)`,
|
||||
);
|
||||
} else if (this.db.client.config.client.includes('sqlite3')) {
|
||||
heartbeatInterval = this.db.raw(`datetime('now', ?)`, [
|
||||
`-${timeoutS} seconds`,
|
||||
]);
|
||||
}
|
||||
|
||||
const result = await tx<RawDbTaskRow>('tasks')
|
||||
.where('status', 'processing')
|
||||
.andWhere('last_heartbeat_at', '<=', heartbeatInterval)
|
||||
.update(
|
||||
{
|
||||
status: 'open',
|
||||
last_heartbeat_at: this.db.fn.now(),
|
||||
},
|
||||
['id', 'spec'],
|
||||
);
|
||||
|
||||
taskIdsToRecover.push(...result.map(i => i.id));
|
||||
|
||||
for (const { id, spec } of result) {
|
||||
const taskSpec = JSON.parse(spec as string) as TaskSpec;
|
||||
await this.db<RawDbTaskEventRow>('task_events').insert({
|
||||
task_id: id,
|
||||
event_type: 'recovered',
|
||||
body: JSON.stringify({
|
||||
recoverStrategy:
|
||||
taskSpec.EXPERIMENTAL_recovery?.EXPERIMENTAL_strategy ?? 'none',
|
||||
}),
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
return taskIdsToRecover;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,6 +55,7 @@ import {
|
||||
} from '@backstage/plugin-permission-common';
|
||||
import { scaffolderActionRules } from '../../service/rules';
|
||||
import { actionExecutePermission } from '@backstage/plugin-scaffolder-common/alpha';
|
||||
import { TaskRecovery } from '@backstage/plugin-scaffolder-common';
|
||||
|
||||
type NunjucksWorkflowRunnerOptions = {
|
||||
workingDirectory: string;
|
||||
@@ -68,6 +69,7 @@ type NunjucksWorkflowRunnerOptions = {
|
||||
|
||||
type TemplateContext = {
|
||||
parameters: JsonObject;
|
||||
EXPERIMENTAL_recovery?: TaskRecovery;
|
||||
steps: {
|
||||
[stepName: string]: { output: { [outputName: string]: JsonValue } };
|
||||
};
|
||||
@@ -119,6 +121,7 @@ const isActionAuthorized = createConditionAuthorizer(
|
||||
|
||||
export class NunjucksWorkflowRunner implements WorkflowRunner {
|
||||
private readonly defaultTemplateFilters: Record<string, TemplateFilter>;
|
||||
|
||||
constructor(private readonly options: NunjucksWorkflowRunnerOptions) {
|
||||
this.defaultTemplateFilters = createDefaultFilters({
|
||||
integrations: this.options.integrations,
|
||||
@@ -415,6 +418,7 @@ export class NunjucksWorkflowRunner implements WorkflowRunner {
|
||||
|
||||
const context: TemplateContext = {
|
||||
parameters: task.spec.parameters,
|
||||
EXPERIMENTAL_recovery: task.spec.EXPERIMENTAL_recovery,
|
||||
steps: {},
|
||||
user: task.spec.user,
|
||||
};
|
||||
|
||||
@@ -47,10 +47,16 @@ describe('StorageTaskBroker', () => {
|
||||
storage = await createStore();
|
||||
});
|
||||
|
||||
const emptyTaskSpec = { spec: { steps: [] } as unknown as TaskSpec };
|
||||
const emptyTaskWithFakeSecretsSpec = {
|
||||
spec: { steps: [] } as unknown as TaskSpec,
|
||||
secrets: fakeSecrets,
|
||||
};
|
||||
|
||||
const logger = getVoidLogger();
|
||||
it('should claim a dispatched work item', async () => {
|
||||
const broker = new StorageTaskBroker(storage, logger);
|
||||
await broker.dispatch({ spec: {} as TaskSpec });
|
||||
await broker.dispatch(emptyTaskSpec);
|
||||
await expect(broker.claim()).resolves.toEqual(
|
||||
expect.any(TaskManager as any),
|
||||
);
|
||||
@@ -62,7 +68,7 @@ describe('StorageTaskBroker', () => {
|
||||
|
||||
await expect(Promise.race([promise, 'waiting'])).resolves.toBe('waiting');
|
||||
|
||||
await broker.dispatch({ spec: {} as TaskSpec });
|
||||
await broker.dispatch(emptyTaskSpec);
|
||||
await expect(promise).resolves.toEqual(expect.any(TaskManager as any));
|
||||
});
|
||||
|
||||
@@ -85,14 +91,14 @@ describe('StorageTaskBroker', () => {
|
||||
|
||||
it('should store secrets', async () => {
|
||||
const broker = new StorageTaskBroker(storage, logger);
|
||||
await broker.dispatch({ spec: {} as TaskSpec, secrets: fakeSecrets });
|
||||
await broker.dispatch(emptyTaskWithFakeSecretsSpec);
|
||||
const task = await broker.claim();
|
||||
expect(task.secrets).toEqual(fakeSecrets);
|
||||
}, 10000);
|
||||
|
||||
it('should complete a task', async () => {
|
||||
const broker = new StorageTaskBroker(storage, logger);
|
||||
const dispatchResult = await broker.dispatch({ spec: {} as TaskSpec });
|
||||
const dispatchResult = await broker.dispatch(emptyTaskSpec);
|
||||
const task = await broker.claim();
|
||||
await task.complete('completed');
|
||||
const taskRow = await storage.getTask(dispatchResult.taskId);
|
||||
@@ -101,10 +107,7 @@ describe('StorageTaskBroker', () => {
|
||||
|
||||
it('should remove secrets after picking up a task', async () => {
|
||||
const broker = new StorageTaskBroker(storage, logger);
|
||||
const dispatchResult = await broker.dispatch({
|
||||
spec: {} as TaskSpec,
|
||||
secrets: fakeSecrets,
|
||||
});
|
||||
const dispatchResult = await broker.dispatch(emptyTaskWithFakeSecretsSpec);
|
||||
await broker.claim();
|
||||
|
||||
const taskRow = await storage.getTask(dispatchResult.taskId);
|
||||
@@ -113,7 +116,7 @@ describe('StorageTaskBroker', () => {
|
||||
|
||||
it('should fail a task', async () => {
|
||||
const broker = new StorageTaskBroker(storage, logger);
|
||||
const dispatchResult = await broker.dispatch({ spec: {} as TaskSpec });
|
||||
const dispatchResult = await broker.dispatch(emptyTaskSpec);
|
||||
const task = await broker.claim();
|
||||
await task.complete('failed');
|
||||
const taskRow = await storage.getTask(dispatchResult.taskId);
|
||||
@@ -124,7 +127,7 @@ describe('StorageTaskBroker', () => {
|
||||
const broker1 = new StorageTaskBroker(storage, logger);
|
||||
const broker2 = new StorageTaskBroker(storage, logger);
|
||||
|
||||
const { taskId } = await broker1.dispatch({ spec: {} as TaskSpec });
|
||||
const { taskId } = await broker1.dispatch(emptyTaskSpec);
|
||||
|
||||
const logPromise = new Promise<SerializedTaskEvent[]>(resolve => {
|
||||
const observedEvents = new Array<SerializedTaskEvent>();
|
||||
@@ -169,7 +172,7 @@ describe('StorageTaskBroker', () => {
|
||||
|
||||
it('should heartbeat', async () => {
|
||||
const broker = new StorageTaskBroker(storage, logger);
|
||||
const { taskId } = await broker.dispatch({ spec: {} as TaskSpec });
|
||||
const { taskId } = await broker.dispatch(emptyTaskSpec);
|
||||
const task = await broker.claim();
|
||||
|
||||
const initialTask = await storage.getTask(taskId);
|
||||
@@ -187,7 +190,7 @@ describe('StorageTaskBroker', () => {
|
||||
|
||||
it('should be update the status to failed if heartbeat fails', async () => {
|
||||
const broker = new StorageTaskBroker(storage, logger);
|
||||
const { taskId } = await broker.dispatch({ spec: {} as TaskSpec });
|
||||
const { taskId } = await broker.dispatch(emptyTaskSpec);
|
||||
const task = await broker.claim();
|
||||
|
||||
jest
|
||||
@@ -213,7 +216,7 @@ describe('StorageTaskBroker', () => {
|
||||
|
||||
it('should list all tasks', async () => {
|
||||
const broker = new StorageTaskBroker(storage, logger);
|
||||
const { taskId } = await broker.dispatch({ spec: {} as TaskSpec });
|
||||
const { taskId } = await broker.dispatch(emptyTaskSpec);
|
||||
|
||||
const promise = broker.list();
|
||||
await expect(promise).resolves.toEqual({
|
||||
@@ -228,7 +231,7 @@ describe('StorageTaskBroker', () => {
|
||||
it('should list only tasks createdBy a specific user', async () => {
|
||||
const broker = new StorageTaskBroker(storage, logger);
|
||||
const { taskId } = await broker.dispatch({
|
||||
spec: {} as TaskSpec,
|
||||
spec: { steps: [] } as unknown as TaskSpec,
|
||||
createdBy: 'user:default/foo',
|
||||
});
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { Config } from '@backstage/config';
|
||||
import { TaskSpec } from '@backstage/plugin-scaffolder-common';
|
||||
import { TaskSecrets } from '@backstage/plugin-scaffolder-node';
|
||||
import { JsonObject, Observable } from '@backstage/types';
|
||||
@@ -28,6 +29,7 @@ import {
|
||||
TaskContext,
|
||||
TaskStore,
|
||||
} from './types';
|
||||
import { readDuration } from './helper';
|
||||
|
||||
/**
|
||||
* TaskManager
|
||||
@@ -160,6 +162,7 @@ export class StorageTaskBroker implements TaskBroker {
|
||||
constructor(
|
||||
private readonly storage: TaskStore,
|
||||
private readonly logger: Logger,
|
||||
private readonly config?: Config,
|
||||
) {}
|
||||
|
||||
async list(options?: {
|
||||
@@ -202,6 +205,32 @@ export class StorageTaskBroker implements TaskBroker {
|
||||
});
|
||||
}
|
||||
|
||||
public async recoverTasks(): Promise<boolean> {
|
||||
const enabled =
|
||||
(this.config &&
|
||||
this.config.getOptionalBoolean(
|
||||
'scaffolder.EXPERIMENTAL_recoverTasks',
|
||||
)) ??
|
||||
false;
|
||||
|
||||
if (enabled) {
|
||||
const recoveredTaskIds =
|
||||
(await this.storage.recoverTasks?.({
|
||||
timeoutS: readDuration(
|
||||
this.config,
|
||||
'scaffolder.EXPERIMENTAL_recoverTasksTimeout',
|
||||
{
|
||||
seconds: 30,
|
||||
},
|
||||
),
|
||||
})) ?? [];
|
||||
recoveredTaskIds.forEach(() => {
|
||||
this.signalDispatch();
|
||||
});
|
||||
}
|
||||
return enabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc TaskBroker.claim}
|
||||
*/
|
||||
|
||||
@@ -23,6 +23,7 @@ import { ScmIntegrations } from '@backstage/integration';
|
||||
import { assertError } from '@backstage/errors';
|
||||
import { TemplateFilter, TemplateGlobal } from '../../lib';
|
||||
import { PermissionEvaluator } from '@backstage/plugin-permission-common';
|
||||
|
||||
/**
|
||||
* TaskWorkerOptions
|
||||
*
|
||||
@@ -111,7 +112,21 @@ export class TaskWorker {
|
||||
});
|
||||
}
|
||||
|
||||
async recoverTasks() {
|
||||
try {
|
||||
await this.options.taskBroker.recoverTasks?.();
|
||||
} catch (_err) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
start() {
|
||||
(async () => {
|
||||
for (;;) {
|
||||
await new Promise(resolve => setTimeout(resolve, 10000));
|
||||
await this.recoverTasks();
|
||||
}
|
||||
})();
|
||||
(async () => {
|
||||
for (;;) {
|
||||
await this.onReadyToClaimTask();
|
||||
|
||||
@@ -14,6 +14,9 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { Config, readDurationFromConfig } from '@backstage/config';
|
||||
import { HumanDuration } from '@backstage/types';
|
||||
|
||||
import { isArray } from 'lodash';
|
||||
import { Schema } from 'jsonschema';
|
||||
|
||||
@@ -53,3 +56,14 @@ export function generateExampleOutput(schema: Schema): unknown {
|
||||
}
|
||||
return '<unknown>';
|
||||
}
|
||||
|
||||
export const readDuration = (
|
||||
config: Config | undefined,
|
||||
key: string,
|
||||
defaultValue: HumanDuration,
|
||||
) => {
|
||||
if (config?.has(key)) {
|
||||
return readDurationFromConfig(config, { key });
|
||||
}
|
||||
return defaultValue;
|
||||
};
|
||||
|
||||
@@ -35,5 +35,6 @@ export type {
|
||||
TaskBrokerDispatchResult,
|
||||
TaskBrokerDispatchOptions,
|
||||
TaskStoreCreateTaskOptions,
|
||||
TaskStoreRecoverTaskOptions,
|
||||
TaskStoreCreateTaskResult,
|
||||
} from './types';
|
||||
|
||||
@@ -0,0 +1,47 @@
|
||||
/*
|
||||
* Copyright 2021 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { compactEvents } from './taskRecoveryHelper';
|
||||
import { SerializedTaskEvent } from './types';
|
||||
|
||||
const toLogEvent = (stepId: string) =>
|
||||
({
|
||||
type: 'log',
|
||||
body: { stepId },
|
||||
} as unknown as SerializedTaskEvent);
|
||||
|
||||
const toRecoveredEvent = (recoverStrategy: string) =>
|
||||
({
|
||||
type: 'recovered',
|
||||
body: { recoverStrategy },
|
||||
} as unknown as SerializedTaskEvent);
|
||||
|
||||
describe('taskRecoveryHelper', () => {
|
||||
describe('compactEvents', () => {
|
||||
it('should return only events related to a restarted task. Recover strategy: "startOver"', () => {
|
||||
const logEvents = [
|
||||
'fetch',
|
||||
'mock-step-1',
|
||||
'mock-step-2',
|
||||
'mock-step-3',
|
||||
].map(toLogEvent);
|
||||
|
||||
const events = [...logEvents, toRecoveredEvent('startOver')];
|
||||
|
||||
expect(compactEvents(events)).toEqual({ events: [] });
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,41 @@
|
||||
/*
|
||||
* Copyright 2021 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { SerializedTaskEvent } from '@backstage/plugin-scaffolder-node';
|
||||
import { TaskRecoverStrategy } from '@backstage/plugin-scaffolder-common';
|
||||
|
||||
export const compactEvents = (
|
||||
events: SerializedTaskEvent[],
|
||||
): { events: SerializedTaskEvent[] } => {
|
||||
const recoveredEventInd = events
|
||||
.slice()
|
||||
.reverse()
|
||||
.findIndex(event => event.type === 'recovered');
|
||||
|
||||
if (recoveredEventInd >= 0) {
|
||||
const ind = events.length - recoveredEventInd - 1;
|
||||
const { recoverStrategy } = events[ind].body as {
|
||||
recoverStrategy: TaskRecoverStrategy;
|
||||
};
|
||||
if (recoverStrategy === 'startOver') {
|
||||
return {
|
||||
events: recoveredEventInd === 0 ? [] : events.slice(ind),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
return { events };
|
||||
};
|
||||
@@ -14,7 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { JsonValue, JsonObject } from '@backstage/types';
|
||||
import { JsonValue, JsonObject, HumanDuration } from '@backstage/types';
|
||||
import { TaskSpec, TaskStep } from '@backstage/plugin-scaffolder-common';
|
||||
import { TaskSecrets } from '@backstage/plugin-scaffolder-node';
|
||||
import {
|
||||
@@ -142,6 +142,14 @@ export type TaskStoreCreateTaskOptions = {
|
||||
secrets?: TaskSecrets;
|
||||
};
|
||||
|
||||
/**
|
||||
* The options passed to {@link TaskStore.recoverTasks}
|
||||
* @public
|
||||
*/
|
||||
export type TaskStoreRecoverTaskOptions = {
|
||||
timeoutS: HumanDuration;
|
||||
};
|
||||
|
||||
/**
|
||||
* The response from {@link TaskStore.createTask}
|
||||
* @public
|
||||
@@ -162,6 +170,8 @@ export interface TaskStore {
|
||||
options: TaskStoreCreateTaskOptions,
|
||||
): Promise<TaskStoreCreateTaskResult>;
|
||||
|
||||
recoverTasks?(options: TaskStoreRecoverTaskOptions): Promise<string[]>;
|
||||
|
||||
getTask(taskId: string): Promise<SerializedTask>;
|
||||
|
||||
claimTask(): Promise<SerializedTask | undefined>;
|
||||
|
||||
@@ -85,6 +85,8 @@ const mockUrlReader = UrlReaders.default({
|
||||
|
||||
const getIdentity = jest.fn();
|
||||
|
||||
const config = new ConfigReader({});
|
||||
|
||||
describe('createRouter', () => {
|
||||
let app: express.Express;
|
||||
let loggerSpy: jest.SpyInstance;
|
||||
@@ -181,7 +183,7 @@ describe('createRouter', () => {
|
||||
const databaseTaskStore = await DatabaseTaskStore.create({
|
||||
database: createDatabase(),
|
||||
});
|
||||
taskBroker = new StorageTaskBroker(databaseTaskStore, logger);
|
||||
taskBroker = new StorageTaskBroker(databaseTaskStore, logger, config);
|
||||
|
||||
jest.spyOn(taskBroker, 'dispatch');
|
||||
jest.spyOn(taskBroker, 'get');
|
||||
@@ -787,7 +789,7 @@ data: {"id":1,"taskId":"a-random-id","type":"completion","createdAt":"","body":{
|
||||
const databaseTaskStore = await DatabaseTaskStore.create({
|
||||
database: createDatabase(),
|
||||
});
|
||||
taskBroker = new StorageTaskBroker(databaseTaskStore, logger);
|
||||
taskBroker = new StorageTaskBroker(databaseTaskStore, logger, config);
|
||||
|
||||
jest.spyOn(taskBroker, 'dispatch');
|
||||
jest.spyOn(taskBroker, 'get');
|
||||
|
||||
@@ -77,7 +77,9 @@ import {
|
||||
PermissionRule,
|
||||
} from '@backstage/plugin-permission-node';
|
||||
import { scaffolderActionRules, scaffolderTemplateRules } from './rules';
|
||||
import { EventBroker } from '@backstage/plugin-events-node';
|
||||
import { Duration } from 'luxon';
|
||||
import { LifecycleService } from '@backstage/backend-plugin-api';
|
||||
|
||||
/**
|
||||
*
|
||||
@@ -124,9 +126,11 @@ export interface RouterOptions {
|
||||
logger: Logger;
|
||||
config: Config;
|
||||
reader: UrlReader;
|
||||
lifecycle?: LifecycleService;
|
||||
database: PluginDatabaseManager;
|
||||
catalogClient: CatalogApi;
|
||||
scheduler?: PluginTaskScheduler;
|
||||
eventBroker?: EventBroker;
|
||||
actions?: TemplateAction<any, any>[];
|
||||
/**
|
||||
* @deprecated taskWorkers is deprecated in favor of concurrentTasksLimit option with a single TaskWorker
|
||||
@@ -266,7 +270,7 @@ export async function createRouter(
|
||||
let taskBroker: TaskBroker;
|
||||
if (!options.taskBroker) {
|
||||
const databaseTaskStore = await DatabaseTaskStore.create({ database });
|
||||
taskBroker = new StorageTaskBroker(databaseTaskStore, logger);
|
||||
taskBroker = new StorageTaskBroker(databaseTaskStore, logger, config);
|
||||
|
||||
if (scheduler && databaseTaskStore.listStaleTasks) {
|
||||
await scheduler.scheduleTask({
|
||||
@@ -301,7 +305,7 @@ export async function createRouter(
|
||||
|
||||
const actionRegistry = new TemplateActionRegistry();
|
||||
|
||||
const workers = [];
|
||||
const workers: TaskWorker[] = [];
|
||||
if (concurrentTasksLimit !== 0) {
|
||||
for (let i = 0; i < (taskWorkers || 1); i++) {
|
||||
const worker = await TaskWorker.create({
|
||||
@@ -331,7 +335,14 @@ export async function createRouter(
|
||||
});
|
||||
|
||||
actionsToRegister.forEach(action => actionRegistry.register(action));
|
||||
workers.forEach(worker => worker.start());
|
||||
|
||||
const launchWorkers = () => workers.forEach(worker => worker.start());
|
||||
|
||||
if (options.lifecycle) {
|
||||
options.lifecycle.addStartupHook(launchWorkers);
|
||||
} else {
|
||||
launchWorkers();
|
||||
}
|
||||
|
||||
const dryRunner = createDryRunner({
|
||||
actionRegistry,
|
||||
@@ -462,6 +473,7 @@ export async function createRouter(
|
||||
id: step.id ?? `step-${index + 1}`,
|
||||
name: step.name ?? step.action,
|
||||
})),
|
||||
EXPERIMENTAL_recovery: template.spec.EXPERIMENTAL_recovery,
|
||||
output: template.spec.output ?? {},
|
||||
parameters: values,
|
||||
user: {
|
||||
|
||||
@@ -16,12 +16,21 @@ export const isTemplateEntityV1beta3: (
|
||||
entity: Entity,
|
||||
) => entity is TemplateEntityV1beta3;
|
||||
|
||||
// @public
|
||||
export type TaskRecoverStrategy = 'none' | 'startOver';
|
||||
|
||||
// @public
|
||||
export interface TaskRecovery {
|
||||
EXPERIMENTAL_strategy?: TaskRecoverStrategy;
|
||||
}
|
||||
|
||||
// @public
|
||||
export type TaskSpec = TaskSpecV1beta3;
|
||||
|
||||
// @public
|
||||
export interface TaskSpecV1beta3 {
|
||||
apiVersion: 'scaffolder.backstage.io/v1beta3';
|
||||
EXPERIMENTAL_recovery?: TaskRecovery;
|
||||
output: {
|
||||
[name: string]: JsonValue;
|
||||
};
|
||||
@@ -67,6 +76,7 @@ export interface TemplateEntityV1beta3 extends Entity {
|
||||
spec: {
|
||||
type: string;
|
||||
presentation?: TemplatePresentationV1beta3;
|
||||
EXPERIMENTAL_recovery?: TemplateRecoveryV1beta3;
|
||||
parameters?: TemplateParametersV1beta3 | TemplateParametersV1beta3[];
|
||||
steps: Array<TemplateEntityStepV1beta3>;
|
||||
output?: {
|
||||
@@ -108,4 +118,9 @@ export interface TemplatePresentationV1beta3 extends JsonObject {
|
||||
reviewButtonText?: string;
|
||||
};
|
||||
}
|
||||
|
||||
// @public
|
||||
export interface TemplateRecoveryV1beta3 extends JsonObject {
|
||||
EXPERIMENTAL_strategy?: 'none' | 'startOver';
|
||||
}
|
||||
```
|
||||
|
||||
@@ -44,6 +44,30 @@ export type TemplateInfo = {
|
||||
};
|
||||
};
|
||||
|
||||
/**
|
||||
*
|
||||
* none - not recover, let the task be marked as failed
|
||||
* startOver - do recover, start the execution of the task from the first step.
|
||||
*
|
||||
* @public
|
||||
*/
|
||||
export type TaskRecoverStrategy = 'none' | 'startOver';
|
||||
|
||||
/**
|
||||
* When task didn't have a chance to complete due to system restart you can define the strategy what to do with such tasks,
|
||||
* by defining a strategy.
|
||||
*
|
||||
* By default, it is none, what means to not recover but updating the status from 'processing' to 'failed'.
|
||||
*
|
||||
* @public
|
||||
*/
|
||||
export interface TaskRecovery {
|
||||
/**
|
||||
* Depends on how you designed your task you might tailor the behaviour for each of them.
|
||||
*/
|
||||
EXPERIMENTAL_strategy?: TaskRecoverStrategy;
|
||||
}
|
||||
|
||||
/**
|
||||
* An individual step of a scaffolder task, as stored in the database.
|
||||
*
|
||||
@@ -119,6 +143,10 @@ export interface TaskSpecV1beta3 {
|
||||
*/
|
||||
ref?: string;
|
||||
};
|
||||
/**
|
||||
* How to recover the task after system restart or system crash.
|
||||
*/
|
||||
EXPERIMENTAL_recovery?: TaskRecovery;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -139,6 +139,16 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"EXPERIMENTAL_recovery": {
|
||||
"type": "object",
|
||||
"description": "A task recovery section.",
|
||||
"properties": {
|
||||
"EXPERIMENTAL_strategy": {
|
||||
"type": "string",
|
||||
"description": "Recovery strategy for your task (none or startOver). By default none"
|
||||
}
|
||||
}
|
||||
},
|
||||
"steps": {
|
||||
"type": "array",
|
||||
"description": "A list of steps to execute.",
|
||||
|
||||
@@ -51,6 +51,11 @@ export interface TemplateEntityV1beta3 extends Entity {
|
||||
*/
|
||||
presentation?: TemplatePresentationV1beta3;
|
||||
|
||||
/**
|
||||
* Recovery strategy for the template
|
||||
*/
|
||||
EXPERIMENTAL_recovery?: TemplateRecoveryV1beta3;
|
||||
|
||||
/**
|
||||
* This is a JSONSchema or an array of JSONSchema's which is used to render a form in the frontend
|
||||
* to collect user input and validate it against that schema. This can then be used in the `steps` part below to template
|
||||
@@ -73,6 +78,22 @@ export interface TemplateEntityV1beta3 extends Entity {
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Depends on how you designed your task you might tailor the behaviour for each of them.
|
||||
*
|
||||
* @public
|
||||
*/
|
||||
export interface TemplateRecoveryV1beta3 extends JsonObject {
|
||||
/**
|
||||
*
|
||||
* none - not recover, let the task be marked as failed
|
||||
* startOver - do recover, start the execution of the task from the first step.
|
||||
*
|
||||
* @public
|
||||
*/
|
||||
EXPERIMENTAL_strategy?: 'none' | 'startOver';
|
||||
}
|
||||
|
||||
/**
|
||||
* The presentation of the template.
|
||||
*
|
||||
|
||||
@@ -32,4 +32,5 @@ export type {
|
||||
TemplateEntityStepV1beta3,
|
||||
TemplateParametersV1beta3,
|
||||
TemplatePermissionsV1beta3,
|
||||
TemplateRecoveryV1beta3,
|
||||
} from './TemplateEntityV1beta3';
|
||||
|
||||
@@ -238,6 +238,8 @@ export interface TaskBroker {
|
||||
tasks: SerializedTask[];
|
||||
}>;
|
||||
// (undocumented)
|
||||
recoverTasks?(): Promise<boolean>;
|
||||
// (undocumented)
|
||||
vacuumTasks(options: { timeoutS: number }): Promise<void>;
|
||||
}
|
||||
|
||||
@@ -279,7 +281,7 @@ export interface TaskContext {
|
||||
}
|
||||
|
||||
// @public
|
||||
export type TaskEventType = 'completion' | 'log' | 'cancelled';
|
||||
export type TaskEventType = 'completion' | 'log' | 'cancelled' | 'recovered';
|
||||
|
||||
// @public
|
||||
export type TaskSecrets = Record<string, string> & {
|
||||
|
||||
@@ -65,7 +65,7 @@ export type SerializedTask = {
|
||||
*
|
||||
* @public
|
||||
*/
|
||||
export type TaskEventType = 'completion' | 'log' | 'cancelled';
|
||||
export type TaskEventType = 'completion' | 'log' | 'cancelled' | 'recovered';
|
||||
|
||||
/**
|
||||
* SerializedTaskEvent
|
||||
@@ -131,6 +131,8 @@ export interface TaskBroker {
|
||||
|
||||
claim(): Promise<TaskContext>;
|
||||
|
||||
recoverTasks?(): Promise<boolean>;
|
||||
|
||||
dispatch(
|
||||
options: TaskBrokerDispatchOptions,
|
||||
): Promise<TaskBrokerDispatchResult>;
|
||||
|
||||
@@ -152,7 +152,7 @@ export type ListActionsResponse = Array<Action>;
|
||||
|
||||
// @public
|
||||
export type LogEvent = {
|
||||
type: 'log' | 'completion' | 'cancelled';
|
||||
type: 'log' | 'completion' | 'cancelled' | 'recovered';
|
||||
body: {
|
||||
message: string;
|
||||
stepId?: string;
|
||||
|
||||
@@ -105,7 +105,7 @@ export type ScaffolderTaskOutput = {
|
||||
* @public
|
||||
*/
|
||||
export type LogEvent = {
|
||||
type: 'log' | 'completion' | 'cancelled';
|
||||
type: 'log' | 'completion' | 'cancelled' | 'recovered';
|
||||
body: {
|
||||
message: string;
|
||||
stepId?: string;
|
||||
|
||||
@@ -62,12 +62,14 @@ type ReducerLogEntry = {
|
||||
message: string;
|
||||
output?: ScaffolderTaskOutput;
|
||||
error?: Error;
|
||||
recoverStrategy?: 'none' | 'startOver';
|
||||
};
|
||||
};
|
||||
|
||||
type ReducerAction =
|
||||
| { type: 'INIT'; data: ScaffolderTask }
|
||||
| { type: 'CANCELLED' }
|
||||
| { type: 'RECOVERED'; data: ReducerLogEntry }
|
||||
| { type: 'LOGS'; data: ReducerLogEntry[] }
|
||||
| { type: 'COMPLETED'; data: ReducerLogEntry }
|
||||
| { type: 'ERROR'; data: Error };
|
||||
@@ -105,17 +107,19 @@ function reducer(draft: TaskStream, action: ReducerAction) {
|
||||
const currentStepLog = draft.stepLogs?.[entry.body.stepId];
|
||||
const currentStep = draft.steps?.[entry.body.stepId];
|
||||
|
||||
if (entry.body.status && entry.body.status !== currentStep.status) {
|
||||
currentStep.status = entry.body.status;
|
||||
if (currentStep) {
|
||||
if (entry.body.status && entry.body.status !== currentStep.status) {
|
||||
currentStep.status = entry.body.status;
|
||||
|
||||
if (currentStep.status === 'processing') {
|
||||
currentStep.startedAt = entry.createdAt;
|
||||
}
|
||||
if (currentStep.status === 'processing') {
|
||||
currentStep.startedAt = entry.createdAt;
|
||||
}
|
||||
|
||||
if (
|
||||
['cancelled', 'completed', 'failed'].includes(currentStep.status)
|
||||
) {
|
||||
currentStep.endedAt = entry.createdAt;
|
||||
if (
|
||||
['cancelled', 'completed', 'failed'].includes(currentStep.status)
|
||||
) {
|
||||
currentStep.endedAt = entry.createdAt;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -138,6 +142,17 @@ function reducer(draft: TaskStream, action: ReducerAction) {
|
||||
return;
|
||||
}
|
||||
|
||||
case 'RECOVERED': {
|
||||
for (const stepId in draft.steps) {
|
||||
if (draft.steps.hasOwnProperty(stepId)) {
|
||||
draft.steps[stepId].startedAt = undefined;
|
||||
draft.steps[stepId].endedAt = undefined;
|
||||
draft.steps[stepId].status = 'open';
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
case 'ERROR': {
|
||||
draft.error = action.data;
|
||||
draft.loading = false;
|
||||
@@ -202,6 +217,7 @@ export const useTaskEventStream = (taskId: string): TaskStream => {
|
||||
|
||||
subscription = observable.subscribe({
|
||||
next: event => {
|
||||
retryCount = 1;
|
||||
switch (event.type) {
|
||||
case 'log':
|
||||
return collectedLogEvents.push(event);
|
||||
@@ -212,6 +228,9 @@ export const useTaskEventStream = (taskId: string): TaskStream => {
|
||||
emitLogs();
|
||||
dispatch({ type: 'COMPLETED', data: event });
|
||||
return undefined;
|
||||
case 'recovered':
|
||||
dispatch({ type: 'RECOVERED', data: event });
|
||||
return undefined;
|
||||
default:
|
||||
throw new Error(
|
||||
`Unhandled event type ${event.type} in observer`,
|
||||
@@ -226,16 +245,18 @@ export const useTaskEventStream = (taskId: string): TaskStream => {
|
||||
// just to restart the fetch process
|
||||
// details here https://github.com/backstage/backstage/issues/15002
|
||||
|
||||
const maxRetries = 3;
|
||||
|
||||
if (!error.message) {
|
||||
error.message = `We cannot connect at the moment, trying again in some seconds... Retrying (${retryCount}/3 retries)`;
|
||||
error.message = `We cannot connect at the moment, trying again in some seconds... Retrying (${
|
||||
retryCount > maxRetries ? maxRetries : retryCount
|
||||
}/${maxRetries} retries)`;
|
||||
}
|
||||
|
||||
if (retryCount <= 3) {
|
||||
setTimeout(() => {
|
||||
retryCount += 1;
|
||||
startStreamLogProcess();
|
||||
}, 15000);
|
||||
}
|
||||
setTimeout(() => {
|
||||
retryCount += 1;
|
||||
void startStreamLogProcess();
|
||||
}, 15000);
|
||||
|
||||
dispatch({ type: 'ERROR', data: error });
|
||||
},
|
||||
@@ -247,7 +268,7 @@ export const useTaskEventStream = (taskId: string): TaskStream => {
|
||||
}
|
||||
},
|
||||
);
|
||||
startStreamLogProcess();
|
||||
void startStreamLogProcess();
|
||||
return () => {
|
||||
didCancel = true;
|
||||
if (subscription) {
|
||||
|
||||
@@ -252,6 +252,7 @@ export class ScaffolderClient implements ScaffolderApi {
|
||||
: {},
|
||||
});
|
||||
eventSource.addEventListener('log', processEvent);
|
||||
eventSource.addEventListener('recovered', processEvent);
|
||||
eventSource.addEventListener('cancelled', processEvent);
|
||||
eventSource.addEventListener('completion', (event: any) => {
|
||||
processEvent(event);
|
||||
|
||||
@@ -8383,6 +8383,7 @@ __metadata:
|
||||
"@backstage/plugin-auth-node": "workspace:^"
|
||||
"@backstage/plugin-catalog-backend-module-scaffolder-entity-model": "workspace:^"
|
||||
"@backstage/plugin-catalog-node": "workspace:^"
|
||||
"@backstage/plugin-events-node": "workspace:^"
|
||||
"@backstage/plugin-permission-common": "workspace:^"
|
||||
"@backstage/plugin-permission-node": "workspace:^"
|
||||
"@backstage/plugin-scaffolder-backend-module-azure": "workspace:^"
|
||||
|
||||
Reference in New Issue
Block a user