Merge pull request #27815 from backstage/freben/scheduler3
add timestamp gauge metric for task run start/ends
This commit is contained in:
@@ -0,0 +1,6 @@
|
||||
---
|
||||
'@backstage/backend-defaults': patch
|
||||
'@backstage/plugin-catalog-backend-module-incremental-ingestion': patch
|
||||
---
|
||||
|
||||
Add task metrics as two gauges that track the last start and end timestamps as epoch seconds.
|
||||
@@ -24,7 +24,7 @@ import {
|
||||
SchedulerServiceTaskRunner,
|
||||
SchedulerServiceTaskScheduleDefinition,
|
||||
} from '@backstage/backend-plugin-api';
|
||||
import { Counter, Histogram, metrics, trace } from '@opentelemetry/api';
|
||||
import { Counter, Histogram, Gauge, metrics, trace } from '@opentelemetry/api';
|
||||
import { Knex } from 'knex';
|
||||
import { Duration } from 'luxon';
|
||||
import { LocalTaskWorker } from './LocalTaskWorker';
|
||||
@@ -44,6 +44,8 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
|
||||
|
||||
private readonly counter: Counter;
|
||||
private readonly duration: Histogram;
|
||||
private readonly lastStarted: Gauge;
|
||||
private readonly lastCompleted: Gauge;
|
||||
|
||||
constructor(
|
||||
private readonly databaseFactory: () => Promise<Knex>,
|
||||
@@ -58,6 +60,17 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
|
||||
description: 'Histogram of task run durations',
|
||||
unit: 'seconds',
|
||||
});
|
||||
this.lastStarted = meter.createGauge('backend_tasks.task.runs.started', {
|
||||
description: 'Epoch timestamp seconds when the task was last started',
|
||||
unit: 'seconds',
|
||||
});
|
||||
this.lastCompleted = meter.createGauge(
|
||||
'backend_tasks.task.runs.completed',
|
||||
{
|
||||
description: 'Epoch timestamp seconds when the task was last completed',
|
||||
unit: 'seconds',
|
||||
},
|
||||
);
|
||||
this.shutdownInitiated = new Promise(shutdownInitiated => {
|
||||
rootLifecycle?.addShutdownHook(() => shutdownInitiated(true));
|
||||
});
|
||||
@@ -144,6 +157,7 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
|
||||
scope,
|
||||
};
|
||||
this.counter.add(1, { ...labels, result: 'started' });
|
||||
this.lastStarted.record(Date.now() / 1000, { taskId: task.id });
|
||||
|
||||
const startTime = process.hrtime();
|
||||
|
||||
@@ -170,6 +184,7 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
|
||||
const endTime = delta[0] + delta[1] / 1e9;
|
||||
this.counter.add(1, labels);
|
||||
this.duration.record(endTime, labels);
|
||||
this.lastCompleted.record(Date.now() / 1000, labels);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -61,6 +61,7 @@
|
||||
"@backstage/plugin-events-node": "workspace:^",
|
||||
"@backstage/plugin-permission-common": "workspace:^",
|
||||
"@backstage/types": "workspace:^",
|
||||
"@opentelemetry/api": "^1.3.0",
|
||||
"@types/express": "^4.17.6",
|
||||
"express": "^4.17.1",
|
||||
"express-promise-router": "^4.1.0",
|
||||
|
||||
+33
@@ -15,6 +15,7 @@
|
||||
*/
|
||||
|
||||
import type { DeferredEntity } from '@backstage/plugin-catalog-node';
|
||||
import { Gauge, metrics } from '@opentelemetry/api';
|
||||
import { IterationEngine, IterationEngineOptions } from '../types';
|
||||
import { IncrementalIngestionDatabaseManager } from '../database/IncrementalIngestionDatabaseManager';
|
||||
import { performance } from 'perf_hooks';
|
||||
@@ -27,10 +28,14 @@ import { HumanDuration } from '@backstage/types';
|
||||
export class IncrementalIngestionEngine implements IterationEngine {
|
||||
private readonly restLength: Duration;
|
||||
private readonly backoff: HumanDuration[];
|
||||
private readonly lastStarted: Gauge;
|
||||
private readonly lastCompleted: Gauge;
|
||||
|
||||
private manager: IncrementalIngestionDatabaseManager;
|
||||
|
||||
constructor(private options: IterationEngineOptions) {
|
||||
const meter = metrics.getMeter('default');
|
||||
|
||||
this.manager = options.manager;
|
||||
this.restLength = Duration.fromObject(options.restLength);
|
||||
this.backoff = options.backoff ?? [
|
||||
@@ -39,6 +44,23 @@ export class IncrementalIngestionEngine implements IterationEngine {
|
||||
{ minutes: 30 },
|
||||
{ hours: 3 },
|
||||
];
|
||||
|
||||
this.lastStarted = meter.createGauge(
|
||||
'catalog_incremental.ingestions.started',
|
||||
{
|
||||
description:
|
||||
'Epoch timestamp seconds when the ingestion was last started',
|
||||
unit: 'seconds',
|
||||
},
|
||||
);
|
||||
this.lastCompleted = meter.createGauge(
|
||||
'catalog_incremental.ingestions.completed',
|
||||
{
|
||||
description:
|
||||
'Epoch timestamp seconds when the ingestion was last completed',
|
||||
unit: 'seconds',
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
async taskFn(signal: AbortSignal) {
|
||||
@@ -70,6 +92,9 @@ export class IncrementalIngestionEngine implements IterationEngine {
|
||||
`incremental-engine: Ingestion ${ingestionId} rest period complete. Ingestion will start again`,
|
||||
);
|
||||
|
||||
this.lastStarted.record(Date.now() / 1000, {
|
||||
providerName: this.options.provider.getProviderName(),
|
||||
});
|
||||
await this.manager.setProviderComplete(ingestionId);
|
||||
} else {
|
||||
this.options.logger.debug(
|
||||
@@ -85,6 +110,10 @@ export class IncrementalIngestionEngine implements IterationEngine {
|
||||
this.options.logger.info(
|
||||
`incremental-engine: Ingestion '${ingestionId}' complete, transitioning to rest period of ${this.restLength.toHuman()}`,
|
||||
);
|
||||
this.lastCompleted.record(Date.now() / 1000, {
|
||||
providerName: this.options.provider.getProviderName(),
|
||||
status: 'completed',
|
||||
});
|
||||
await this.manager.setProviderResting(
|
||||
ingestionId,
|
||||
this.restLength,
|
||||
@@ -122,6 +151,10 @@ export class IncrementalIngestionEngine implements IterationEngine {
|
||||
this.options.logger.error(
|
||||
`incremental-engine: Ingestion '${ingestionId}' threw an error during ingestion burst. Ingestion will backoff for ${currentBackoff.toHuman()} (${truncatedError})`,
|
||||
);
|
||||
this.lastCompleted.record(Date.now() / 1000, {
|
||||
providerName: this.options.provider.getProviderName(),
|
||||
status: 'failed',
|
||||
});
|
||||
|
||||
await this.manager.setProviderBackoff(
|
||||
ingestionId,
|
||||
|
||||
Reference in New Issue
Block a user