feature: provide error listeners for catalog processing engines to notify the callers if an entity can not be processed.
Signed-off-by: Hasan Oezdemir <21654050+nodify-at@users.noreply.github.com>
This commit is contained in:
@@ -0,0 +1,6 @@
|
||||
---
|
||||
'@backstage/plugin-catalog-backend': patch
|
||||
---
|
||||
|
||||
Provide a new listener `CatalogProcessingErrorListener` for the processing engines to notify the caller if an entity can not be processed. Processing engine
|
||||
will collect the errors and passes the entity and the results to the given listeners.
|
||||
@@ -15,6 +15,7 @@ import { DocumentCollatorFactory } from '@backstage/plugin-search-common';
|
||||
import { Entity } from '@backstage/catalog-model';
|
||||
import { EntityPolicy } from '@backstage/catalog-model';
|
||||
import { GetEntitiesRequest } from '@backstage/catalog-client';
|
||||
import { JsonObject } from '@backstage/types';
|
||||
import { JsonValue } from '@backstage/types';
|
||||
import { LocationEntityV1alpha1 } from '@backstage/catalog-model';
|
||||
import { Logger } from 'winston';
|
||||
@@ -201,12 +202,24 @@ export type CatalogPermissionRule<TParams extends unknown[] = unknown[]> =
|
||||
|
||||
// @public (undocumented)
|
||||
export interface CatalogProcessingEngine {
|
||||
// (undocumented)
|
||||
addErrorListener?(errorListener: CatalogProcessingErrorListener): void;
|
||||
// (undocumented)
|
||||
start(): Promise<void>;
|
||||
// (undocumented)
|
||||
stop(): Promise<void>;
|
||||
}
|
||||
|
||||
// @public
|
||||
export interface CatalogProcessingErrorListener {
|
||||
// (undocumented)
|
||||
onError(
|
||||
unprocessedEntity: Entity,
|
||||
result: EntityProcessingResult,
|
||||
resultHash: String,
|
||||
): Promise<void>;
|
||||
}
|
||||
|
||||
// @public (undocumented)
|
||||
export type CatalogProcessor = {
|
||||
getProcessorName(): string;
|
||||
@@ -412,6 +425,21 @@ export type EntityFilter =
|
||||
}
|
||||
| EntitiesSearchFilter;
|
||||
|
||||
// @public
|
||||
export type EntityProcessingResult =
|
||||
| {
|
||||
ok: true;
|
||||
state: JsonObject;
|
||||
completedEntity: Entity;
|
||||
deferredEntities: DeferredEntity[];
|
||||
relations: EntityRelationSpec[];
|
||||
errors: Error[];
|
||||
}
|
||||
| {
|
||||
ok: false;
|
||||
errors: Error[];
|
||||
};
|
||||
|
||||
// @public
|
||||
export interface EntityProvider {
|
||||
connect(connection: EntityProviderConnection): Promise<void>;
|
||||
|
||||
@@ -23,9 +23,10 @@ import { ProcessingDatabase, RefreshStateItem } from '../database/types';
|
||||
import { createCounterMetric, createSummaryMetric } from '../util/metrics';
|
||||
import {
|
||||
CatalogProcessingEngine,
|
||||
CatalogProcessingErrorListener,
|
||||
CatalogProcessingOrchestrator,
|
||||
EntityProcessingResult,
|
||||
} from '../processing/types';
|
||||
} from './types';
|
||||
import { Stitcher } from '../stitching/Stitcher';
|
||||
import { startTaskPipeline } from './TaskPipeline';
|
||||
|
||||
@@ -33,6 +34,7 @@ const CACHE_TTL = 5;
|
||||
|
||||
export class DefaultCatalogProcessingEngine implements CatalogProcessingEngine {
|
||||
private readonly tracker = progressTracker();
|
||||
private readonly errorListeners: CatalogProcessingErrorListener[] = [];
|
||||
private stopFunc?: () => void;
|
||||
|
||||
constructor(
|
||||
@@ -122,6 +124,7 @@ export class DefaultCatalogProcessingEngine implements CatalogProcessingEngine {
|
||||
);
|
||||
|
||||
let hashBuilder = this.createHash().update(errorsString);
|
||||
|
||||
if (result.ok) {
|
||||
const { entityRefs: parents } =
|
||||
await this.processingDatabase.transaction(tx =>
|
||||
@@ -154,6 +157,11 @@ export class DefaultCatalogProcessingEngine implements CatalogProcessingEngine {
|
||||
// just store the errors and trigger a stich so that they become visible to
|
||||
// the outside.
|
||||
if (!result.ok) {
|
||||
// notify the error listeners if the entity can not be processed.
|
||||
this.errorListeners.forEach(listener =>
|
||||
listener.onError(unprocessedEntity, result, resultHash),
|
||||
);
|
||||
|
||||
await this.processingDatabase.transaction(async tx => {
|
||||
await this.processingDatabase.updateProcessedEntityErrors(tx, {
|
||||
id,
|
||||
@@ -223,6 +231,10 @@ export class DefaultCatalogProcessingEngine implements CatalogProcessingEngine {
|
||||
this.stopFunc = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
addErrorListener(errorListener: CatalogProcessingErrorListener) {
|
||||
this.errorListeners.push(errorListener);
|
||||
}
|
||||
}
|
||||
|
||||
// Helps wrap the timing and logging behaviors
|
||||
|
||||
@@ -14,7 +14,12 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
export type { CatalogProcessingEngine, DeferredEntity } from './types';
|
||||
export type {
|
||||
CatalogProcessingEngine,
|
||||
EntityProcessingResult,
|
||||
DeferredEntity,
|
||||
CatalogProcessingErrorListener,
|
||||
} from './types';
|
||||
|
||||
export { createRandomProcessingInterval } from './refresh';
|
||||
export type { ProcessingIntervalFunction } from './refresh';
|
||||
|
||||
@@ -66,4 +66,18 @@ export type DeferredEntity = {
|
||||
export interface CatalogProcessingEngine {
|
||||
start(): Promise<void>;
|
||||
stop(): Promise<void>;
|
||||
addErrorListener?(errorListener: CatalogProcessingErrorListener): void;
|
||||
}
|
||||
|
||||
/**
|
||||
* An error listener for catalog processing engine. It can be used to listen and track entity errors.
|
||||
*
|
||||
* @public
|
||||
*/
|
||||
export interface CatalogProcessingErrorListener {
|
||||
onError(
|
||||
unprocessedEntity: Entity,
|
||||
result: EntityProcessingResult,
|
||||
resultHash: String,
|
||||
): Promise<void>;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user