feat(catalog-backend): Add observability for catalog processing

Signed-off-by: Mike Bryant <mike.bryant@mettle.co.uk>
Signed-off-by: Mike Bryant <mike@mikebryant.me.uk>
This commit is contained in:
Mike Bryant
2023-04-26 19:55:05 +01:00
committed by Mike Bryant
parent 6c0867be8d
commit f32252cdf6
4 changed files with 266 additions and 181 deletions
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/plugin-catalog-backend': patch
---
Added OpenTelemetry spans for catalog processing
@@ -23,7 +23,7 @@ import { assertError, serializeError, stringifyError } from '@backstage/errors';
import { Hash } from 'crypto';
import stableStringify from 'fast-json-stable-stringify';
import { Logger } from 'winston';
import { metrics } from '@opentelemetry/api';
import { metrics, SpanStatusCode, trace } from '@opentelemetry/api';
import { ProcessingDatabase, RefreshStateItem } from '../database/types';
import { createCounterMetric, createSummaryMetric } from '../util/metrics';
import {
@@ -35,9 +35,12 @@ import { Stitcher } from '../stitching/Stitcher';
import { startTaskPipeline } from './TaskPipeline';
import { PluginTaskScheduler } from '@backstage/backend-tasks';
import { Config } from '@backstage/config';
import { addEntityAttributes, TRACER_ID } from '../util/opentelemetry';
// Initial `ttl` written alongside an entity's cached processing state when a
// successful run produced new state. Each unsuccessful run decrements the
// stored `ttl` by one, and the cached state is replaced with `{}` once the
// `ttl` is no longer positive.
const CACHE_TTL = 5;
// Tracer used to start the 'ProcessingRun' span around each processed item.
const tracer = trace.getTracer(TRACER_ID);
export type ProgressTracker = ReturnType<typeof progressTracker>;
export class DefaultCatalogProcessingEngine implements CatalogProcessingEngine {
@@ -131,177 +134,186 @@ export class DefaultCatalogProcessingEngine implements CatalogProcessingEngine {
}
},
processTask: async item => {
const track = this.tracker.processStart(item, this.logger);
await tracer.startActiveSpan('ProcessingRun', async span => {
const track = this.tracker.processStart(item, this.logger);
addEntityAttributes(span, item.entityRef);
try {
const {
id,
state,
unprocessedEntity,
entityRef,
locationKey,
resultHash: previousResultHash,
} = item;
const result = await this.orchestrator.process({
entity: unprocessedEntity,
state,
});
try {
const {
id,
state,
unprocessedEntity,
entityRef,
locationKey,
resultHash: previousResultHash,
} = item;
const result = await this.orchestrator.process({
entity: unprocessedEntity,
state,
});
track.markProcessorsCompleted(result);
track.markProcessorsCompleted(result);
if (result.ok) {
const { ttl: _, ...stateWithoutTtl } = state ?? {};
if (
stableStringify(stateWithoutTtl) !== stableStringify(result.state)
) {
if (result.ok) {
const { ttl: _, ...stateWithoutTtl } = state ?? {};
if (
stableStringify(stateWithoutTtl) !== stableStringify(result.state)
) {
await this.processingDatabase.transaction(async tx => {
await this.processingDatabase.updateEntityCache(tx, {
id,
state: {
ttl: CACHE_TTL,
...result.state,
},
});
});
}
} else {
const maybeTtl = state?.ttl;
const ttl = Number.isInteger(maybeTtl) ? (maybeTtl as number) : 0;
await this.processingDatabase.transaction(async tx => {
await this.processingDatabase.updateEntityCache(tx, {
id,
state: {
ttl: CACHE_TTL,
...result.state,
},
state: ttl > 0 ? { ...state, ttl: ttl - 1 } : {},
});
});
}
} else {
const maybeTtl = state?.ttl;
const ttl = Number.isInteger(maybeTtl) ? (maybeTtl as number) : 0;
await this.processingDatabase.transaction(async tx => {
await this.processingDatabase.updateEntityCache(tx, {
id,
state: ttl > 0 ? { ...state, ttl: ttl - 1 } : {},
const location =
unprocessedEntity?.metadata?.annotations?.[ANNOTATION_LOCATION];
for (const error of result.errors) {
this.logger.warn(error.message, {
entity: entityRef,
location,
});
});
}
}
const errorsString = JSON.stringify(
result.errors.map(e => serializeError(e)),
);
const location =
unprocessedEntity?.metadata?.annotations?.[ANNOTATION_LOCATION];
for (const error of result.errors) {
this.logger.warn(error.message, {
entity: entityRef,
location,
});
}
const errorsString = JSON.stringify(
result.errors.map(e => serializeError(e)),
);
let hashBuilder = this.createHash().update(errorsString);
let hashBuilder = this.createHash().update(errorsString);
if (result.ok) {
const { entityRefs: parents } =
await this.processingDatabase.transaction(tx =>
this.processingDatabase.listParents(tx, {
entityRef,
}),
);
hashBuilder = hashBuilder
.update(stableStringify({ ...result.completedEntity }))
.update(stableStringify([...result.deferredEntities]))
.update(stableStringify([...result.relations]))
.update(stableStringify([...result.refreshKeys]))
.update(stableStringify([...parents]));
}
const resultHash = hashBuilder.digest('hex');
if (resultHash === previousResultHash) {
// If nothing changed in our produced outputs, we cannot have any
// significant effect on our surroundings; therefore, we just abort
// without any updates / stitching.
track.markSuccessfulWithNoChanges();
return;
}
// If the result was marked as not OK, it signals that some part of the
// processing pipeline threw an exception. This can happen both as part of
// non-catastrophic things such as due to validation errors, as well as if
// something fatal happens inside the processing for other reasons. In any
// case, this means we can't trust that anything in the output is okay. So
// just store the errors and trigger a stitch so that they become visible to
// the outside.
if (!result.ok) {
// notify the error listener if the entity can not be processed.
Promise.resolve(undefined)
.then(() =>
this.onProcessingError?.({
unprocessedEntity,
errors: result.errors,
}),
)
.catch(error => {
this.logger.debug(
`Processing error listener threw an exception, ${stringifyError(
error,
)}`,
if (result.ok) {
const { entityRefs: parents } =
await this.processingDatabase.transaction(tx =>
this.processingDatabase.listParents(tx, {
entityRef,
}),
);
});
hashBuilder = hashBuilder
.update(stableStringify({ ...result.completedEntity }))
.update(stableStringify([...result.deferredEntities]))
.update(stableStringify([...result.relations]))
.update(stableStringify([...result.refreshKeys]))
.update(stableStringify([...parents]));
}
const resultHash = hashBuilder.digest('hex');
if (resultHash === previousResultHash) {
// If nothing changed in our produced outputs, we cannot have any
// significant effect on our surroundings; therefore, we just abort
// without any updates / stitching.
track.markSuccessfulWithNoChanges();
span.end();
return;
}
// If the result was marked as not OK, it signals that some part of the
// processing pipeline threw an exception. This can happen both as part of
// non-catastrophic things such as due to validation errors, as well as if
// something fatal happens inside the processing for other reasons. In any
// case, this means we can't trust that anything in the output is okay. So
// just store the errors and trigger a stitch so that they become visible to
// the outside.
if (!result.ok) {
// notify the error listener if the entity can not be processed.
Promise.resolve(undefined)
.then(() =>
this.onProcessingError?.({
unprocessedEntity,
errors: result.errors,
}),
)
.catch(error => {
this.logger.debug(
`Processing error listener threw an exception, ${stringifyError(
error,
)}`,
);
});
await this.processingDatabase.transaction(async tx => {
await this.processingDatabase.updateProcessedEntityErrors(tx, {
id,
errors: errorsString,
resultHash,
});
});
await this.stitcher.stitch(
new Set([stringifyEntityRef(unprocessedEntity)]),
);
track.markSuccessfulWithErrors();
span.setStatus({ code: SpanStatusCode.ERROR });
span.end();
return;
}
result.completedEntity.metadata.uid = id;
let oldRelationSources: Map<string, string>;
await this.processingDatabase.transaction(async tx => {
await this.processingDatabase.updateProcessedEntityErrors(tx, {
id,
errors: errorsString,
resultHash,
});
const { previous } =
await this.processingDatabase.updateProcessedEntity(tx, {
id,
processedEntity: result.completedEntity,
resultHash,
errors: errorsString,
relations: result.relations,
deferredEntities: result.deferredEntities,
locationKey,
refreshKeys: result.refreshKeys,
});
oldRelationSources = new Map(
previous.relations.map(r => [
`${r.source_entity_ref}:${r.type}`,
r.source_entity_ref,
]),
);
});
await this.stitcher.stitch(
new Set([stringifyEntityRef(unprocessedEntity)]),
const newRelationSources = new Map<string, string>(
result.relations.map(relation => {
const sourceEntityRef = stringifyEntityRef(relation.source);
return [`${sourceEntityRef}:${relation.type}`, sourceEntityRef];
}),
);
track.markSuccessfulWithErrors();
return;
const setOfThingsToStitch = new Set<string>([
stringifyEntityRef(result.completedEntity),
]);
newRelationSources.forEach((sourceEntityRef, uniqueKey) => {
if (!oldRelationSources.has(uniqueKey)) {
setOfThingsToStitch.add(sourceEntityRef);
}
});
oldRelationSources!.forEach((sourceEntityRef, uniqueKey) => {
if (!newRelationSources.has(uniqueKey)) {
setOfThingsToStitch.add(sourceEntityRef);
}
});
await this.stitcher.stitch(setOfThingsToStitch);
track.markSuccessfulWithChanges(setOfThingsToStitch.size);
} catch (error) {
assertError(error);
track.markFailed(error);
span.recordException(error);
span.setStatus({ code: SpanStatusCode.ERROR });
}
result.completedEntity.metadata.uid = id;
let oldRelationSources: Map<string, string>;
await this.processingDatabase.transaction(async tx => {
const { previous } =
await this.processingDatabase.updateProcessedEntity(tx, {
id,
processedEntity: result.completedEntity,
resultHash,
errors: errorsString,
relations: result.relations,
deferredEntities: result.deferredEntities,
locationKey,
refreshKeys: result.refreshKeys,
});
oldRelationSources = new Map(
previous.relations.map(r => [
`${r.source_entity_ref}:${r.type}`,
r.source_entity_ref,
]),
);
});
const newRelationSources = new Map<string, string>(
result.relations.map(relation => {
const sourceEntityRef = stringifyEntityRef(relation.source);
return [`${sourceEntityRef}:${relation.type}`, sourceEntityRef];
}),
);
const setOfThingsToStitch = new Set<string>([
stringifyEntityRef(result.completedEntity),
]);
newRelationSources.forEach((sourceEntityRef, uniqueKey) => {
if (!oldRelationSources.has(uniqueKey)) {
setOfThingsToStitch.add(sourceEntityRef);
}
});
oldRelationSources!.forEach((sourceEntityRef, uniqueKey) => {
if (!newRelationSources.has(uniqueKey)) {
setOfThingsToStitch.add(sourceEntityRef);
}
});
await this.stitcher.stitch(setOfThingsToStitch);
track.markSuccessfulWithChanges(setOfThingsToStitch.size);
} catch (error) {
assertError(error);
track.markFailed(error);
}
span.end();
});
},
});
}
@@ -14,6 +14,7 @@
* limitations under the License.
*/
import { Span, SpanStatusCode, trace } from '@opentelemetry/api';
import {
Entity,
EntityPolicy,
@@ -55,6 +56,9 @@ import {
} from './util';
import { CatalogRulesEnforcer } from '../ingestion/CatalogRules';
import { ProcessorCacheManager } from './ProcessorCacheManager';
import { addEntityAttributes, TRACER_ID } from '../util/opentelemetry';
// Tracer used to start a 'ProcessingStep' span around each processor's
// preProcessEntity / postProcessEntity call.
const tracer = trace.getTracer(TRACER_ID);
type Context = {
entityRef: string;
@@ -64,6 +68,18 @@ type Context = {
cache: ProcessorCacheManager;
};
/**
 * Tags a span with the processing stage and the name of the processor that
 * is running inside it.
 *
 * @param span - Span that receives the attributes.
 * @param stage - Value stored under `backstage.catalog.processor.stage`.
 * @param processor - Processor whose `getProcessorName()` result is stored
 *   under `backstage.catalog.processor.name`.
 */
function addProcessorAttributes(
  span: Span,
  stage: string,
  processor: CatalogProcessor,
) {
  // Values are produced lazily so that each one is read immediately before
  // it is written to the span, in declaration order.
  const attributes: Array<[string, () => string]> = [
    ['backstage.catalog.processor.stage', () => stage],
    ['backstage.catalog.processor.name', () => processor.getProcessorName()],
  ];

  for (const [key, readValue] of attributes) {
    span.setAttribute(key, readValue());
  }
}
/** @public */
export class DefaultCatalogProcessingOrchestrator
implements CatalogProcessingOrchestrator
@@ -183,20 +199,30 @@ export class DefaultCatalogProcessingOrchestrator
for (const processor of this.options.processors) {
if (processor.preProcessEntity) {
try {
res = await processor.preProcessEntity(
res,
context.location,
context.collector.forProcessor(processor),
context.originLocation,
context.cache.forProcessor(processor),
);
} catch (e) {
throw new InputError(
`Processor ${processor.constructor.name} threw an error while preprocessing`,
e,
);
}
let innerRes = res;
res = await tracer.startActiveSpan('ProcessingStep', async span => {
addEntityAttributes(span, context.entityRef);
addProcessorAttributes(span, 'preProcessEntity', processor);
try {
innerRes = await processor.preProcessEntity!(
innerRes,
context.location,
context.collector.forProcessor(processor),
context.originLocation,
context.cache.forProcessor(processor),
);
} catch (e) {
span.recordException(e);
span.setStatus({ code: SpanStatusCode.ERROR });
span.end();
throw new InputError(
`Processor ${processor.constructor.name} threw an error while preprocessing`,
e,
);
}
span.end();
return innerRes;
});
}
}
@@ -361,19 +387,29 @@ export class DefaultCatalogProcessingOrchestrator
for (const processor of this.options.processors) {
if (processor.postProcessEntity) {
try {
res = await processor.postProcessEntity(
res,
context.location,
context.collector.forProcessor(processor),
context.cache.forProcessor(processor),
);
} catch (e) {
throw new InputError(
`Processor ${processor.constructor.name} threw an error while postprocessing`,
e,
);
}
let innerRes = res;
res = await tracer.startActiveSpan('ProcessingStep', async span => {
addEntityAttributes(span, context.entityRef);
addProcessorAttributes(span, 'postProcessEntity', processor);
try {
innerRes = await processor.postProcessEntity!(
innerRes,
context.location,
context.collector.forProcessor(processor),
context.cache.forProcessor(processor),
);
} catch (e) {
span.recordException(e);
span.setStatus({ code: SpanStatusCode.ERROR });
span.end();
throw new InputError(
`Processor ${processor.constructor.name} threw an error while postprocessing`,
e,
);
}
span.end();
return innerRes;
});
}
}
@@ -0,0 +1,32 @@
/*
* Copyright 2023 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Span, SpanStatusCode } from '@opentelemetry/api';
import { parseEntityRef } from '@backstage/catalog-model';
/** Tracer name that the catalog backend modules pass to `trace.getTracer`. */
export const TRACER_ID = 'backstage-plugin-catalog-backend';
/**
 * Annotates a span with the kind, namespace and name parsed from an entity
 * reference.
 *
 * If parsing the reference or setting an attribute throws, the exception is
 * recorded on the span and the span status is set to `ERROR`; nothing is
 * rethrown to the caller.
 *
 * @param span - Span that receives the `backstage.entity.*` attributes.
 * @param entityRef - Entity reference string handed to `parseEntityRef`.
 */
export function addEntityAttributes(span: Span, entityRef: string) {
  try {
    const { kind, namespace, name } = parseEntityRef(entityRef);
    const attributes: Array<[string, string]> = [
      ['backstage.entity.kind', kind],
      ['backstage.entity.namespace', namespace],
      ['backstage.entity.name', name],
    ];
    for (const [key, value] of attributes) {
      span.setAttribute(key, value);
    }
  } catch (error) {
    // Failing to tag the span must not interrupt the traced operation, so the
    // failure is only surfaced on the span itself.
    span.recordException(error);
    span.setStatus({ code: SpanStatusCode.ERROR });
  }
}