feat(notifications-slack): add scope-based message update support

When a Backstage notification is re-sent with the same scope, the
notifications backend updates the existing DB record and sets
notification.updated. Previously, the SlackNotificationProcessor always
called chat.postMessage(), creating duplicate Slack messages.

This adds database-backed scope-based update support:

- New slack_message_timestamps table to persist Slack message ts values
  keyed by (scope, channel)
- After each chat.postMessage(), store the response ts in the database
- When postProcess receives a notification with updated set and a
  matching stored ts, use chat.update() instead of chat.postMessage()
- Scope context is passed as parameters through the call chain to avoid
  race conditions with concurrent postProcess calls
- Scheduled daily cleanup of old timestamp records (24h retention)
- New messagesUpdated metrics counter for observability
- Graceful degradation when no database is provided
- Explicitly picks only supported fields for chat.update calls

Signed-off-by: Erik Miller <erik.miller@gusto.com>
This commit is contained in:
Erik Miller
2026-03-27 16:53:12 -07:00
parent e32a4cabb4
commit f399a7acab
8 changed files with 650 additions and 22 deletions
@@ -0,0 +1,5 @@
---
'@backstage/plugin-notifications-backend-module-slack': patch
---
Added scope-based message update support. When a notification is re-sent with the same `scope` and `notification.updated` is set, the processor now calls `chat.update()` on the existing Slack message instead of sending a duplicate via `chat.postMessage()`. Message timestamps are persisted in a new `slack_message_timestamps` database table with automatic daily cleanup. The processor gracefully degrades to the previous behavior when no database is provided.
@@ -0,0 +1,44 @@
// @ts-check
/*
* Copyright 2025 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* @param {import('knex').Knex} knex
*/
exports.up = async function up(knex) {
await knex.schema.createTable(
'slack_message_timestamps',
function tableSetup(table) {
table.string('origin', 256).notNullable();
table.string('scope', 512).notNullable();
table.string('channel', 64).notNullable();
table.string('ts', 64).notNullable();
table
.timestamp('created_at', { useTz: true })
.defaultTo(knex.fn.now())
.notNullable();
table.primary(['origin', 'scope', 'channel']);
table.index('created_at', 'idx_slack_message_timestamps_created_at');
},
);
};
/**
* @param {import('knex').Knex} knex
*/
exports.down = async function down(knex) {
await knex.schema.dropTable('slack_message_timestamps');
};
@@ -22,7 +22,8 @@
"types": "src/index.ts",
"files": [
"dist",
"config.d.ts"
"config.d.ts",
"migrations"
],
"scripts": {
"build": "backstage-cli package build",
@@ -46,6 +47,7 @@
"@slack/types": "^2.14.0",
"@slack/web-api": "^7.5.0",
"dataloader": "^2.0.0",
"knex": "^3.0.0",
"p-throttle": "^4.1.1"
},
"devDependencies": {
@@ -0,0 +1,18 @@
## SQL Report file for "@backstage/plugin-notifications-backend-module-slack"
> Do not edit this file. It is a report generated by `yarn build:api-reports`
## Table `slack_message_timestamps`
| Column | Type | Nullable | Max Length | Default |
| ------------ | -------------------------- | -------- | ---------- | ------------------- |
| `channel` | `character varying` | false | 64 | - |
| `created_at` | `timestamp with time zone` | false | - | `CURRENT_TIMESTAMP` |
| `origin` | `character varying` | false | 256 | - |
| `scope` | `character varying` | false | 512 | - |
| `ts` | `character varying` | false | 64 | - |
### Indices
- `idx_slack_message_timestamps_created_at` (`created_at`)
- `slack_message_timestamps_pkey` (`origin`, `scope`, `channel`) unique primary
@@ -20,6 +20,7 @@ import { SlackNotificationProcessor } from './SlackNotificationProcessor';
import { catalogServiceMock } from '@backstage/plugin-catalog-node/testUtils';
import { KnownBlock, WebClient } from '@slack/web-api';
import { Entity } from '@backstage/catalog-model';
import { Knex } from 'knex';
import pThrottle from 'p-throttle';
import { durationToMilliseconds } from '@backstage/types';
@@ -45,6 +46,11 @@ jest.mock('@slack/web-api', () => {
ts: '1234567890.123456',
channel: 'C12345678',
})),
update: jest.fn(() => ({
ok: true,
ts: '1234567890.123456',
channel: 'C12345678',
})),
},
conversations: {
list: jest.fn(() => ({
@@ -1615,4 +1621,381 @@ describe('SlackNotificationProcessor', () => {
]);
});
});
describe('scope-based message updates', () => {
function createMockDb() {
const store = new Map<
string,
{
origin: string;
scope: string;
channel: string;
ts: string;
created_at: Date;
}
>();
function storeKey(origin: string, scope: string, channel: string) {
return `${origin}:${scope}:${channel}`;
}
// Each call to db() creates a fresh query builder that tracks its own
// chained .where()/.insert() arguments, avoiding cross-call interference.
function createQueryBuilder(): any {
let lastWhereArgs: any;
let lastInsertRow: any;
const qb: any = {
where: jest.fn().mockImplementation((args: any) => {
lastWhereArgs = args;
return qb;
}),
first: jest.fn().mockImplementation(() => {
if (lastWhereArgs) {
const key = storeKey(
lastWhereArgs.origin,
lastWhereArgs.scope,
lastWhereArgs.channel,
);
return Promise.resolve(store.get(key));
}
return Promise.resolve(undefined);
}),
insert: jest.fn().mockImplementation((row: any) => {
lastInsertRow = row;
store.set(storeKey(row.origin, row.scope, row.channel), row);
return qb;
}),
onConflict: jest.fn().mockReturnThis(),
merge: jest.fn().mockImplementation((row: any) => {
if (lastInsertRow) {
const key = storeKey(
lastInsertRow.origin,
lastInsertRow.scope,
lastInsertRow.channel,
);
const existing = store.get(key);
if (existing) {
store.set(key, { ...existing, ...row });
}
}
return Promise.resolve();
}),
delete: jest.fn().mockResolvedValue(0),
};
return qb;
}
const db = jest
.fn()
.mockImplementation(() => createQueryBuilder()) as unknown as Knex;
return { db, store, storeKey };
}
function createProcessorWithDb(
slack: WebClient,
db: Knex,
): SlackNotificationProcessor {
const processor = SlackNotificationProcessor.fromConfig(config, {
auth,
logger,
catalog: catalogServiceMock({
entities: DEFAULT_ENTITIES_RESPONSE.items,
}),
metrics,
slack,
})[0];
processor.setDatabase(db);
return processor;
}
it('should store the message timestamp after initial scoped send', async () => {
const slack = new WebClient();
const { db, store, storeKey } = createMockDb();
const processor = createProcessorWithDb(slack, db);
await processor.postProcess(
{
origin: 'plugin',
id: '1234',
user: 'user:default/mock',
created: new Date(),
payload: {
title: 'notification',
scope: 'deployment-failure/my-service/42',
},
},
{
recipients: { type: 'entity', entityRef: 'user:default/mock' },
payload: {
title: 'notification',
scope: 'deployment-failure/my-service/42',
},
},
);
expect(slack.chat.postMessage).toHaveBeenCalledTimes(1);
expect(store.size).toBe(1);
const key = storeKey(
'plugin',
'deployment-failure/my-service/42',
'U12345678',
);
expect(store.get(key)).toEqual(
expect.objectContaining({
origin: 'plugin',
scope: 'deployment-failure/my-service/42',
channel: 'U12345678',
ts: '1234567890.123456',
}),
);
});
it('should use chat.update when the notification has been updated and a stored ts exists', async () => {
const slack = new WebClient();
const { db, store, storeKey } = createMockDb();
// Pre-populate the store with a previously sent message.
store.set(
storeKey('plugin', 'deployment-failure/my-service/42', 'U12345678'),
{
origin: 'plugin',
scope: 'deployment-failure/my-service/42',
channel: 'U12345678',
ts: '1111111111.111111',
created_at: new Date(),
},
);
const processor = createProcessorWithDb(slack, db);
await processor.postProcess(
{
origin: 'plugin',
id: '1234',
user: 'user:default/mock',
created: new Date(),
updated: new Date(),
payload: {
title: 'notification',
description: 'Updated with analysis',
scope: 'deployment-failure/my-service/42',
},
},
{
recipients: { type: 'entity', entityRef: 'user:default/mock' },
payload: {
title: 'notification',
description: 'Updated with analysis',
scope: 'deployment-failure/my-service/42',
},
},
);
expect(slack.chat.postMessage).not.toHaveBeenCalled();
expect(slack.chat.update).toHaveBeenCalledTimes(1);
// Verify chat.update receives only the fields it supports (channel, ts,
// text, blocks, attachments) and not the full ChatPostMessageArguments.
const updateArgs = (slack.chat.update as jest.Mock).mock.calls[0][0];
const updateKeys = Object.keys(updateArgs).sort();
expect(updateKeys).toEqual(['attachments', 'channel', 'text', 'ts']);
expect(updateArgs.channel).toBe('U12345678');
expect(updateArgs.ts).toBe('1111111111.111111');
});
it('should fall back to chat.postMessage when notification is updated but no stored ts exists', async () => {
const slack = new WebClient();
const { db } = createMockDb();
const processor = createProcessorWithDb(slack, db);
await processor.postProcess(
{
origin: 'plugin',
id: '1234',
user: 'user:default/mock',
created: new Date(),
updated: new Date(),
payload: {
title: 'notification',
scope: 'deployment-failure/my-service/42',
},
},
{
recipients: { type: 'entity', entityRef: 'user:default/mock' },
payload: {
title: 'notification',
scope: 'deployment-failure/my-service/42',
},
},
);
// Should fall back to postMessage since there is no stored ts.
expect(slack.chat.update).not.toHaveBeenCalled();
expect(slack.chat.postMessage).toHaveBeenCalledTimes(1);
});
it('should not interact with the database for non-scoped notifications', async () => {
const slack = new WebClient();
const { db } = createMockDb();
const processor = createProcessorWithDb(slack, db);
await processor.postProcess(
{
origin: 'plugin',
id: '1234',
user: 'user:default/mock',
created: new Date(),
payload: {
title: 'notification',
},
},
{
recipients: { type: 'entity', entityRef: 'user:default/mock' },
payload: { title: 'notification' },
},
);
expect(slack.chat.postMessage).toHaveBeenCalledTimes(1);
// The db function should not have been called since there is no scope.
expect(db).not.toHaveBeenCalled();
});
it('should work without a database (graceful degradation)', async () => {
const slack = new WebClient();
// No db set — the processor should still send messages normally.
const processor = SlackNotificationProcessor.fromConfig(config, {
auth,
logger,
catalog: catalogServiceMock({
entities: DEFAULT_ENTITIES_RESPONSE.items,
}),
metrics,
slack,
})[0];
await processor.postProcess(
{
origin: 'plugin',
id: '1234',
user: 'user:default/mock',
created: new Date(),
payload: {
title: 'notification',
scope: 'deployment-failure/my-service/42',
},
},
{
recipients: { type: 'entity', entityRef: 'user:default/mock' },
payload: {
title: 'notification',
scope: 'deployment-failure/my-service/42',
},
},
);
expect(slack.chat.postMessage).toHaveBeenCalledTimes(1);
expect(slack.chat.update).not.toHaveBeenCalled();
});
it('should handle concurrent postProcess calls with different scopes correctly', async () => {
const slack = new WebClient();
const { db, store, storeKey } = createMockDb();
// Pre-populate stored timestamps for both scopes.
store.set(storeKey('plugin', 'scope-a', 'U12345678'), {
origin: 'plugin',
scope: 'scope-a',
channel: 'U12345678',
ts: '1111111111.111111',
created_at: new Date(),
});
store.set(storeKey('plugin', 'scope-b', 'U12345678'), {
origin: 'plugin',
scope: 'scope-b',
channel: 'U12345678',
ts: '2222222222.222222',
created_at: new Date(),
});
const processor = createProcessorWithDb(slack, db);
// Fire both postProcess calls concurrently with different scopes.
await Promise.all([
processor.postProcess(
{
origin: 'plugin',
id: '1',
user: 'user:default/mock',
created: new Date(),
updated: new Date(),
payload: { title: 'A', scope: 'scope-a' },
},
{
recipients: { type: 'entity', entityRef: 'user:default/mock' },
payload: { title: 'A', scope: 'scope-a' },
},
),
processor.postProcess(
{
origin: 'plugin',
id: '2',
user: 'user:default/mock',
created: new Date(),
updated: new Date(),
payload: { title: 'B', scope: 'scope-b' },
},
{
recipients: { type: 'entity', entityRef: 'user:default/mock' },
payload: { title: 'B', scope: 'scope-b' },
},
),
]);
// Both should use chat.update, not postMessage.
expect(slack.chat.postMessage).not.toHaveBeenCalled();
expect(slack.chat.update).toHaveBeenCalledTimes(2);
// Each update should use the correct ts for its scope.
const updateCalls = (slack.chat.update as jest.Mock).mock.calls;
const timestamps = updateCalls.map((call: any[]) => call[0].ts).sort();
expect(timestamps).toEqual(['1111111111.111111', '2222222222.222222']);
});
it('should not collide across different origins with the same scope', async () => {
const slack = new WebClient();
const { db, store, storeKey } = createMockDb();
// Pre-populate a stored timestamp for origin-a only.
store.set(storeKey('origin-a', 'shared-scope', 'U12345678'), {
origin: 'origin-a',
scope: 'shared-scope',
channel: 'U12345678',
ts: '1111111111.111111',
created_at: new Date(),
});
const processor = createProcessorWithDb(slack, db);
// Send an update from origin-b with the same scope — should NOT find
// the stored ts from origin-a, and should fall back to postMessage.
await processor.postProcess(
{
origin: 'origin-b',
id: '1',
user: 'user:default/mock',
created: new Date(),
updated: new Date(),
payload: { title: 'From B', scope: 'shared-scope' },
},
{
recipients: { type: 'entity', entityRef: 'user:default/mock' },
payload: { title: 'From B', scope: 'shared-scope' },
},
);
expect(slack.chat.update).not.toHaveBeenCalled();
expect(slack.chat.postMessage).toHaveBeenCalledTimes(1);
});
});
});
@@ -34,8 +34,13 @@ import {
NotificationSendOptions,
} from '@backstage/plugin-notifications-node';
import { durationToMilliseconds } from '@backstage/types';
import { ChatPostMessageArguments, WebClient } from '@slack/web-api';
import {
ChatPostMessageArguments,
ChatUpdateArguments,
WebClient,
} from '@slack/web-api';
import DataLoader from 'dataloader';
import { Knex } from 'knex';
import pThrottle from 'p-throttle';
import { ANNOTATION_SLACK_BOT_NOTIFY } from './constants';
import { BroadcastRoute } from './types';
@@ -43,6 +48,12 @@ import { ExpiryMap, toChatPostMessageArgs } from './util';
import { CatalogService } from '@backstage/plugin-catalog-node';
import { SlackBlockKitRenderer } from '../extensions';
interface ScopeContext {
origin: string;
scope?: string;
isUpdate?: boolean;
}
export class SlackNotificationProcessor implements NotificationProcessor {
private readonly logger: LoggerService;
private readonly catalog: CatalogService;
@@ -50,9 +61,12 @@ export class SlackNotificationProcessor implements NotificationProcessor {
private readonly slack: WebClient;
private readonly sendNotifications: (
opts: ChatPostMessageArguments[],
scopeContext?: ScopeContext,
) => Promise<void>;
private readonly messagesSent: MetricsServiceCounter;
private readonly messagesFailed: MetricsServiceCounter;
private readonly messagesUpdated: MetricsServiceCounter;
private db?: Knex;
private readonly broadcastChannels?: string[];
private readonly broadcastRoutes?: BroadcastRoute[];
private readonly entityLoader: DataLoader<string, Entity | undefined>;
@@ -179,25 +193,42 @@ export class SlackNotificationProcessor implements NotificationProcessor {
unit: '{message}',
},
);
this.messagesUpdated = metrics.createCounter(
'notifications.processors.slack.update.count',
{
description:
'Number of existing Slack messages updated via scope matching',
unit: '{message}',
},
);
const throttle = pThrottle({
limit: this.concurrencyLimit,
interval: this.throttleInterval,
});
const throttled = throttle((opts: ChatPostMessageArguments) =>
this.sendNotification(opts),
const throttled = throttle(
(opts: ChatPostMessageArguments, ctx?: ScopeContext) =>
this.sendNotification(opts, ctx),
);
this.sendNotifications = async (opts: ChatPostMessageArguments[]) => {
this.sendNotifications = async (
opts: ChatPostMessageArguments[],
scopeContext?: ScopeContext,
) => {
const results = await Promise.allSettled(
opts.map(message => throttled(message)),
opts.map(message => throttled(message, scopeContext)),
);
let successCount = 0;
let sentCount = 0;
let updateCount = 0;
let failureCount = 0;
results.forEach((result, index) => {
if (result.status === 'fulfilled') {
successCount++;
if (result.value === 'updated') {
updateCount++;
} else {
sentCount++;
}
} else {
this.logger.error(
`Failed to send Slack channel notification to ${opts[index].channel}: ${result.reason.message}`,
@@ -206,11 +237,16 @@ export class SlackNotificationProcessor implements NotificationProcessor {
}
});
this.messagesSent.add(successCount);
this.messagesSent.add(sentCount);
this.messagesUpdated.add(updateCount);
this.messagesFailed.add(failureCount);
};
}
setDatabase(db: Knex): void {
this.db = db;
}
getName(): string {
return 'SlackNotificationProcessor';
}
@@ -337,8 +373,11 @@ export class SlackNotificationProcessor implements NotificationProcessor {
this.logger.debug(`Sending notification: ${JSON.stringify(payload)}`);
});
// Send notifications
await this.sendNotifications(outbound);
await this.sendNotifications(outbound, {
origin: notification.origin,
scope: notification.payload.scope,
isUpdate: !!notification.updated,
});
}
private async formatPayloadDescriptionForSlack(
@@ -434,12 +473,92 @@ export class SlackNotificationProcessor implements NotificationProcessor {
}
}
async sendNotification(args: ChatPostMessageArguments): Promise<void> {
async sendNotification(
args: ChatPostMessageArguments,
scopeContext?: ScopeContext,
): Promise<'sent' | 'updated'> {
const channel = args.channel as string;
const scope = scopeContext?.scope;
// If this is a scoped update, try to update the existing Slack message.
const origin = scopeContext?.origin;
if (scopeContext?.isUpdate && origin && scope && this.db) {
const storedTs = await this.getStoredTimestamp(origin, scope, channel);
if (storedTs) {
const updateArgs = {
channel,
ts: storedTs,
...('text' in args ? { text: args.text } : {}),
...('blocks' in args ? { blocks: args.blocks } : {}),
...('attachments' in args ? { attachments: args.attachments } : {}),
} as ChatUpdateArguments;
const updateResponse = await this.slack.chat.update(updateArgs);
if (!updateResponse.ok) {
throw new Error(
`Failed to update notification: ${updateResponse.error}`,
);
}
return 'updated';
}
}
// Send a new message.
const response = await this.slack.chat.postMessage(args);
if (!response.ok) {
throw new Error(`Failed to send notification: ${response.error}`);
}
// Persist the message timestamp for future scope-based updates.
if (origin && scope && response.ts && this.db) {
await this.saveTimestamp(origin, scope, channel, response.ts);
}
return 'sent';
}
private async getStoredTimestamp(
origin: string,
scope: string,
channel: string,
): Promise<string | undefined> {
try {
const row = await this.db!('slack_message_timestamps')
.where({ origin, scope, channel })
.first();
return row?.ts;
} catch (error) {
this.logger.warn('Failed to look up stored Slack message timestamp', {
origin,
scope,
channel,
error,
});
return undefined;
}
}
private async saveTimestamp(
origin: string,
scope: string,
channel: string,
ts: string,
): Promise<void> {
try {
await this.db!('slack_message_timestamps')
.insert({ origin, scope, channel, ts, created_at: new Date() })
.onConflict(['origin', 'scope', 'channel'])
.merge({ ts, created_at: new Date() });
} catch (error) {
this.logger.warn('Failed to persist Slack message timestamp', {
origin,
scope,
channel,
error,
});
}
}
private static parseBroadcastRoute(route: Config): BroadcastRoute {
@@ -16,6 +16,7 @@
import {
coreServices,
createBackendModule,
resolvePackagePath,
} from '@backstage/backend-plugin-api';
import { metricsServiceRef } from '@backstage/backend-plugin-api/alpha';
import { notificationsProcessingExtensionPoint } from '@backstage/plugin-notifications-node';
@@ -26,6 +27,13 @@ import {
SlackBlockKitRenderer,
} from './extensions';
const MIGRATIONS_DIR = resolvePackagePath(
'@backstage/plugin-notifications-backend-module-slack',
'migrations',
);
const CLEANUP_RETENTION_MS = 24 * 60 * 60 * 1000; // 24 hours
/**
* The Slack notification processor for use with the notifications plugin.
* This allows sending of notifications via Slack DMs or to channels.
@@ -54,17 +62,65 @@ export const notificationsModuleSlack = createBackendModule({
catalog: catalogServiceRef,
notifications: notificationsProcessingExtensionPoint,
metrics: metricsServiceRef,
database: coreServices.database,
scheduler: coreServices.scheduler,
},
async init({ auth, config, logger, catalog, notifications, metrics }) {
notifications.addProcessor(
SlackNotificationProcessor.fromConfig(config, {
auth,
logger,
catalog,
metrics,
blockKitRenderer,
}),
);
async init({
auth,
config,
logger,
catalog,
notifications,
metrics,
database,
scheduler,
}) {
const processors = SlackNotificationProcessor.fromConfig(config, {
auth,
logger,
catalog,
metrics,
blockKitRenderer,
});
if (processors.length === 0) {
return;
}
const db = await database.getClient();
if (!database.migrations?.skip) {
await db.migrate.latest({
directory: MIGRATIONS_DIR,
});
}
// Attach the DB to each processor now that migrations have run.
for (const processor of processors) {
processor.setDatabase(db);
}
notifications.addProcessor(processors);
// Clean up old message timestamp records daily. These records are only
// needed for the short window between initial send and scope-based
// update (typically minutes), so a 24-hour retention is sufficient.
await scheduler.scheduleTask({
id: 'slack-message-timestamps-cleanup',
frequency: { hours: 24 },
timeout: { minutes: 5 },
initialDelay: { hours: 2 },
scope: 'global',
fn: async () => {
const cutoff = new Date(Date.now() - CLEANUP_RETENTION_MS);
const deleted = await db('slack_message_timestamps')
.where('created_at', '<=', cutoff)
.delete();
logger.info('Cleaned up old Slack message timestamps', {
deleted,
});
},
});
},
});
},
+1
View File
@@ -6195,6 +6195,7 @@ __metadata:
"@slack/types": "npm:^2.14.0"
"@slack/web-api": "npm:^7.5.0"
dataloader: "npm:^2.0.0"
knex: "npm:^3.0.0"
msw: "npm:^2.0.0"
p-throttle: "npm:^4.1.1"
languageName: unknown