events-backend: await subscribers and log errors

Signed-off-by: Patrik Oldsberg <poldsberg@gmail.com>
This commit is contained in:
Patrik Oldsberg
2023-01-02 14:17:25 +01:00
parent 9311ee4268
commit 217149ae98
3 changed files with 65 additions and 1 deletions
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/plugin-events-backend': patch
---
The default event broker will now catch and log errors thrown by the `onEvent` method of subscribers. The returned promise from `publish` method will also not resolve until all subscribers have handled the event.
@@ -16,6 +16,7 @@
import { getVoidLogger } from '@backstage/backend-common';
import { TestEventSubscriber } from '@backstage/plugin-events-backend-test-utils';
import { EventParams, EventSubscriber } from '@backstage/plugin-events-node';
import { InMemoryEventBroker } from './InMemoryEventBroker';
const logger = getVoidLogger();
@@ -63,4 +64,51 @@ describe('InMemoryEventBroker', () => {
eventPayload: { test: 'topicC' },
});
});
it('logs errors from subscribers', async () => {
const topic = 'testTopic';
const subscriber1 = new (class Subscriber1 implements EventSubscriber {
supportsEventTopics() {
return [topic];
}
async onEvent(event: EventParams) {
throw new Error(`NOPE ${event.eventPayload}`);
}
})();
const subscriber2 = new (class Subscriber2 implements EventSubscriber {
supportsEventTopics() {
return [topic];
}
async onEvent(event: EventParams) {
throw new Error(`NOPE ${event.eventPayload}`);
}
})();
const errorSpy = jest.spyOn(logger, 'error');
const eventBroker = new InMemoryEventBroker(logger);
eventBroker.subscribe(subscriber1);
await eventBroker.publish({ topic, eventPayload: '1' });
expect(errorSpy).toHaveBeenCalledTimes(1);
expect(errorSpy).toHaveBeenCalledWith(
'Subscriber "Subscriber1" failed to process event',
new Error('NOPE 1'),
);
eventBroker.subscribe(subscriber2);
await eventBroker.publish({ topic, eventPayload: '2' });
// With two subscribers we should not halt on the first error but call all subscribers
expect(errorSpy).toHaveBeenCalledTimes(3);
expect(errorSpy).toHaveBeenCalledWith(
'Subscriber "Subscriber1" failed to process event',
new Error('NOPE 2'),
);
expect(errorSpy).toHaveBeenCalledWith(
'Subscriber "Subscriber2" failed to process event',
new Error('NOPE 2'),
);
});
});
@@ -42,7 +42,18 @@ export class InMemoryEventBroker implements EventBroker {
);
const subscribed = this.subscribers[params.topic] ?? [];
subscribed.forEach(subscriber => subscriber.onEvent(params));
await Promise.all(
subscribed.map(async subscriber => {
try {
await subscriber.onEvent(params);
} catch (error) {
this.logger.error(
`Subscriber "${subscriber.constructor.name}" failed to process event`,
error,
);
}
}),
);
}
subscribe(