events-backend: await subscribers and log errors
Signed-off-by: Patrik Oldsberg <poldsberg@gmail.com>
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/plugin-events-backend': patch
|
||||
---
|
||||
|
||||
The default event broker will now catch and log errors thrown by the `onEvent` method of subscribers. The returned promise from `publish` method will also not resolve until all subscribers have handled the event.
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
import { getVoidLogger } from '@backstage/backend-common';
|
||||
import { TestEventSubscriber } from '@backstage/plugin-events-backend-test-utils';
|
||||
import { EventParams, EventSubscriber } from '@backstage/plugin-events-node';
|
||||
import { InMemoryEventBroker } from './InMemoryEventBroker';
|
||||
|
||||
const logger = getVoidLogger();
|
||||
@@ -63,4 +64,51 @@ describe('InMemoryEventBroker', () => {
|
||||
eventPayload: { test: 'topicC' },
|
||||
});
|
||||
});
|
||||
|
||||
it('logs errors from subscribers', async () => {
|
||||
const topic = 'testTopic';
|
||||
|
||||
const subscriber1 = new (class Subscriber1 implements EventSubscriber {
|
||||
supportsEventTopics() {
|
||||
return [topic];
|
||||
}
|
||||
async onEvent(event: EventParams) {
|
||||
throw new Error(`NOPE ${event.eventPayload}`);
|
||||
}
|
||||
})();
|
||||
const subscriber2 = new (class Subscriber2 implements EventSubscriber {
|
||||
supportsEventTopics() {
|
||||
return [topic];
|
||||
}
|
||||
async onEvent(event: EventParams) {
|
||||
throw new Error(`NOPE ${event.eventPayload}`);
|
||||
}
|
||||
})();
|
||||
|
||||
const errorSpy = jest.spyOn(logger, 'error');
|
||||
const eventBroker = new InMemoryEventBroker(logger);
|
||||
|
||||
eventBroker.subscribe(subscriber1);
|
||||
await eventBroker.publish({ topic, eventPayload: '1' });
|
||||
|
||||
expect(errorSpy).toHaveBeenCalledTimes(1);
|
||||
expect(errorSpy).toHaveBeenCalledWith(
|
||||
'Subscriber "Subscriber1" failed to process event',
|
||||
new Error('NOPE 1'),
|
||||
);
|
||||
|
||||
eventBroker.subscribe(subscriber2);
|
||||
await eventBroker.publish({ topic, eventPayload: '2' });
|
||||
|
||||
// With two subscribers we should not halt on the first error but call all subscribers
|
||||
expect(errorSpy).toHaveBeenCalledTimes(3);
|
||||
expect(errorSpy).toHaveBeenCalledWith(
|
||||
'Subscriber "Subscriber1" failed to process event',
|
||||
new Error('NOPE 2'),
|
||||
);
|
||||
expect(errorSpy).toHaveBeenCalledWith(
|
||||
'Subscriber "Subscriber2" failed to process event',
|
||||
new Error('NOPE 2'),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -42,7 +42,18 @@ export class InMemoryEventBroker implements EventBroker {
|
||||
);
|
||||
|
||||
const subscribed = this.subscribers[params.topic] ?? [];
|
||||
subscribed.forEach(subscriber => subscriber.onEvent(params));
|
||||
await Promise.all(
|
||||
subscribed.map(async subscriber => {
|
||||
try {
|
||||
await subscriber.onEvent(params);
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
`Subscriber "${subscriber.constructor.name}" failed to process event`,
|
||||
error,
|
||||
);
|
||||
}
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
subscribe(
|
||||
|
||||
Reference in New Issue
Block a user