feat: support for broadcast notifications

the broadcast notifications are returned with the same queries as user
notifications with only few exceptions in the store

Signed-off-by: Heikki Hellgren <heikki.hellgren@op.fi>
This commit is contained in:
Heikki Hellgren
2024-03-13 10:12:20 +02:00
parent 739415b07c
commit ba14c0e4f2
8 changed files with 443 additions and 96 deletions
+7
View File
@@ -0,0 +1,7 @@
---
'@backstage/plugin-notifications-backend': patch
'@backstage/plugin-notifications-common': patch
'@backstage/plugin-notifications-node': patch
---
Support for broadcast notifications
@@ -0,0 +1,55 @@
/*
* Copyright 2024 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
exports.up = async function up(knex) {
await knex.schema.createTable('broadcast', table => {
table.uuid('id').primary();
table.string('title').notNullable();
table.text('description').nullable();
table.string('severity', 8).notNullable();
table.text('link').nullable();
table.string('origin', 255).notNullable();
table.string('scope', 255).nullable();
table.string('topic', 255).nullable();
table.datetime('created').defaultTo(knex.fn.now()).notNullable();
table.datetime('updated').nullable();
table.index(['scope', 'origin'], 'broadcast_cope_origin_idx');
});
await knex.schema.createTable('broadcast_user_status', table => {
table.uuid('broadcast_id').notNullable();
table.string('user', 255).notNullable();
table.datetime('read').nullable();
table.datetime('saved').nullable();
table
.foreign('broadcast_id')
.references(['id'])
.inTable('broadcast')
.onDelete('CASCADE');
table.unique(['broadcast_id', 'user'], {
indexName: 'broadcast_user_idx',
});
});
};
/**
* @param {import('knex').Knex} knex
*/
exports.down = async function down(knex) {
await knex.schema.dropTable('broadcast_user_status');
await knex.schema.dropTable('broadcast');
};
@@ -159,6 +159,7 @@ describe.each(databases.eachSupportedId())(
afterEach(async () => {
jest.resetAllMocks();
await knex('notification').del();
await knex('broadcast').del();
});
describe('saveNotification', () => {
@@ -184,19 +185,21 @@ describe.each(databases.eachSupportedId())(
it('should return all notifications for user', async () => {
await storage.saveNotification(testNotification1);
await storage.saveNotification(testNotification2);
await storage.saveBroadcast(testNotification3);
await storage.saveNotification(otherUserNotification);
const notifications = await storage.getNotifications({ user });
expect(notifications.map(idOnly)).toEqual([
/* default sorting from new to old */
id2,
id3,
id1,
]);
});
it('should return read notifications for user', async () => {
await storage.saveNotification(testNotification1);
await storage.saveNotification(testNotification2);
await storage.saveBroadcast(testNotification2);
await storage.saveNotification(testNotification3);
await storage.saveNotification(otherUserNotification);
@@ -211,7 +214,7 @@ describe.each(databases.eachSupportedId())(
it('should return unread notifications for user', async () => {
await storage.saveNotification(testNotification1);
await storage.saveNotification(testNotification2);
await storage.saveBroadcast(testNotification2);
await storage.saveNotification(testNotification3);
await storage.saveNotification(otherUserNotification);
@@ -227,7 +230,7 @@ describe.each(databases.eachSupportedId())(
it('should return both read and unread notifications for user', async () => {
await storage.saveNotification(testNotification1);
await storage.saveNotification(testNotification2);
await storage.saveBroadcast(testNotification2);
await storage.saveNotification(testNotification3);
await storage.markRead({ ids: [id1, id3], user });
@@ -241,7 +244,7 @@ describe.each(databases.eachSupportedId())(
it('should allow searching for notifications', async () => {
await storage.saveNotification(testNotification2);
await storage.saveNotification(testNotification1);
await storage.saveBroadcast(testNotification1);
await storage.saveNotification(otherUserNotification);
const notifications = await storage.getNotifications({
@@ -254,7 +257,7 @@ describe.each(databases.eachSupportedId())(
it('should filter notifications based on created date', async () => {
await storage.saveNotification(testNotification1);
await storage.saveNotification(testNotification2);
await storage.saveBroadcast(testNotification2);
await storage.saveNotification(otherUserNotification);
const notifications = await storage.getNotifications({
@@ -273,7 +276,7 @@ describe.each(databases.eachSupportedId())(
await storage.saveNotification(testNotification3);
await storage.saveNotification(testNotification4);
await storage.saveNotification(testNotification5);
await storage.saveNotification(testNotification6);
await storage.saveBroadcast(testNotification6);
await storage.saveNotification(testNotification7);
await storage.saveNotification(otherUserNotification);
});
@@ -333,7 +336,7 @@ describe.each(databases.eachSupportedId())(
describe('getNotifications sorting', () => {
beforeEach(async () => {
await storage.saveNotification(testNotification1);
await storage.saveNotification(testNotification2);
await storage.saveBroadcast(testNotification2);
await storage.saveNotification(testNotification3);
});
@@ -404,11 +407,13 @@ describe.each(databases.eachSupportedId())(
read: new Date(),
});
await storage.saveNotification(testNotification2);
await storage.saveBroadcast({ ...testNotification3, read: new Date() });
await storage.saveBroadcast(testNotification4);
await storage.saveNotification(otherUserNotification);
const status = await storage.getStatus({ user });
expect(status.read).toEqual(1);
expect(status.unread).toEqual(1);
expect(status.read).toEqual(2);
expect(status.unread).toEqual(2);
});
});
@@ -425,6 +430,17 @@ describe.each(databases.eachSupportedId())(
expect(existing).not.toBeNull();
expect(existing?.id).toEqual(id2);
});
it('should return existing scope broadcast', async () => {
await storage.saveBroadcast(testNotification1);
await storage.saveBroadcast(testNotification2);
const existing = await storage.getExistingScopeBroadcast({
origin: 'cd-origin',
scope: 'scaffolder-1234',
});
expect(existing).not.toBeNull();
expect(existing?.id).toEqual(id2);
});
});
describe('restoreExistingNotification', () => {
@@ -448,6 +464,27 @@ describe.each(databases.eachSupportedId())(
expect(existing?.payload.title).toEqual('New notification');
expect(existing?.read).toBeNull();
});
it('should return restore existing scope broadcast', async () => {
await storage.saveBroadcast(testNotification1);
await storage.saveBroadcast(testNotification2);
const existing = await storage.restoreExistingNotification({
id: id2,
notification: {
user: testNotification2.user,
payload: {
title: 'New notification',
link: '/scaffolder/task/1234',
severity: 'normal',
},
} as any,
});
expect(existing).not.toBeNull();
expect(existing?.id).toEqual(id2);
expect(existing?.payload.title).toEqual('New notification');
expect(existing?.read).toBeNull();
});
});
describe('getNotification', () => {
@@ -456,6 +493,13 @@ describe.each(databases.eachSupportedId())(
const notification = await storage.getNotification({ id: id1 });
expect(notification?.id).toEqual(id1);
});
it('should return broadcast by id', async () => {
await storage.saveNotification(testNotification1);
await storage.saveBroadcast(testNotification2);
const notification = await storage.getNotification({ id: id2 });
expect(notification?.id).toEqual(id2);
});
});
describe('markRead', () => {
@@ -467,6 +511,15 @@ describe.each(databases.eachSupportedId())(
const notification = await storage.getNotification({ id: id1 });
expect(notification?.read).not.toBeNull();
});
it('should mark broadcast read', async () => {
await storage.saveBroadcast(testNotification1);
const notificationBefore = await storage.getNotification({ id: id1 });
expect(notificationBefore?.read).toBeNull();
await storage.markRead({ ids: [id1], user });
const notification = await storage.getNotification({ id: id1 });
expect(notification?.read).not.toBeNull();
});
});
describe('markUnread', () => {
@@ -481,6 +534,18 @@ describe.each(databases.eachSupportedId())(
const notification = await storage.getNotification({ id: id1 });
expect(notification?.read).toBeNull();
});
it('should mark broadcast unread', async () => {
await storage.saveBroadcast({
...testNotification1,
read: new Date(),
});
const notificationBefore = await storage.getNotification({ id: id1 });
expect(notificationBefore?.read).not.toBeNull();
await storage.markUnread({ ids: [id1], user });
const notification = await storage.getNotification({ id: id1 });
expect(notification?.read).toBeNull();
});
});
describe('markSaved', () => {
@@ -492,6 +557,15 @@ describe.each(databases.eachSupportedId())(
const notification = await storage.getNotification({ id: id1 });
expect(notification?.saved).not.toBeNull();
});
it('should mark broadcast saved', async () => {
await storage.saveBroadcast(testNotification1);
const notificationBefore = await storage.getNotification({ id: id1 });
expect(notificationBefore?.saved).toBeNull();
await storage.markSaved({ ids: [id1], user });
const notification = await storage.getNotification({ id: id1 });
expect(notification?.saved).not.toBeNull();
});
});
describe('markUnsaved', () => {
@@ -506,6 +580,18 @@ describe.each(databases.eachSupportedId())(
const notification = await storage.getNotification({ id: id1 });
expect(notification?.saved).toBeNull();
});
it('should mark broadcast not saved', async () => {
await storage.saveBroadcast({
...testNotification1,
saved: new Date(),
});
const notificationBefore = await storage.getNotification({ id: id1 });
expect(notificationBefore?.saved).not.toBeNull();
await storage.markUnsaved({ ids: [id1], user });
const notification = await storage.getNotification({ id: id1 });
expect(notification?.saved).toBeNull();
});
});
},
);
@@ -30,9 +30,29 @@ const migrationsDir = resolvePackagePath(
'migrations',
);
const NOTIFICATION_COLUMNS = [
'id',
'title',
'description',
'severity',
'link',
'origin',
'scope',
'topic',
'created',
'updated',
'user',
'read',
'saved',
];
/** @internal */
export class DatabaseNotificationsStore implements NotificationsStore {
private constructor(private readonly db: Knex) {}
private isSQLite = false;
private constructor(private readonly db: Knex) {
this.isSQLite = this.db.client.config.client.includes('sqlite3');
}
static async create({
database,
@@ -94,13 +114,44 @@ export class DatabaseNotificationsStore implements NotificationsStore {
};
};
private mapBroadcastToDbRow = (notification: Notification) => {
return {
id: notification.id,
origin: notification.origin,
created: notification.created,
topic: notification.payload?.topic,
link: notification.payload?.link,
title: notification.payload?.title,
description: notification.payload?.description,
severity: notification.payload?.severity,
scope: notification.payload?.scope,
};
};
private getBroadcastUnion = () => {
return this.db('broadcast')
.leftJoin(
'broadcast_user_status',
'id',
'=',
'broadcast_user_status.broadcast_id',
)
.select(NOTIFICATION_COLUMNS);
};
private getNotificationsBaseQuery = (
options: NotificationGetOptions | NotificationModifyOptions,
) => {
const { user } = options;
const isSQLite = this.db.client.config.client.includes('sqlite3');
const query = this.db('notification').where('user', user);
const subQuery = this.db('notification')
.select(NOTIFICATION_COLUMNS)
.unionAll([this.getBroadcastUnion()])
.as('notifications');
const query = this.db.from(subQuery).where(q => {
q.where('user', user).orWhereNull('user');
});
if (options.sort !== undefined && options.sort !== null) {
query.orderBy(options.sort, options.sortOrder ?? 'desc');
@@ -109,18 +160,10 @@ export class DatabaseNotificationsStore implements NotificationsStore {
}
if (options.createdAfter) {
if (isSQLite) {
query.where(
'notification.created',
'>=',
options.createdAfter.valueOf(),
);
if (this.isSQLite) {
query.where('created', '>=', options.createdAfter.valueOf());
} else {
query.where(
'notification.created',
'>=',
options.createdAfter.toISOString(),
);
query.where('created', '>=', options.createdAfter.toISOString());
}
}
@@ -134,25 +177,25 @@ export class DatabaseNotificationsStore implements NotificationsStore {
if (options.search) {
query.whereRaw(
`(LOWER(notification.title) LIKE LOWER(?) OR LOWER(notification.description) LIKE LOWER(?))`,
`(LOWER(title) LIKE LOWER(?) OR LOWER(description) LIKE LOWER(?))`,
[`%${options.search}%`, `%${options.search}%`],
);
}
if (options.ids) {
query.whereIn('notification.id', options.ids);
query.whereIn('id', options.ids);
}
if (options.read) {
query.whereNotNull('notification.read');
query.whereNotNull('read');
} else if (options.read === false) {
query.whereNull('notification.read');
query.whereNull('read');
} // or match both if undefined
if (options.saved) {
query.whereNotNull('notification.saved');
query.whereNotNull('saved');
} else if (options.saved === false) {
query.whereNull('notification.saved');
query.whereNull('saved');
} // or match both if undefined
return query;
@@ -160,7 +203,7 @@ export class DatabaseNotificationsStore implements NotificationsStore {
async getNotifications(options: NotificationGetOptions) {
const notificationQuery = this.getNotificationsBaseQuery(options);
const notifications = await notificationQuery.select();
const notifications = await notificationQuery.select(NOTIFICATION_COLUMNS);
return this.mapToNotifications(notifications);
}
@@ -170,7 +213,7 @@ export class DatabaseNotificationsStore implements NotificationsStore {
countOptions.offset = undefined;
countOptions.sort = null;
const notificationQuery = this.getNotificationsBaseQuery(countOptions);
const response = await notificationQuery.count('* as CNT');
const response = await notificationQuery.count('id as CNT');
return Number(response[0].CNT);
}
@@ -180,6 +223,22 @@ export class DatabaseNotificationsStore implements NotificationsStore {
.into('notification');
}
async saveBroadcast(notification: Notification) {
await this.db
.insert(this.mapBroadcastToDbRow(notification))
.into('broadcast');
if (notification.saved || notification.read) {
await this.db
.insert({
user: notification.user,
broadcast_id: notification.id,
saved: notification.saved,
read: notification.read,
})
.into('broadcast_user_status');
}
}
async getStatus(options: NotificationGetOptions) {
const notificationQuery = this.getNotificationsBaseQuery({
...options,
@@ -215,7 +274,19 @@ export class DatabaseNotificationsStore implements NotificationsStore {
.where('user', options.user)
.where('scope', options.scope)
.where('origin', options.origin)
.select()
.limit(1);
const rows = await query;
if (!rows || rows.length === 0) {
return null;
}
return rows[0] as Notification;
}
async getExistingScopeBroadcast(options: { scope: string; origin: string }) {
const query = this.db('broadcast')
.where('scope', options.scope)
.where('origin', options.origin)
.limit(1);
const rows = await query;
@@ -229,11 +300,7 @@ export class DatabaseNotificationsStore implements NotificationsStore {
id: string;
notification: Notification;
}) {
const query = this.db('notification')
.where('id', options.id)
.where('user', options.notification.user);
await query.update({
const updateColumns = {
title: options.notification.payload.title,
description: options.notification.payload.description,
link: options.notification.payload.link,
@@ -241,15 +308,31 @@ export class DatabaseNotificationsStore implements NotificationsStore {
updated: new Date(),
severity: options.notification.payload.severity,
read: null,
});
};
const notificationQuery = this.db('notification')
.where('id', options.id)
.where('user', options.notification.user);
const broadcastQuery = this.db('broadcast').where('id', options.id);
await Promise.all([
notificationQuery.update(updateColumns),
broadcastQuery.update({ ...updateColumns, read: undefined }),
]);
return await this.getNotification(options);
}
async getNotification(options: { id: string }): Promise<Notification | null> {
const rows = await this.db('notification')
const rows = await this.db
.select('*')
.from(
this.db('notification')
.select(NOTIFICATION_COLUMNS)
.unionAll([this.getBroadcastUnion()])
.as('notifications'),
)
.where('id', options.id)
.select()
.limit(1);
if (!rows || rows.length === 0) {
return null;
@@ -257,23 +340,65 @@ export class DatabaseNotificationsStore implements NotificationsStore {
return this.mapToNotifications(rows)[0];
}
private markReadSaved = async (
ids: string[],
user: string,
read?: Date | null,
saved?: Date | null,
) => {
await this.db('notification')
.whereIn('id', ids)
.where('user', user)
.update({ read, saved });
const broadcasts = this.mapToNotifications(
await this.db('broadcast').whereIn('id', ids).select(),
);
if (broadcasts.length > 0)
if (!this.isSQLite) {
await this.db('broadcast_user_status')
.insert(
broadcasts.map(b => ({
broadcast_id: b.id,
user,
read,
saved,
})),
)
.onConflict(['broadcast_id', 'user'])
.merge(['read', 'saved']);
} else {
// SQLite does not support upsert so fall back to this (mostly for tests and local dev)
for (const b of broadcasts) {
const baseQuery = this.db('broadcast_user_status')
.where('broadcast_id', b.id)
.where('user', user);
const exists = await baseQuery.clone().limit(1).select().first();
if (exists) {
await baseQuery.clone().update({ read, saved });
} else {
await baseQuery
.clone()
.insert({ broadcast_id: b.id, user, read, saved });
}
}
}
};
async markRead(options: NotificationModifyOptions): Promise<void> {
const notificationQuery = this.getNotificationsBaseQuery(options);
await notificationQuery.update({ read: new Date() });
await this.markReadSaved(options.ids, options.user, new Date(), undefined);
}
async markUnread(options: NotificationModifyOptions): Promise<void> {
const notificationQuery = this.getNotificationsBaseQuery(options);
await notificationQuery.update({ read: null });
await this.markReadSaved(options.ids, options.user, null, undefined);
}
async markSaved(options: NotificationModifyOptions): Promise<void> {
const notificationQuery = this.getNotificationsBaseQuery(options);
await notificationQuery.update({ saved: new Date() });
await this.markReadSaved(options.ids, options.user, undefined, new Date());
}
async markUnsaved(options: NotificationModifyOptions): Promise<void> {
const notificationQuery = this.getNotificationsBaseQuery(options);
await notificationQuery.update({ saved: null });
await this.markReadSaved(options.ids, options.user, undefined, null);
}
}
@@ -46,12 +46,19 @@ export interface NotificationsStore {
saveNotification(notification: Notification): Promise<void>;
saveBroadcast(notification: Notification): Promise<void>;
getExistingScopeNotification(options: {
user: string;
scope: string;
origin: string;
}): Promise<Notification | null>;
getExistingScopeBroadcast(options: {
scope: string;
origin: string;
}): Promise<Notification | null>;
restoreExistingNotification(options: {
id: string;
notification: Notification;
@@ -114,7 +114,6 @@ export async function createRouter(
targetPluginId: 'catalog',
});
// TODO: Support for broadcast
if (entityRef === null) {
return [];
}
@@ -174,7 +173,7 @@ export async function createRouter(
};
const decorateNotification = async (notification: Notification) => {
let ret: Notification = notification;
let ret = notification;
for (const processor of processors ?? []) {
ret = processor.decorate ? await processor.decorate(ret) : ret;
}
@@ -308,43 +307,59 @@ export async function createRouter(
res.status(200).send(notifications);
});
// Add new notification
router.post('/', async (req, res) => {
const { recipients, payload } = req.body;
const notifications = [];
let users = [];
const credentials = await httpAuth.credentials(req, { allow: ['service'] });
const { title, scope } = payload;
if (!recipients || !title) {
logger.error(`Invalid notification request received`);
throw new InputError();
}
let entityRef = null;
// TODO: Support for broadcast notifications
if (recipients.entityRef && recipients.type === 'entity') {
entityRef = recipients.entityRef;
}
try {
users = await getUsersForEntityRef(entityRef);
} catch (e) {
throw new InputError();
}
const origin = credentials.principal.subject;
const baseNotification: Omit<Notification, 'id' | 'user'> = {
payload: {
...payload,
severity: payload.severity ?? 'normal',
},
origin,
created: new Date(),
const sendBroadcastNotification = async (
baseNotification: Omit<Notification, 'user' | 'id'>,
opts: { scope?: string; origin: string },
) => {
const { scope, origin } = opts;
const broadcastNotification = {
...baseNotification,
id: uuid(),
};
const notification = await decorateNotification({
...broadcastNotification,
user: '',
});
let existingNotification;
if (scope) {
existingNotification = await store.getExistingScopeBroadcast({
scope,
origin,
});
}
let ret = notification;
if (existingNotification) {
const restored = await store.restoreExistingNotification({
id: existingNotification.id,
notification: { ...notification, user: '' },
});
ret = restored ?? notification;
} else {
await store.saveBroadcast(notification);
}
processorSendNotification(ret);
if (signals) {
await signals.publish<NewNotificationSignal>({
recipients: null,
message: {
action: 'new_notification',
notification_id: ret.id,
},
channel: 'notifications',
});
}
return notification;
};
const sendUserNotifications = async (
baseNotification: Omit<Notification, 'user' | 'id'>,
users: string[],
opts: { scope?: string; origin: string },
) => {
const notifications = [];
const { scope, origin } = opts;
const uniqueUsers = [...new Set(users)];
for (const user of uniqueUsers) {
const userNotification = {
@@ -388,6 +403,55 @@ export async function createRouter(
});
}
}
return notifications;
};
// Add new notification
router.post('/', async (req, res) => {
const { recipients, payload } = req.body;
const notifications = [];
let users = [];
const credentials = await httpAuth.credentials(req, { allow: ['service'] });
const { title, scope } = payload;
if (!recipients || !title) {
logger.error(`Invalid notification request received`);
throw new InputError();
}
const origin = credentials.principal.subject;
const baseNotification = {
payload: {
...payload,
severity: payload.severity ?? 'normal',
},
origin,
created: new Date(),
};
if (recipients.type === 'broadcast') {
const broadcast = await sendBroadcastNotification(baseNotification, {
scope,
origin,
});
notifications.push(broadcast);
} else {
const entityRef = recipients.entityRef;
try {
users = await getUsersForEntityRef(entityRef);
} catch (e) {
throw new InputError();
}
const userNotifications = await sendUserNotifications(
baseNotification,
users,
{ scope, origin },
);
notifications.push(...userNotifications);
}
res.json(notifications);
});
+8 -4
View File
@@ -27,10 +27,14 @@ export interface NotificationProcessor {
}
// @public (undocumented)
export type NotificationRecipients = {
type: 'entity';
entityRef: string | string[];
};
export type NotificationRecipients =
| {
type: 'entity';
entityRef: string | string[];
}
| {
type: 'broadcast';
};
// @public (undocumented)
export type NotificationSendOptions = {
@@ -25,13 +25,12 @@ export type NotificationServiceOptions = {
};
/** @public */
export type NotificationRecipients = {
type: 'entity';
entityRef: string | string[];
};
// TODO: Support for broadcast messages
// | { type: 'broadcast' };
export type NotificationRecipients =
| {
type: 'entity';
entityRef: string | string[];
}
| { type: 'broadcast' };
/** @public */
export type NotificationSendOptions = {