feat(catalog-backend): Add observability for catalog processing
Signed-off-by: Mike Bryant <mike.bryant@mettle.co.uk> Signed-off-by: Mike Bryant <mike@mikebryant.me.uk>
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/plugin-catalog-backend': patch
|
||||
---
|
||||
|
||||
Added OpenTelemetry spans for catalog processing
|
||||
@@ -23,7 +23,7 @@ import { assertError, serializeError, stringifyError } from '@backstage/errors';
|
||||
import { Hash } from 'crypto';
|
||||
import stableStringify from 'fast-json-stable-stringify';
|
||||
import { Logger } from 'winston';
|
||||
import { metrics } from '@opentelemetry/api';
|
||||
import { metrics, SpanStatusCode, trace } from '@opentelemetry/api';
|
||||
import { ProcessingDatabase, RefreshStateItem } from '../database/types';
|
||||
import { createCounterMetric, createSummaryMetric } from '../util/metrics';
|
||||
import {
|
||||
@@ -35,9 +35,12 @@ import { Stitcher } from '../stitching/Stitcher';
|
||||
import { startTaskPipeline } from './TaskPipeline';
|
||||
import { PluginTaskScheduler } from '@backstage/backend-tasks';
|
||||
import { Config } from '@backstage/config';
|
||||
import { addEntityAttributes, TRACER_ID } from '../util/opentelemetry';
|
||||
|
||||
const CACHE_TTL = 5;
|
||||
|
||||
const tracer = trace.getTracer(TRACER_ID);
|
||||
|
||||
export type ProgressTracker = ReturnType<typeof progressTracker>;
|
||||
|
||||
export class DefaultCatalogProcessingEngine implements CatalogProcessingEngine {
|
||||
@@ -131,177 +134,186 @@ export class DefaultCatalogProcessingEngine implements CatalogProcessingEngine {
|
||||
}
|
||||
},
|
||||
processTask: async item => {
|
||||
const track = this.tracker.processStart(item, this.logger);
|
||||
await tracer.startActiveSpan('ProcessingRun', async span => {
|
||||
const track = this.tracker.processStart(item, this.logger);
|
||||
addEntityAttributes(span, item.entityRef);
|
||||
|
||||
try {
|
||||
const {
|
||||
id,
|
||||
state,
|
||||
unprocessedEntity,
|
||||
entityRef,
|
||||
locationKey,
|
||||
resultHash: previousResultHash,
|
||||
} = item;
|
||||
const result = await this.orchestrator.process({
|
||||
entity: unprocessedEntity,
|
||||
state,
|
||||
});
|
||||
try {
|
||||
const {
|
||||
id,
|
||||
state,
|
||||
unprocessedEntity,
|
||||
entityRef,
|
||||
locationKey,
|
||||
resultHash: previousResultHash,
|
||||
} = item;
|
||||
const result = await this.orchestrator.process({
|
||||
entity: unprocessedEntity,
|
||||
state,
|
||||
});
|
||||
|
||||
track.markProcessorsCompleted(result);
|
||||
track.markProcessorsCompleted(result);
|
||||
|
||||
if (result.ok) {
|
||||
const { ttl: _, ...stateWithoutTtl } = state ?? {};
|
||||
if (
|
||||
stableStringify(stateWithoutTtl) !== stableStringify(result.state)
|
||||
) {
|
||||
if (result.ok) {
|
||||
const { ttl: _, ...stateWithoutTtl } = state ?? {};
|
||||
if (
|
||||
stableStringify(stateWithoutTtl) !== stableStringify(result.state)
|
||||
) {
|
||||
await this.processingDatabase.transaction(async tx => {
|
||||
await this.processingDatabase.updateEntityCache(tx, {
|
||||
id,
|
||||
state: {
|
||||
ttl: CACHE_TTL,
|
||||
...result.state,
|
||||
},
|
||||
});
|
||||
});
|
||||
}
|
||||
} else {
|
||||
const maybeTtl = state?.ttl;
|
||||
const ttl = Number.isInteger(maybeTtl) ? (maybeTtl as number) : 0;
|
||||
await this.processingDatabase.transaction(async tx => {
|
||||
await this.processingDatabase.updateEntityCache(tx, {
|
||||
id,
|
||||
state: {
|
||||
ttl: CACHE_TTL,
|
||||
...result.state,
|
||||
},
|
||||
state: ttl > 0 ? { ...state, ttl: ttl - 1 } : {},
|
||||
});
|
||||
});
|
||||
}
|
||||
} else {
|
||||
const maybeTtl = state?.ttl;
|
||||
const ttl = Number.isInteger(maybeTtl) ? (maybeTtl as number) : 0;
|
||||
await this.processingDatabase.transaction(async tx => {
|
||||
await this.processingDatabase.updateEntityCache(tx, {
|
||||
id,
|
||||
state: ttl > 0 ? { ...state, ttl: ttl - 1 } : {},
|
||||
|
||||
const location =
|
||||
unprocessedEntity?.metadata?.annotations?.[ANNOTATION_LOCATION];
|
||||
for (const error of result.errors) {
|
||||
this.logger.warn(error.message, {
|
||||
entity: entityRef,
|
||||
location,
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
const errorsString = JSON.stringify(
|
||||
result.errors.map(e => serializeError(e)),
|
||||
);
|
||||
|
||||
const location =
|
||||
unprocessedEntity?.metadata?.annotations?.[ANNOTATION_LOCATION];
|
||||
for (const error of result.errors) {
|
||||
this.logger.warn(error.message, {
|
||||
entity: entityRef,
|
||||
location,
|
||||
});
|
||||
}
|
||||
const errorsString = JSON.stringify(
|
||||
result.errors.map(e => serializeError(e)),
|
||||
);
|
||||
let hashBuilder = this.createHash().update(errorsString);
|
||||
|
||||
let hashBuilder = this.createHash().update(errorsString);
|
||||
|
||||
if (result.ok) {
|
||||
const { entityRefs: parents } =
|
||||
await this.processingDatabase.transaction(tx =>
|
||||
this.processingDatabase.listParents(tx, {
|
||||
entityRef,
|
||||
}),
|
||||
);
|
||||
|
||||
hashBuilder = hashBuilder
|
||||
.update(stableStringify({ ...result.completedEntity }))
|
||||
.update(stableStringify([...result.deferredEntities]))
|
||||
.update(stableStringify([...result.relations]))
|
||||
.update(stableStringify([...result.refreshKeys]))
|
||||
.update(stableStringify([...parents]));
|
||||
}
|
||||
|
||||
const resultHash = hashBuilder.digest('hex');
|
||||
if (resultHash === previousResultHash) {
|
||||
// If nothing changed in our produced outputs, we cannot have any
|
||||
// significant effect on our surroundings; therefore, we just abort
|
||||
// without any updates / stitching.
|
||||
track.markSuccessfulWithNoChanges();
|
||||
return;
|
||||
}
|
||||
|
||||
// If the result was marked as not OK, it signals that some part of the
|
||||
// processing pipeline threw an exception. This can happen both as part of
|
||||
// non-catastrophic things such as due to validation errors, as well as if
|
||||
// something fatal happens inside the processing for other reasons. In any
|
||||
// case, this means we can't trust that anything in the output is okay. So
|
||||
// just store the errors and trigger a stitch so that they become visible to
|
||||
// the outside.
|
||||
if (!result.ok) {
|
||||
// notify the error listener if the entity can not be processed.
|
||||
Promise.resolve(undefined)
|
||||
.then(() =>
|
||||
this.onProcessingError?.({
|
||||
unprocessedEntity,
|
||||
errors: result.errors,
|
||||
}),
|
||||
)
|
||||
.catch(error => {
|
||||
this.logger.debug(
|
||||
`Processing error listener threw an exception, ${stringifyError(
|
||||
error,
|
||||
)}`,
|
||||
if (result.ok) {
|
||||
const { entityRefs: parents } =
|
||||
await this.processingDatabase.transaction(tx =>
|
||||
this.processingDatabase.listParents(tx, {
|
||||
entityRef,
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
hashBuilder = hashBuilder
|
||||
.update(stableStringify({ ...result.completedEntity }))
|
||||
.update(stableStringify([...result.deferredEntities]))
|
||||
.update(stableStringify([...result.relations]))
|
||||
.update(stableStringify([...result.refreshKeys]))
|
||||
.update(stableStringify([...parents]));
|
||||
}
|
||||
|
||||
const resultHash = hashBuilder.digest('hex');
|
||||
if (resultHash === previousResultHash) {
|
||||
// If nothing changed in our produced outputs, we cannot have any
|
||||
// significant effect on our surroundings; therefore, we just abort
|
||||
// without any updates / stitching.
|
||||
track.markSuccessfulWithNoChanges();
|
||||
span.end();
|
||||
return;
|
||||
}
|
||||
|
||||
// If the result was marked as not OK, it signals that some part of the
|
||||
// processing pipeline threw an exception. This can happen both as part of
|
||||
// non-catastrophic things such as due to validation errors, as well as if
|
||||
// something fatal happens inside the processing for other reasons. In any
|
||||
// case, this means we can't trust that anything in the output is okay. So
|
||||
// just store the errors and trigger a stitch so that they become visible to
|
||||
// the outside.
|
||||
if (!result.ok) {
|
||||
// notify the error listener if the entity can not be processed.
|
||||
Promise.resolve(undefined)
|
||||
.then(() =>
|
||||
this.onProcessingError?.({
|
||||
unprocessedEntity,
|
||||
errors: result.errors,
|
||||
}),
|
||||
)
|
||||
.catch(error => {
|
||||
this.logger.debug(
|
||||
`Processing error listener threw an exception, ${stringifyError(
|
||||
error,
|
||||
)}`,
|
||||
);
|
||||
});
|
||||
|
||||
await this.processingDatabase.transaction(async tx => {
|
||||
await this.processingDatabase.updateProcessedEntityErrors(tx, {
|
||||
id,
|
||||
errors: errorsString,
|
||||
resultHash,
|
||||
});
|
||||
});
|
||||
await this.stitcher.stitch(
|
||||
new Set([stringifyEntityRef(unprocessedEntity)]),
|
||||
);
|
||||
track.markSuccessfulWithErrors();
|
||||
span.setStatus({ code: SpanStatusCode.ERROR });
|
||||
span.end();
|
||||
return;
|
||||
}
|
||||
|
||||
result.completedEntity.metadata.uid = id;
|
||||
let oldRelationSources: Map<string, string>;
|
||||
await this.processingDatabase.transaction(async tx => {
|
||||
await this.processingDatabase.updateProcessedEntityErrors(tx, {
|
||||
id,
|
||||
errors: errorsString,
|
||||
resultHash,
|
||||
});
|
||||
const { previous } =
|
||||
await this.processingDatabase.updateProcessedEntity(tx, {
|
||||
id,
|
||||
processedEntity: result.completedEntity,
|
||||
resultHash,
|
||||
errors: errorsString,
|
||||
relations: result.relations,
|
||||
deferredEntities: result.deferredEntities,
|
||||
locationKey,
|
||||
refreshKeys: result.refreshKeys,
|
||||
});
|
||||
oldRelationSources = new Map(
|
||||
previous.relations.map(r => [
|
||||
`${r.source_entity_ref}:${r.type}`,
|
||||
r.source_entity_ref,
|
||||
]),
|
||||
);
|
||||
});
|
||||
await this.stitcher.stitch(
|
||||
new Set([stringifyEntityRef(unprocessedEntity)]),
|
||||
|
||||
const newRelationSources = new Map<string, string>(
|
||||
result.relations.map(relation => {
|
||||
const sourceEntityRef = stringifyEntityRef(relation.source);
|
||||
return [`${sourceEntityRef}:${relation.type}`, sourceEntityRef];
|
||||
}),
|
||||
);
|
||||
track.markSuccessfulWithErrors();
|
||||
return;
|
||||
|
||||
const setOfThingsToStitch = new Set<string>([
|
||||
stringifyEntityRef(result.completedEntity),
|
||||
]);
|
||||
newRelationSources.forEach((sourceEntityRef, uniqueKey) => {
|
||||
if (!oldRelationSources.has(uniqueKey)) {
|
||||
setOfThingsToStitch.add(sourceEntityRef);
|
||||
}
|
||||
});
|
||||
oldRelationSources!.forEach((sourceEntityRef, uniqueKey) => {
|
||||
if (!newRelationSources.has(uniqueKey)) {
|
||||
setOfThingsToStitch.add(sourceEntityRef);
|
||||
}
|
||||
});
|
||||
|
||||
await this.stitcher.stitch(setOfThingsToStitch);
|
||||
|
||||
track.markSuccessfulWithChanges(setOfThingsToStitch.size);
|
||||
} catch (error) {
|
||||
assertError(error);
|
||||
track.markFailed(error);
|
||||
span.recordException(error);
|
||||
span.setStatus({ code: SpanStatusCode.ERROR });
|
||||
}
|
||||
|
||||
result.completedEntity.metadata.uid = id;
|
||||
let oldRelationSources: Map<string, string>;
|
||||
await this.processingDatabase.transaction(async tx => {
|
||||
const { previous } =
|
||||
await this.processingDatabase.updateProcessedEntity(tx, {
|
||||
id,
|
||||
processedEntity: result.completedEntity,
|
||||
resultHash,
|
||||
errors: errorsString,
|
||||
relations: result.relations,
|
||||
deferredEntities: result.deferredEntities,
|
||||
locationKey,
|
||||
refreshKeys: result.refreshKeys,
|
||||
});
|
||||
oldRelationSources = new Map(
|
||||
previous.relations.map(r => [
|
||||
`${r.source_entity_ref}:${r.type}`,
|
||||
r.source_entity_ref,
|
||||
]),
|
||||
);
|
||||
});
|
||||
|
||||
const newRelationSources = new Map<string, string>(
|
||||
result.relations.map(relation => {
|
||||
const sourceEntityRef = stringifyEntityRef(relation.source);
|
||||
return [`${sourceEntityRef}:${relation.type}`, sourceEntityRef];
|
||||
}),
|
||||
);
|
||||
|
||||
const setOfThingsToStitch = new Set<string>([
|
||||
stringifyEntityRef(result.completedEntity),
|
||||
]);
|
||||
newRelationSources.forEach((sourceEntityRef, uniqueKey) => {
|
||||
if (!oldRelationSources.has(uniqueKey)) {
|
||||
setOfThingsToStitch.add(sourceEntityRef);
|
||||
}
|
||||
});
|
||||
oldRelationSources!.forEach((sourceEntityRef, uniqueKey) => {
|
||||
if (!newRelationSources.has(uniqueKey)) {
|
||||
setOfThingsToStitch.add(sourceEntityRef);
|
||||
}
|
||||
});
|
||||
|
||||
await this.stitcher.stitch(setOfThingsToStitch);
|
||||
|
||||
track.markSuccessfulWithChanges(setOfThingsToStitch.size);
|
||||
} catch (error) {
|
||||
assertError(error);
|
||||
track.markFailed(error);
|
||||
}
|
||||
span.end();
|
||||
});
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { Span, SpanStatusCode, trace } from '@opentelemetry/api';
|
||||
import {
|
||||
Entity,
|
||||
EntityPolicy,
|
||||
@@ -55,6 +56,9 @@ import {
|
||||
} from './util';
|
||||
import { CatalogRulesEnforcer } from '../ingestion/CatalogRules';
|
||||
import { ProcessorCacheManager } from './ProcessorCacheManager';
|
||||
import { addEntityAttributes, TRACER_ID } from '../util/opentelemetry';
|
||||
|
||||
const tracer = trace.getTracer(TRACER_ID);
|
||||
|
||||
type Context = {
|
||||
entityRef: string;
|
||||
@@ -64,6 +68,18 @@ type Context = {
|
||||
cache: ProcessorCacheManager;
|
||||
};
|
||||
|
||||
/**
 * Tags a span with the processing stage and the processor that runs in it.
 *
 * Two attributes are written to the span:
 * - `backstage.catalog.processor.stage` is set to `stage`.
 * - `backstage.catalog.processor.name` is set to the value returned by
 *   `processor.getProcessorName()`.
 *
 * @param span - The span to annotate.
 * @param stage - Label for the processing stage; callers in this file pass
 *   `'preProcessEntity'` and `'postProcessEntity'`.
 * @param processor - The processor whose name is recorded on the span.
 */
function addProcessorAttributes(
  span: Span,
  stage: string,
  processor: CatalogProcessor,
) {
  span.setAttribute('backstage.catalog.processor.stage', stage);
  span.setAttribute(
    'backstage.catalog.processor.name',
    processor.getProcessorName(),
  );
}
|
||||
|
||||
/** @public */
|
||||
export class DefaultCatalogProcessingOrchestrator
|
||||
implements CatalogProcessingOrchestrator
|
||||
@@ -183,20 +199,30 @@ export class DefaultCatalogProcessingOrchestrator
|
||||
|
||||
for (const processor of this.options.processors) {
|
||||
if (processor.preProcessEntity) {
|
||||
try {
|
||||
res = await processor.preProcessEntity(
|
||||
res,
|
||||
context.location,
|
||||
context.collector.forProcessor(processor),
|
||||
context.originLocation,
|
||||
context.cache.forProcessor(processor),
|
||||
);
|
||||
} catch (e) {
|
||||
throw new InputError(
|
||||
`Processor ${processor.constructor.name} threw an error while preprocessing`,
|
||||
e,
|
||||
);
|
||||
}
|
||||
let innerRes = res;
|
||||
res = await tracer.startActiveSpan('ProcessingStep', async span => {
|
||||
addEntityAttributes(span, context.entityRef);
|
||||
addProcessorAttributes(span, 'preProcessEntity', processor);
|
||||
try {
|
||||
innerRes = await processor.preProcessEntity!(
|
||||
innerRes,
|
||||
context.location,
|
||||
context.collector.forProcessor(processor),
|
||||
context.originLocation,
|
||||
context.cache.forProcessor(processor),
|
||||
);
|
||||
} catch (e) {
|
||||
span.recordException(e);
|
||||
span.setStatus({ code: SpanStatusCode.ERROR });
|
||||
span.end();
|
||||
throw new InputError(
|
||||
`Processor ${processor.constructor.name} threw an error while preprocessing`,
|
||||
e,
|
||||
);
|
||||
}
|
||||
span.end();
|
||||
return innerRes;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -361,19 +387,29 @@ export class DefaultCatalogProcessingOrchestrator
|
||||
|
||||
for (const processor of this.options.processors) {
|
||||
if (processor.postProcessEntity) {
|
||||
try {
|
||||
res = await processor.postProcessEntity(
|
||||
res,
|
||||
context.location,
|
||||
context.collector.forProcessor(processor),
|
||||
context.cache.forProcessor(processor),
|
||||
);
|
||||
} catch (e) {
|
||||
throw new InputError(
|
||||
`Processor ${processor.constructor.name} threw an error while postprocessing`,
|
||||
e,
|
||||
);
|
||||
}
|
||||
let innerRes = res;
|
||||
res = await tracer.startActiveSpan('ProcessingStep', async span => {
|
||||
addEntityAttributes(span, context.entityRef);
|
||||
addProcessorAttributes(span, 'postProcessEntity', processor);
|
||||
try {
|
||||
innerRes = await processor.postProcessEntity!(
|
||||
innerRes,
|
||||
context.location,
|
||||
context.collector.forProcessor(processor),
|
||||
context.cache.forProcessor(processor),
|
||||
);
|
||||
} catch (e) {
|
||||
span.recordException(e);
|
||||
span.setStatus({ code: SpanStatusCode.ERROR });
|
||||
span.end();
|
||||
throw new InputError(
|
||||
`Processor ${processor.constructor.name} threw an error while postprocessing`,
|
||||
e,
|
||||
);
|
||||
}
|
||||
span.end();
|
||||
return innerRes;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,32 @@
|
||||
/*
|
||||
* Copyright 2023 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { Span, SpanStatusCode } from '@opentelemetry/api';
|
||||
import { parseEntityRef } from '@backstage/catalog-model';
|
||||
|
||||
export const TRACER_ID = 'backstage-plugin-catalog-backend';
|
||||
|
||||
/**
 * Adds the kind, namespace and name of an entity to a span as attributes.
 *
 * The entity ref is parsed with `parseEntityRef`, and the resulting fields are
 * written as `backstage.entity.kind`, `backstage.entity.namespace` and
 * `backstage.entity.name`.
 *
 * This function does not rethrow: if parsing the ref or setting an attribute
 * throws, the error is recorded on the span as an exception and the span
 * status is set to ERROR.
 *
 * @param span - The span to annotate.
 * @param entityRef - String entity reference to parse.
 */
export function addEntityAttributes(span: Span, entityRef: string) {
  try {
    const fields = parseEntityRef(entityRef);
    span.setAttribute('backstage.entity.kind', fields.kind);
    span.setAttribute('backstage.entity.namespace', fields.namespace);
    span.setAttribute('backstage.entity.name', fields.name);
  } catch (err) {
    // Tracing must not break the caller, so the failure is only surfaced on
    // the span itself.
    span.recordException(err);
    span.setStatus({ code: SpanStatusCode.ERROR });
  }
}
|
||||
Reference in New Issue
Block a user