feature: provide error listeners for catalog processing engines to notify the callers if an entity can not be processed.

Signed-off-by: Hasan Oezdemir <21654050+nodify-at@users.noreply.github.com>
This commit is contained in:
Hasan Oezdemir
2022-05-31 13:05:43 +02:00
parent 008f0c2b74
commit fa0533e604
5 changed files with 67 additions and 2 deletions
+6
View File
@@ -0,0 +1,6 @@
---
'@backstage/plugin-catalog-backend': patch
---
Provide a new listener `CatalogProcessingErrorListener` for the processing engines to notify the caller if an entity can not be processed. Processing engine
will collect the errors and passes the entity and the results to the given listeners.
+28
View File
@@ -15,6 +15,7 @@ import { DocumentCollatorFactory } from '@backstage/plugin-search-common';
import { Entity } from '@backstage/catalog-model';
import { EntityPolicy } from '@backstage/catalog-model';
import { GetEntitiesRequest } from '@backstage/catalog-client';
import { JsonObject } from '@backstage/types';
import { JsonValue } from '@backstage/types';
import { LocationEntityV1alpha1 } from '@backstage/catalog-model';
import { Logger } from 'winston';
@@ -201,12 +202,24 @@ export type CatalogPermissionRule<TParams extends unknown[] = unknown[]> =
// @public (undocumented)
export interface CatalogProcessingEngine {
// (undocumented)
addErrorListener?(errorListener: CatalogProcessingErrorListener): void;
// (undocumented)
start(): Promise<void>;
// (undocumented)
stop(): Promise<void>;
}
// @public
export interface CatalogProcessingErrorListener {
// (undocumented)
onError(
unprocessedEntity: Entity,
result: EntityProcessingResult,
resultHash: String,
): Promise<void>;
}
// @public (undocumented)
export type CatalogProcessor = {
getProcessorName(): string;
@@ -412,6 +425,21 @@ export type EntityFilter =
}
| EntitiesSearchFilter;
// @public
export type EntityProcessingResult =
| {
ok: true;
state: JsonObject;
completedEntity: Entity;
deferredEntities: DeferredEntity[];
relations: EntityRelationSpec[];
errors: Error[];
}
| {
ok: false;
errors: Error[];
};
// @public
export interface EntityProvider {
connect(connection: EntityProviderConnection): Promise<void>;
@@ -23,9 +23,10 @@ import { ProcessingDatabase, RefreshStateItem } from '../database/types';
import { createCounterMetric, createSummaryMetric } from '../util/metrics';
import {
CatalogProcessingEngine,
CatalogProcessingErrorListener,
CatalogProcessingOrchestrator,
EntityProcessingResult,
} from '../processing/types';
} from './types';
import { Stitcher } from '../stitching/Stitcher';
import { startTaskPipeline } from './TaskPipeline';
@@ -33,6 +34,7 @@ const CACHE_TTL = 5;
export class DefaultCatalogProcessingEngine implements CatalogProcessingEngine {
private readonly tracker = progressTracker();
private readonly errorListeners: CatalogProcessingErrorListener[] = [];
private stopFunc?: () => void;
constructor(
@@ -122,6 +124,7 @@ export class DefaultCatalogProcessingEngine implements CatalogProcessingEngine {
);
let hashBuilder = this.createHash().update(errorsString);
if (result.ok) {
const { entityRefs: parents } =
await this.processingDatabase.transaction(tx =>
@@ -154,6 +157,11 @@ export class DefaultCatalogProcessingEngine implements CatalogProcessingEngine {
// just store the errors and trigger a stich so that they become visible to
// the outside.
if (!result.ok) {
// notify the error listeners if the entity can not be processed.
this.errorListeners.forEach(listener =>
listener.onError(unprocessedEntity, result, resultHash),
);
await this.processingDatabase.transaction(async tx => {
await this.processingDatabase.updateProcessedEntityErrors(tx, {
id,
@@ -223,6 +231,10 @@ export class DefaultCatalogProcessingEngine implements CatalogProcessingEngine {
this.stopFunc = undefined;
}
}
addErrorListener(errorListener: CatalogProcessingErrorListener) {
this.errorListeners.push(errorListener);
}
}
// Helps wrap the timing and logging behaviors
@@ -14,7 +14,12 @@
* limitations under the License.
*/
export type { CatalogProcessingEngine, DeferredEntity } from './types';
export type {
CatalogProcessingEngine,
EntityProcessingResult,
DeferredEntity,
CatalogProcessingErrorListener,
} from './types';
export { createRandomProcessingInterval } from './refresh';
export type { ProcessingIntervalFunction } from './refresh';
@@ -66,4 +66,18 @@ export type DeferredEntity = {
export interface CatalogProcessingEngine {
start(): Promise<void>;
stop(): Promise<void>;
addErrorListener?(errorListener: CatalogProcessingErrorListener): void;
}
/**
* An error listener for catalog processing engine. It can be used to listen and track entity errors.
*
* @public
*/
export interface CatalogProcessingErrorListener {
onError(
unprocessedEntity: Entity,
result: EntityProcessingResult,
resultHash: String,
): Promise<void>;
}