From 0b57aa1495dd514f70b8c9cd0a0cbfb648363b4f Mon Sep 17 00:00:00 2001 From: Patrik Oldsberg Date: Thu, 31 Oct 2024 10:46:50 +0100 Subject: [PATCH 1/2] events-node: fix event bus poll duplication Signed-off-by: Patrik Oldsberg --- .changeset/yellow-horses-pretend.md | 5 ++ .../src/api/DefaultEventsService.test.ts | 88 +++++++++++++++++++ .../src/api/DefaultEventsService.ts | 38 ++++---- 3 files changed, 109 insertions(+), 22 deletions(-) create mode 100644 .changeset/yellow-horses-pretend.md diff --git a/.changeset/yellow-horses-pretend.md b/.changeset/yellow-horses-pretend.md new file mode 100644 index 0000000000..3171993d2a --- /dev/null +++ b/.changeset/yellow-horses-pretend.md @@ -0,0 +1,5 @@ +--- +'@backstage/plugin-events-node': patch +--- + +Fixed an issue where the event bus polling would duplicate and increase exponentially over time. diff --git a/plugins/events-node/src/api/DefaultEventsService.test.ts b/plugins/events-node/src/api/DefaultEventsService.test.ts index d2fda67d6c..a7fc7ba50b 100644 --- a/plugins/events-node/src/api/DefaultEventsService.test.ts +++ b/plugins/events-node/src/api/DefaultEventsService.test.ts @@ -160,6 +160,94 @@ describe('DefaultEventsService', () => { await (service as any).shutdown(); }); + it('should wait an poll on timeout', async () => { + const logger = mockServices.logger.mock(); + const service = DefaultEventsService.create({ logger }).forPlugin('a', { + auth: mockServices.auth(), + logger, + discovery: mockServices.discovery(), + lifecycle: mockServices.lifecycle.mock(), + }); + + let callCount = 0; + + let blockingController: ReadableStreamDefaultController; + const blockingStream = new ReadableStream({ + start(controller) { + blockingController = controller; + }, + }); + + mswServer.use( + rest.put( + 'http://localhost:0/api/events/bus/v1/subscriptions/a.tester', + (_req, res, ctx) => res(ctx.status(200)), + ), + // The first and third calls result in a blocking 202 that is resolved after 100ms + // The second and fourth calls result in a 200 with an event + // The fifth call blocks until the end of the test + // No more than 5 calls should be made + rest.get( + 'http://localhost:0/api/events/bus/v1/subscriptions/a.tester/events', + (_req, res, ctx) => { + callCount += 1; + if (callCount === 1 || callCount === 3) { + return res( + ctx.status(202), + ctx.body( + new ReadableStream({ + start(controller) { + setTimeout(() => controller.close(), 100); + }, + }), + ), + ); + } else if (callCount === 2 || callCount === 4) { + return res( + ctx.status(200), + ctx.json({ + events: [{ topic: 'test', payload: { callCount } }], + }), + ); + } else if (callCount === 5) { + return res(ctx.status(202), ctx.body(blockingStream)); + } + throw new Error(`events endpoint called too many times`); + }, + ), + ); + + const event = await new Promise(resolve => { + const events = new Array(); + service.subscribe({ + id: 'tester', + topics: ['test'], + async onEvent(newEvent) { + events.push(newEvent); + if (events.length === 2) { + resolve(events); + } + }, + }); + }); + + expect(event).toEqual([ + { topic: 'test', eventPayload: { callCount: 2 } }, + { topic: 'test', eventPayload: { callCount: 4 } }, + ]); + + // Wait to make sure no additional calls happen + await new Promise(resolve => setTimeout(resolve, 100)); + + expect(callCount).toBe(5); + + // Internal call to clean up subscriptions + await (service as any).shutdown(); + + // Close the stream for the 5th call so that we don't leave the request hanging + blockingController!.close(); + }); + it('should not read events from bus if disabled', async () => { const logger = mockServices.logger.mock(); const service = DefaultEventsService.create({ diff --git a/plugins/events-node/src/api/DefaultEventsService.ts b/plugins/events-node/src/api/DefaultEventsService.ts index 4eafd7d389..caafe50d8f 100644 --- a/plugins/events-node/src/api/DefaultEventsService.ts +++ b/plugins/events-node/src/api/DefaultEventsService.ts @@ -207,28 +207,14 @@ class PluginEventsService implements EventsService { { token }, ); - if (!res.ok) { - if (res.status === 404) { - this.logger.info( - `Polling event subscription resulted in a 404, recreating subscription`, - ); - hasSubscription = false; - } else { - throw await ResponseError.fromResponse(res); - } - } - - // Successful response, reset backoff - backoffMs = POLL_BACKOFF_START_MS; - - // 202 means there were no immediately available events, but the - // response will block until either new events are available or the - // request times out. In both cases we should should try to read events - // immediately again if (res.status === 202) { + // 202 means there were no immediately available events, but the + // response will block until either new events are available or the + // request times out. In both cases we should should try to read events + // immediately again + lock.release(); await res.body?.getReader()?.closed; - process.nextTick(poll); } else if (res.status === 200) { const data = await res.json(); if (data) { @@ -245,10 +231,15 @@ class PluginEventsService implements EventsService { ); } } - } else { - this.logger.warn( - `Unexpected response status ${res.status} from events backend for subscription "${subscriptionId}"`, + } + } else { + if (res.status === 404) { + this.logger.info( + `Polling event subscription resulted in a 404, recreating subscription`, ); + hasSubscription = false; + } else { + throw await ResponseError.fromResponse(res); } } } @@ -276,6 +267,9 @@ class PluginEventsService implements EventsService { } } + // No errors, reset backoff + backoffMs = POLL_BACKOFF_START_MS; + process.nextTick(poll); } catch (error) { this.logger.warn( From 1c7d10eea1bb34d353230c877781a534a47fee5b Mon Sep 17 00:00:00 2001 From: Patrik Oldsberg Date: Thu, 31 Oct 2024 11:14:47 +0100 Subject: [PATCH 2/2] events-node: more reliable way to wait for 202 body to close Signed-off-by: Patrik Oldsberg --- plugins/events-node/src/api/DefaultEventsService.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/plugins/events-node/src/api/DefaultEventsService.ts b/plugins/events-node/src/api/DefaultEventsService.ts index caafe50d8f..7a35ed305a 100644 --- a/plugins/events-node/src/api/DefaultEventsService.ts +++ b/plugins/events-node/src/api/DefaultEventsService.ts @@ -214,7 +214,10 @@ class PluginEventsService implements EventsService { // immediately again lock.release(); - await res.body?.getReader()?.closed; + // We don't actually expect any response body here, but waiting for + // an empty body to be returned has been more reliable that waiting + // for the response body stream to close. + await res.text(); } else if (res.status === 200) { const data = await res.json(); if (data) {