Merge pull request #22529 from drodil/signals_improvements
fix: signal disconnect loop on server start
This commit is contained in:
@@ -0,0 +1,8 @@
|
||||
---
|
||||
'@backstage/plugin-signals-backend': patch
|
||||
'@backstage/plugin-signals-react': patch
|
||||
'@backstage/plugin-signals-node': patch
|
||||
'@backstage/plugin-signals': patch
|
||||
---
|
||||
|
||||
Fix disconnect loop on server start
|
||||
@@ -24,5 +24,6 @@ export default async function createPlugin(
|
||||
logger: env.logger,
|
||||
eventBroker: env.eventBroker,
|
||||
identity: env.identity,
|
||||
discovery: env.discovery,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ export default async function createPlugin(
|
||||
logger: env.logger,
|
||||
eventBroker: env.eventBroker,
|
||||
identity: env.identity,
|
||||
discovery: env.discovery,
|
||||
});
|
||||
}
|
||||
```
|
||||
|
||||
@@ -8,12 +8,15 @@ import { EventBroker } from '@backstage/plugin-events-node';
|
||||
import express from 'express';
|
||||
import { IdentityApi } from '@backstage/plugin-auth-node';
|
||||
import { LoggerService } from '@backstage/backend-plugin-api';
|
||||
import { PluginEndpointDiscovery } from '@backstage/backend-common';
|
||||
|
||||
// @public (undocumented)
|
||||
export function createRouter(options: RouterOptions): Promise<express.Router>;
|
||||
|
||||
// @public (undocumented)
|
||||
export interface RouterOptions {
|
||||
// (undocumented)
|
||||
discovery: PluginEndpointDiscovery;
|
||||
// (undocumented)
|
||||
eventBroker?: EventBroker;
|
||||
// (undocumented)
|
||||
|
||||
@@ -32,14 +32,16 @@ export const signalsPlugin = createBackendPlugin({
|
||||
httpRouter: coreServices.httpRouter,
|
||||
logger: coreServices.logger,
|
||||
identity: coreServices.identity,
|
||||
discovery: coreServices.discovery,
|
||||
// TODO: EventBroker. It is optional for now but it's actually required so waiting for the new backend system
|
||||
// for the events-backend for this to work.
|
||||
},
|
||||
async init({ httpRouter, logger, identity }) {
|
||||
async init({ httpRouter, logger, identity, discovery }) {
|
||||
httpRouter.use(
|
||||
await createRouter({
|
||||
logger,
|
||||
identity,
|
||||
discovery,
|
||||
}),
|
||||
);
|
||||
},
|
||||
|
||||
@@ -71,7 +71,9 @@ export class SignalManager {
|
||||
id,
|
||||
user: identity?.identity.userEntityRef ?? 'user:default/guest',
|
||||
ws,
|
||||
ownershipEntityRefs: identity?.identity.ownershipEntityRefs ?? [],
|
||||
ownershipEntityRefs: identity?.identity.ownershipEntityRefs ?? [
|
||||
'user:default/guest',
|
||||
],
|
||||
subscriptions: new Set<string>(),
|
||||
};
|
||||
|
||||
@@ -132,6 +134,10 @@ export class SignalManager {
|
||||
|
||||
const { channel, recipients, message } = eventPayload;
|
||||
const jsonMessage = JSON.stringify({ channel, message });
|
||||
let users: string[] = [];
|
||||
if (recipients !== null) {
|
||||
users = Array.isArray(recipients) ? recipients : [recipients];
|
||||
}
|
||||
|
||||
// Actual websocket message sending
|
||||
this.connections.forEach(conn => {
|
||||
@@ -141,9 +147,7 @@ export class SignalManager {
|
||||
// Sending to all users can be done with null
|
||||
if (
|
||||
recipients !== null &&
|
||||
!conn.ownershipEntityRefs.some((ref: string) =>
|
||||
recipients.includes(ref),
|
||||
)
|
||||
!conn.ownershipEntityRefs.some((ref: string) => users.includes(ref))
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -13,7 +13,10 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
import { getVoidLogger } from '@backstage/backend-common';
|
||||
import {
|
||||
getVoidLogger,
|
||||
PluginEndpointDiscovery,
|
||||
} from '@backstage/backend-common';
|
||||
import express from 'express';
|
||||
import request from 'supertest';
|
||||
|
||||
@@ -30,6 +33,11 @@ const identityApiMock: jest.Mocked<IdentityApi> = {
|
||||
getIdentity: jest.fn(),
|
||||
};
|
||||
|
||||
const discovery: jest.Mocked<PluginEndpointDiscovery> = {
|
||||
getBaseUrl: jest.fn().mockResolvedValue('/api/signals'),
|
||||
getExternalBaseUrl: jest.fn(),
|
||||
};
|
||||
|
||||
describe('createRouter', () => {
|
||||
let app: express.Express;
|
||||
|
||||
@@ -38,6 +46,7 @@ describe('createRouter', () => {
|
||||
logger: getVoidLogger(),
|
||||
identity: identityApiMock,
|
||||
eventBroker: eventBrokerMock,
|
||||
discovery,
|
||||
});
|
||||
app = express().use(router);
|
||||
});
|
||||
|
||||
@@ -13,7 +13,10 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
import { errorHandler } from '@backstage/backend-common';
|
||||
import {
|
||||
errorHandler,
|
||||
PluginEndpointDiscovery,
|
||||
} from '@backstage/backend-common';
|
||||
import express, { NextFunction, Request, Response } from 'express';
|
||||
import Router from 'express-promise-router';
|
||||
import { LoggerService } from '@backstage/backend-plugin-api';
|
||||
@@ -33,13 +36,14 @@ export interface RouterOptions {
|
||||
logger: LoggerService;
|
||||
eventBroker?: EventBroker;
|
||||
identity: IdentityApi;
|
||||
discovery: PluginEndpointDiscovery;
|
||||
}
|
||||
|
||||
/** @public */
|
||||
export async function createRouter(
|
||||
options: RouterOptions,
|
||||
): Promise<express.Router> {
|
||||
const { logger, identity } = options;
|
||||
const { logger, identity, discovery } = options;
|
||||
const manager = SignalManager.create(options);
|
||||
let subscribedToUpgradeRequests = false;
|
||||
|
||||
@@ -66,9 +70,9 @@ export async function createRouter(
|
||||
}
|
||||
|
||||
subscribedToUpgradeRequests = true;
|
||||
const apiUrl = await discovery.getBaseUrl('signals');
|
||||
server.on('upgrade', async (request, socket, head) => {
|
||||
// TODO: Find a way to make this more generic
|
||||
if (request.url !== '/api/signals') {
|
||||
if (!request.url || !apiUrl.endsWith(request.url)) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@@ -68,6 +68,7 @@ export async function startStandaloneServer(
|
||||
logger,
|
||||
identity,
|
||||
eventBroker,
|
||||
discovery,
|
||||
});
|
||||
|
||||
let service = createServiceBuilder(module)
|
||||
|
||||
@@ -16,15 +16,15 @@ export class DefaultSignalService implements SignalService {
|
||||
|
||||
// @public (undocumented)
|
||||
export type SignalPayload = {
|
||||
recipients: string[] | null;
|
||||
recipients: string[] | string | null;
|
||||
channel: string;
|
||||
message: JsonObject;
|
||||
};
|
||||
|
||||
// @public (undocumented)
|
||||
export type SignalService = {
|
||||
export interface SignalService {
|
||||
publish(signal: SignalPayload): Promise<void>;
|
||||
};
|
||||
}
|
||||
|
||||
// @public (undocumented)
|
||||
export const signalService: ServiceRef<SignalService, 'plugin'>;
|
||||
|
||||
@@ -0,0 +1,38 @@
|
||||
/*
|
||||
* Copyright 2024 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
import { DefaultSignalService } from './DefaultSignalService';
|
||||
|
||||
describe('DefaultSignalService', () => {
|
||||
const mockEventBroker = {
|
||||
publish: jest.fn(),
|
||||
subscribe: jest.fn(),
|
||||
};
|
||||
|
||||
const service = DefaultSignalService.create({ eventBroker: mockEventBroker });
|
||||
|
||||
it('should publish signal', () => {
|
||||
const signal = {
|
||||
channel: 'test-channel',
|
||||
recipients: null,
|
||||
message: { msg: 'hello world' },
|
||||
};
|
||||
service.publish(signal);
|
||||
expect(mockEventBroker.publish).toHaveBeenCalledWith({
|
||||
topic: 'signals',
|
||||
eventPayload: signal,
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -37,14 +37,9 @@ export class DefaultSignalService implements SignalService {
|
||||
* @param message - message to publish
|
||||
*/
|
||||
async publish(signal: SignalPayload) {
|
||||
const { recipients, channel, message } = signal;
|
||||
await this.eventBroker?.publish({
|
||||
topic: 'signals',
|
||||
eventPayload: {
|
||||
recipients,
|
||||
message,
|
||||
channel,
|
||||
},
|
||||
eventPayload: signal,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,9 +16,9 @@
|
||||
import { SignalPayload } from './types';
|
||||
|
||||
/** @public */
|
||||
export type SignalService = {
|
||||
export interface SignalService {
|
||||
/**
|
||||
* Publishes a message to user refs to specific topic
|
||||
*/
|
||||
publish(signal: SignalPayload): Promise<void>;
|
||||
};
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@ export type SignalServiceOptions = {
|
||||
|
||||
/** @public */
|
||||
export type SignalPayload = {
|
||||
recipients: string[] | null;
|
||||
recipients: string[] | string | null;
|
||||
channel: string;
|
||||
message: JsonObject;
|
||||
};
|
||||
|
||||
@@ -7,21 +7,27 @@ import { ApiRef } from '@backstage/core-plugin-api';
|
||||
import { JsonObject } from '@backstage/types';
|
||||
|
||||
// @public (undocumented)
|
||||
export type SignalApi = {
|
||||
export interface SignalApi {
|
||||
// (undocumented)
|
||||
subscribe(
|
||||
channel: string,
|
||||
onMessage: (message: JsonObject) => void,
|
||||
): {
|
||||
unsubscribe: () => void;
|
||||
};
|
||||
};
|
||||
): SignalSubscriber;
|
||||
}
|
||||
|
||||
// @public (undocumented)
|
||||
export const signalApiRef: ApiRef<SignalApi>;
|
||||
|
||||
// @public (undocumented)
|
||||
export interface SignalSubscriber {
|
||||
// (undocumented)
|
||||
unsubscribe(): void;
|
||||
}
|
||||
|
||||
// @public (undocumented)
|
||||
export const useSignal: (channel: string) => {
|
||||
lastSignal: JsonObject | null;
|
||||
isSignalsAvailable: boolean;
|
||||
};
|
||||
|
||||
// (No @packageDocumentation comment for this package)
|
||||
|
||||
@@ -22,9 +22,14 @@ export const signalApiRef = createApiRef<SignalApi>({
|
||||
});
|
||||
|
||||
/** @public */
|
||||
export type SignalApi = {
|
||||
export interface SignalSubscriber {
|
||||
unsubscribe(): void;
|
||||
}
|
||||
|
||||
/** @public */
|
||||
export interface SignalApi {
|
||||
subscribe(
|
||||
channel: string,
|
||||
onMessage: (message: JsonObject) => void,
|
||||
): { unsubscribe: () => void };
|
||||
};
|
||||
): SignalSubscriber;
|
||||
}
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
import { signalApiRef } from '../api';
|
||||
import { useApiHolder } from '@backstage/core-plugin-api';
|
||||
import { JsonObject } from '@backstage/types';
|
||||
import { useEffect, useState } from 'react';
|
||||
import { useEffect, useMemo, useState } from 'react';
|
||||
|
||||
/** @public */
|
||||
export const useSignal = (channel: string) => {
|
||||
@@ -40,5 +40,8 @@ export const useSignal = (channel: string) => {
|
||||
};
|
||||
}, [signals, channel]);
|
||||
|
||||
return { lastSignal };
|
||||
// Can be used to fallback (for example to long polling) if signals are not available in the system
|
||||
const isSignalsAvailable = useMemo(() => !signals, [signals]);
|
||||
|
||||
return { lastSignal, isSignalsAvailable };
|
||||
};
|
||||
|
||||
@@ -146,20 +146,6 @@ export class SignalClient implements SignalApi {
|
||||
url.protocol = url.protocol === 'http:' ? 'ws:' : 'wss:';
|
||||
this.ws = new WebSocket(url.toString(), token);
|
||||
|
||||
this.ws.onmessage = (data: MessageEvent) => {
|
||||
this.handleMessage(data);
|
||||
};
|
||||
|
||||
this.ws.onerror = () => {
|
||||
this.reconnect();
|
||||
};
|
||||
|
||||
this.ws.onclose = (ev: CloseEvent) => {
|
||||
if (ev.code !== WS_CLOSE_NORMAL && ev.code !== WS_CLOSE_GOING_AWAY) {
|
||||
this.reconnect();
|
||||
}
|
||||
};
|
||||
|
||||
// Wait until connection is open
|
||||
let connectSleep = 0;
|
||||
while (
|
||||
@@ -174,6 +160,20 @@ export class SignalClient implements SignalApi {
|
||||
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
|
||||
throw new Error('Connect timeout');
|
||||
}
|
||||
|
||||
this.ws.onmessage = (data: MessageEvent) => {
|
||||
this.handleMessage(data);
|
||||
};
|
||||
|
||||
this.ws.onerror = () => {
|
||||
this.reconnect();
|
||||
};
|
||||
|
||||
this.ws.onclose = (ev: CloseEvent) => {
|
||||
if (ev.code !== WS_CLOSE_NORMAL && ev.code !== WS_CLOSE_GOING_AWAY) {
|
||||
this.reconnect();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private handleMessage(data: MessageEvent) {
|
||||
|
||||
Reference in New Issue
Block a user