diff --git a/.changeset/slack-scope-message-updates.md b/.changeset/slack-scope-message-updates.md new file mode 100644 index 0000000000..0bea25c1d8 --- /dev/null +++ b/.changeset/slack-scope-message-updates.md @@ -0,0 +1,5 @@ +--- +'@backstage/plugin-notifications-backend-module-slack': patch +--- + +Added scope-based message update support. When a notification is re-sent with the same `scope` and `notification.updated` is set, the processor now calls `chat.update()` on the existing Slack message instead of sending a duplicate via `chat.postMessage()`. Message timestamps are persisted in a new `slack_message_timestamps` database table with automatic daily cleanup. The processor gracefully degrades to the previous behavior when no database is provided. diff --git a/plugins/notifications-backend-module-slack/migrations/20260327000000_create_slack_message_timestamps.js b/plugins/notifications-backend-module-slack/migrations/20260327000000_create_slack_message_timestamps.js new file mode 100644 index 0000000000..6bddab3e54 --- /dev/null +++ b/plugins/notifications-backend-module-slack/migrations/20260327000000_create_slack_message_timestamps.js @@ -0,0 +1,44 @@ +// @ts-check +/* + * Copyright 2025 The Backstage Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @param {import('knex').Knex} knex + */ +exports.up = async function up(knex) { + await knex.schema.createTable( + 'slack_message_timestamps', + function tableSetup(table) { + table.string('origin', 256).notNullable(); + table.string('scope', 512).notNullable(); + table.string('channel', 64).notNullable(); + table.string('ts', 64).notNullable(); + table + .timestamp('created_at', { useTz: true }) + .defaultTo(knex.fn.now()) + .notNullable(); + table.primary(['origin', 'scope', 'channel']); + table.index('created_at', 'idx_slack_message_timestamps_created_at'); + }, + ); +}; + +/** + * @param {import('knex').Knex} knex + */ +exports.down = async function down(knex) { + await knex.schema.dropTable('slack_message_timestamps'); +}; diff --git a/plugins/notifications-backend-module-slack/package.json b/plugins/notifications-backend-module-slack/package.json index 5e4ff43bf6..8708d9e77a 100644 --- a/plugins/notifications-backend-module-slack/package.json +++ b/plugins/notifications-backend-module-slack/package.json @@ -22,7 +22,8 @@ "types": "src/index.ts", "files": [ "dist", - "config.d.ts" + "config.d.ts", + "migrations" ], "scripts": { "build": "backstage-cli package build", @@ -46,6 +47,7 @@ "@slack/types": "^2.14.0", "@slack/web-api": "^7.5.0", "dataloader": "^2.0.0", + "knex": "^3.0.0", "p-throttle": "^4.1.1" }, "devDependencies": { diff --git a/plugins/notifications-backend-module-slack/report.sql.md b/plugins/notifications-backend-module-slack/report.sql.md new file mode 100644 index 0000000000..daff159f33 --- /dev/null +++ b/plugins/notifications-backend-module-slack/report.sql.md @@ -0,0 +1,18 @@ +## SQL Report file for "@backstage/plugin-notifications-backend-module-slack" + +> Do not edit this file. It is a report generated by `yarn build:api-reports` + +## Table `slack_message_timestamps` + +| Column | Type | Nullable | Max Length | Default | +| ------------ | -------------------------- | -------- | ---------- | ------------------- | +| `channel` | `character varying` | false | 64 | - | +| `created_at` | `timestamp with time zone` | false | - | `CURRENT_TIMESTAMP` | +| `origin` | `character varying` | false | 256 | - | +| `scope` | `character varying` | false | 512 | - | +| `ts` | `character varying` | false | 64 | - | + +### Indices + +- `idx_slack_message_timestamps_created_at` (`created_at`) +- `slack_message_timestamps_pkey` (`origin`, `scope`, `channel`) unique primary diff --git a/plugins/notifications-backend-module-slack/src/lib/SlackNotificationProcessor.test.ts b/plugins/notifications-backend-module-slack/src/lib/SlackNotificationProcessor.test.ts index 0393070b3f..ddf34b6b89 100644 --- a/plugins/notifications-backend-module-slack/src/lib/SlackNotificationProcessor.test.ts +++ b/plugins/notifications-backend-module-slack/src/lib/SlackNotificationProcessor.test.ts @@ -20,6 +20,7 @@ import { SlackNotificationProcessor } from './SlackNotificationProcessor'; import { catalogServiceMock } from '@backstage/plugin-catalog-node/testUtils'; import { KnownBlock, WebClient } from '@slack/web-api'; import { Entity } from '@backstage/catalog-model'; +import { Knex } from 'knex'; import pThrottle from 'p-throttle'; import { durationToMilliseconds } from '@backstage/types'; @@ -45,6 +46,11 @@ jest.mock('@slack/web-api', () => { ts: '1234567890.123456', channel: 'C12345678', })), + update: jest.fn(() => ({ + ok: true, + ts: '1234567890.123456', + channel: 'C12345678', + })), }, conversations: { list: jest.fn(() => ({ @@ -1615,4 +1621,382 @@ describe('SlackNotificationProcessor', () => { ]); }); }); + + describe('scope-based message updates', () => { + function createMockDb() { + const store = new Map< + string, + { + origin: string; + scope: string; + channel: string; + ts: string; + created_at: Date; + } + >(); + + function storeKey(origin: string, scope: string, channel: string) { + return `${origin}:${scope}:${channel}`; + } + + // Each call to db() creates a fresh query builder that tracks its own + // chained .where()/.insert() arguments, avoiding cross-call interference. + function createQueryBuilder(): any { + let lastWhereArgs: any; + let lastInsertRow: any; + const qb: any = { + where: jest.fn().mockImplementation((args: any) => { + lastWhereArgs = args; + return qb; + }), + first: jest.fn().mockImplementation(() => { + if (lastWhereArgs) { + const key = storeKey( + lastWhereArgs.origin, + lastWhereArgs.scope, + lastWhereArgs.channel, + ); + return Promise.resolve(store.get(key)); + } + return Promise.resolve(undefined); + }), + insert: jest.fn().mockImplementation((row: any) => { + lastInsertRow = row; + store.set(storeKey(row.origin, row.scope, row.channel), row); + return qb; + }), + onConflict: jest.fn().mockReturnThis(), + merge: jest.fn().mockImplementation((row: any) => { + if (lastInsertRow) { + const key = storeKey( + lastInsertRow.origin, + lastInsertRow.scope, + lastInsertRow.channel, + ); + const existing = store.get(key); + if (existing) { + store.set(key, { ...existing, ...row }); + } + } + return Promise.resolve(); + }), + delete: jest.fn().mockResolvedValue(0), + }; + return qb; + } + + const db = jest + .fn() + .mockImplementation(() => createQueryBuilder()) as unknown as Knex; + (db as any).fn = { now: jest.fn().mockReturnValue(new Date()) }; + return { db, store, storeKey }; + } + + function createProcessorWithDb( + slack: WebClient, + db: Knex, + ): SlackNotificationProcessor { + const processor = SlackNotificationProcessor.fromConfig(config, { + auth, + logger, + catalog: catalogServiceMock({ + entities: DEFAULT_ENTITIES_RESPONSE.items, + }), + metrics, + slack, + })[0]; + processor.setDatabase(db); + return processor; + } + + it('should store the message timestamp after initial scoped send', async () => { + const slack = new WebClient(); + const { db, store, storeKey } = createMockDb(); + const processor = createProcessorWithDb(slack, db); + + await processor.postProcess( + { + origin: 'plugin', + id: '1234', + user: 'user:default/mock', + created: new Date(), + payload: { + title: 'notification', + scope: 'deployment-failure/my-service/42', + }, + }, + { + recipients: { type: 'entity', entityRef: 'user:default/mock' }, + payload: { + title: 'notification', + scope: 'deployment-failure/my-service/42', + }, + }, + ); + + expect(slack.chat.postMessage).toHaveBeenCalledTimes(1); + expect(store.size).toBe(1); + const key = storeKey( + 'plugin', + 'deployment-failure/my-service/42', + 'U12345678', + ); + expect(store.get(key)).toEqual( + expect.objectContaining({ + origin: 'plugin', + scope: 'deployment-failure/my-service/42', + channel: 'U12345678', + ts: '1234567890.123456', + }), + ); + }); + + it('should use chat.update when the notification has been updated and a stored ts exists', async () => { + const slack = new WebClient(); + const { db, store, storeKey } = createMockDb(); + + // Pre-populate the store with a previously sent message. + store.set( + storeKey('plugin', 'deployment-failure/my-service/42', 'U12345678'), + { + origin: 'plugin', + scope: 'deployment-failure/my-service/42', + channel: 'U12345678', + ts: '1111111111.111111', + created_at: new Date(), + }, + ); + + const processor = createProcessorWithDb(slack, db); + + await processor.postProcess( + { + origin: 'plugin', + id: '1234', + user: 'user:default/mock', + created: new Date(), + updated: new Date(), + payload: { + title: 'notification', + description: 'Updated with analysis', + scope: 'deployment-failure/my-service/42', + }, + }, + { + recipients: { type: 'entity', entityRef: 'user:default/mock' }, + payload: { + title: 'notification', + description: 'Updated with analysis', + scope: 'deployment-failure/my-service/42', + }, + }, + ); + + expect(slack.chat.postMessage).not.toHaveBeenCalled(); + expect(slack.chat.update).toHaveBeenCalledTimes(1); + + // Verify chat.update receives only the fields it supports (channel, ts, + // text, blocks, attachments) and not the full ChatPostMessageArguments. + const updateArgs = (slack.chat.update as jest.Mock).mock.calls[0][0]; + const updateKeys = Object.keys(updateArgs).sort(); + expect(updateKeys).toEqual(['attachments', 'channel', 'text', 'ts']); + expect(updateArgs.channel).toBe('U12345678'); + expect(updateArgs.ts).toBe('1111111111.111111'); + }); + + it('should fall back to chat.postMessage when notification is updated but no stored ts exists', async () => { + const slack = new WebClient(); + const { db } = createMockDb(); + const processor = createProcessorWithDb(slack, db); + + await processor.postProcess( + { + origin: 'plugin', + id: '1234', + user: 'user:default/mock', + created: new Date(), + updated: new Date(), + payload: { + title: 'notification', + scope: 'deployment-failure/my-service/42', + }, + }, + { + recipients: { type: 'entity', entityRef: 'user:default/mock' }, + payload: { + title: 'notification', + scope: 'deployment-failure/my-service/42', + }, + }, + ); + + // Should fall back to postMessage since there is no stored ts. + expect(slack.chat.update).not.toHaveBeenCalled(); + expect(slack.chat.postMessage).toHaveBeenCalledTimes(1); + }); + + it('should not interact with the database for non-scoped notifications', async () => { + const slack = new WebClient(); + const { db } = createMockDb(); + const processor = createProcessorWithDb(slack, db); + + await processor.postProcess( + { + origin: 'plugin', + id: '1234', + user: 'user:default/mock', + created: new Date(), + payload: { + title: 'notification', + }, + }, + { + recipients: { type: 'entity', entityRef: 'user:default/mock' }, + payload: { title: 'notification' }, + }, + ); + + expect(slack.chat.postMessage).toHaveBeenCalledTimes(1); + // The db function should not have been called since there is no scope. + expect(db).not.toHaveBeenCalled(); + }); + + it('should work without a database (graceful degradation)', async () => { + const slack = new WebClient(); + + // No db set — the processor should still send messages normally. + const processor = SlackNotificationProcessor.fromConfig(config, { + auth, + logger, + catalog: catalogServiceMock({ + entities: DEFAULT_ENTITIES_RESPONSE.items, + }), + metrics, + slack, + })[0]; + + await processor.postProcess( + { + origin: 'plugin', + id: '1234', + user: 'user:default/mock', + created: new Date(), + payload: { + title: 'notification', + scope: 'deployment-failure/my-service/42', + }, + }, + { + recipients: { type: 'entity', entityRef: 'user:default/mock' }, + payload: { + title: 'notification', + scope: 'deployment-failure/my-service/42', + }, + }, + ); + + expect(slack.chat.postMessage).toHaveBeenCalledTimes(1); + expect(slack.chat.update).not.toHaveBeenCalled(); + }); + + it('should handle concurrent postProcess calls with different scopes correctly', async () => { + const slack = new WebClient(); + const { db, store, storeKey } = createMockDb(); + + // Pre-populate stored timestamps for both scopes. + store.set(storeKey('plugin', 'scope-a', 'U12345678'), { + origin: 'plugin', + scope: 'scope-a', + channel: 'U12345678', + ts: '1111111111.111111', + created_at: new Date(), + }); + store.set(storeKey('plugin', 'scope-b', 'U12345678'), { + origin: 'plugin', + scope: 'scope-b', + channel: 'U12345678', + ts: '2222222222.222222', + created_at: new Date(), + }); + + const processor = createProcessorWithDb(slack, db); + + // Fire both postProcess calls concurrently with different scopes. + await Promise.all([ + processor.postProcess( + { + origin: 'plugin', + id: '1', + user: 'user:default/mock', + created: new Date(), + updated: new Date(), + payload: { title: 'A', scope: 'scope-a' }, + }, + { + recipients: { type: 'entity', entityRef: 'user:default/mock' }, + payload: { title: 'A', scope: 'scope-a' }, + }, + ), + processor.postProcess( + { + origin: 'plugin', + id: '2', + user: 'user:default/mock', + created: new Date(), + updated: new Date(), + payload: { title: 'B', scope: 'scope-b' }, + }, + { + recipients: { type: 'entity', entityRef: 'user:default/mock' }, + payload: { title: 'B', scope: 'scope-b' }, + }, + ), + ]); + + // Both should use chat.update, not postMessage. + expect(slack.chat.postMessage).not.toHaveBeenCalled(); + expect(slack.chat.update).toHaveBeenCalledTimes(2); + + // Each update should use the correct ts for its scope. + const updateCalls = (slack.chat.update as jest.Mock).mock.calls; + const timestamps = updateCalls.map((call: any[]) => call[0].ts).sort(); + expect(timestamps).toEqual(['1111111111.111111', '2222222222.222222']); + }); + + it('should not collide across different origins with the same scope', async () => { + const slack = new WebClient(); + const { db, store, storeKey } = createMockDb(); + + // Pre-populate a stored timestamp for origin-a only. + store.set(storeKey('origin-a', 'shared-scope', 'U12345678'), { + origin: 'origin-a', + scope: 'shared-scope', + channel: 'U12345678', + ts: '1111111111.111111', + created_at: new Date(), + }); + + const processor = createProcessorWithDb(slack, db); + + // Send an update from origin-b with the same scope — should NOT find + // the stored ts from origin-a, and should fall back to postMessage. + await processor.postProcess( + { + origin: 'origin-b', + id: '1', + user: 'user:default/mock', + created: new Date(), + updated: new Date(), + payload: { title: 'From B', scope: 'shared-scope' }, + }, + { + recipients: { type: 'entity', entityRef: 'user:default/mock' }, + payload: { title: 'From B', scope: 'shared-scope' }, + }, + ); + + expect(slack.chat.update).not.toHaveBeenCalled(); + expect(slack.chat.postMessage).toHaveBeenCalledTimes(1); + }); + }); }); diff --git a/plugins/notifications-backend-module-slack/src/lib/SlackNotificationProcessor.ts b/plugins/notifications-backend-module-slack/src/lib/SlackNotificationProcessor.ts index 9a95b61210..bcbce0676c 100644 --- a/plugins/notifications-backend-module-slack/src/lib/SlackNotificationProcessor.ts +++ b/plugins/notifications-backend-module-slack/src/lib/SlackNotificationProcessor.ts @@ -34,8 +34,13 @@ import { NotificationSendOptions, } from '@backstage/plugin-notifications-node'; import { durationToMilliseconds } from '@backstage/types'; -import { ChatPostMessageArguments, WebClient } from '@slack/web-api'; +import { + ChatPostMessageArguments, + ChatUpdateArguments, + WebClient, +} from '@slack/web-api'; import DataLoader from 'dataloader'; +import { Knex } from 'knex'; import pThrottle from 'p-throttle'; import { ANNOTATION_SLACK_BOT_NOTIFY } from './constants'; import { BroadcastRoute } from './types'; @@ -43,6 +48,12 @@ import { ExpiryMap, toChatPostMessageArgs } from './util'; import { CatalogService } from '@backstage/plugin-catalog-node'; import { SlackBlockKitRenderer } from '../extensions'; +interface ScopeContext { + origin: string; + scope?: string; + isUpdate?: boolean; +} + export class SlackNotificationProcessor implements NotificationProcessor { private readonly logger: LoggerService; private readonly catalog: CatalogService; @@ -50,9 +61,12 @@ export class SlackNotificationProcessor implements NotificationProcessor { private readonly slack: WebClient; private readonly sendNotifications: ( opts: ChatPostMessageArguments[], + scopeContext?: ScopeContext, ) => Promise; private readonly messagesSent: MetricsServiceCounter; private readonly messagesFailed: MetricsServiceCounter; + private readonly messagesUpdated: MetricsServiceCounter; + private db?: Knex; private readonly broadcastChannels?: string[]; private readonly broadcastRoutes?: BroadcastRoute[]; private readonly entityLoader: DataLoader; @@ -179,25 +193,42 @@ export class SlackNotificationProcessor implements NotificationProcessor { unit: '{message}', }, ); + this.messagesUpdated = metrics.createCounter( + 'notifications.processors.slack.update.count', + { + description: + 'Number of existing Slack messages updated via scope matching', + unit: '{message}', + }, + ); const throttle = pThrottle({ limit: this.concurrencyLimit, interval: this.throttleInterval, }); - const throttled = throttle((opts: ChatPostMessageArguments) => - this.sendNotification(opts), + const throttled = throttle( + (opts: ChatPostMessageArguments, ctx?: ScopeContext) => + this.sendNotification(opts, ctx), ); - this.sendNotifications = async (opts: ChatPostMessageArguments[]) => { + this.sendNotifications = async ( + opts: ChatPostMessageArguments[], + scopeContext?: ScopeContext, + ) => { const results = await Promise.allSettled( - opts.map(message => throttled(message)), + opts.map(message => throttled(message, scopeContext)), ); - let successCount = 0; + let sentCount = 0; + let updateCount = 0; let failureCount = 0; results.forEach((result, index) => { if (result.status === 'fulfilled') { - successCount++; + if (result.value === 'updated') { + updateCount++; + } else { + sentCount++; + } } else { this.logger.error( `Failed to send Slack channel notification to ${opts[index].channel}: ${result.reason.message}`, @@ -206,11 +237,16 @@ export class SlackNotificationProcessor implements NotificationProcessor { } }); - this.messagesSent.add(successCount); + this.messagesSent.add(sentCount); + this.messagesUpdated.add(updateCount); this.messagesFailed.add(failureCount); }; } + setDatabase(db: Knex): void { + this.db = db; + } + getName(): string { return 'SlackNotificationProcessor'; } @@ -335,8 +371,11 @@ export class SlackNotificationProcessor implements NotificationProcessor { this.logger.debug(`Sending notification: ${JSON.stringify(payload)}`); }); - // Send notifications - await this.sendNotifications(outbound); + await this.sendNotifications(outbound, { + origin: notification.origin, + scope: notification.payload.scope, + isUpdate: !!notification.updated, + }); } private async formatPayloadDescriptionForSlack( @@ -432,12 +471,93 @@ export class SlackNotificationProcessor implements NotificationProcessor { } } - async sendNotification(args: ChatPostMessageArguments): Promise { + async sendNotification( + args: ChatPostMessageArguments, + scopeContext?: ScopeContext, + ): Promise<'sent' | 'updated'> { + const channel = args.channel as string; + const scope = scopeContext?.scope; + + // If this is a scoped update, try to update the existing Slack message. + const origin = scopeContext?.origin; + if (scopeContext?.isUpdate && origin && scope && this.db) { + const storedTs = await this.getStoredTimestamp(origin, scope, channel); + if (storedTs) { + const updateArgs = { + channel, + ts: storedTs, + ...('text' in args ? { text: args.text } : {}), + ...('blocks' in args ? { blocks: args.blocks } : {}), + ...('attachments' in args ? { attachments: args.attachments } : {}), + } as ChatUpdateArguments; + const updateResponse = await this.slack.chat.update(updateArgs); + + if (!updateResponse.ok) { + throw new Error( + `Failed to update notification: ${updateResponse.error}`, + ); + } + + return 'updated'; + } + } + + // Send a new message. const response = await this.slack.chat.postMessage(args); if (!response.ok) { throw new Error(`Failed to send notification: ${response.error}`); } + + // Persist the message timestamp for future scope-based updates. + if (origin && scope && response.ts && this.db) { + await this.saveTimestamp(origin, scope, channel, response.ts); + } + + return 'sent'; + } + + private async getStoredTimestamp( + origin: string, + scope: string, + channel: string, + ): Promise { + try { + const row = await this.db!('slack_message_timestamps') + .where({ origin, scope, channel }) + .first(); + return row?.ts; + } catch (error) { + this.logger.warn('Failed to look up stored Slack message timestamp', { + origin, + scope, + channel, + error, + }); + return undefined; + } + } + + private async saveTimestamp( + origin: string, + scope: string, + channel: string, + ts: string, + ): Promise { + try { + const now = this.db!.fn.now(); + await this.db!('slack_message_timestamps') + .insert({ origin, scope, channel, ts, created_at: now }) + .onConflict(['origin', 'scope', 'channel']) + .merge({ ts, created_at: now }); + } catch (error) { + this.logger.warn('Failed to persist Slack message timestamp', { + origin, + scope, + channel, + error, + }); + } } private static parseBroadcastRoute(route: Config): BroadcastRoute { diff --git a/plugins/notifications-backend-module-slack/src/module.ts b/plugins/notifications-backend-module-slack/src/module.ts index 88f0272eb3..450d10f120 100644 --- a/plugins/notifications-backend-module-slack/src/module.ts +++ b/plugins/notifications-backend-module-slack/src/module.ts @@ -16,7 +16,9 @@ import { coreServices, createBackendModule, + resolvePackagePath, } from '@backstage/backend-plugin-api'; +import { Knex } from 'knex'; import { metricsServiceRef } from '@backstage/backend-plugin-api/alpha'; import { notificationsProcessingExtensionPoint } from '@backstage/plugin-notifications-node'; import { SlackNotificationProcessor } from './lib/SlackNotificationProcessor'; @@ -26,6 +28,23 @@ import { SlackBlockKitRenderer, } from './extensions'; +const MIGRATIONS_DIR = resolvePackagePath( + '@backstage/plugin-notifications-backend-module-slack', + 'migrations', +); + +const DB_MIGRATIONS_TABLE = 'notifications_module_slack__knex_migrations'; +const CLEANUP_RETENTION_SECONDS = 24 * 60 * 60; // 24 hours + +function nowMinus(knex: Knex, seconds: number): Knex.Raw { + if (knex.client.config.client.includes('sqlite3')) { + return knex.raw(`datetime('now', ?)`, [`-${seconds} seconds`]); + } else if (knex.client.config.client.includes('mysql')) { + return knex.raw(`now() - interval ${seconds} second`); + } + return knex.raw(`now() - interval '${seconds} seconds'`); +} + /** * The Slack notification processor for use with the notifications plugin. * This allows sending of notifications via Slack DMs or to channels. @@ -54,17 +73,69 @@ export const notificationsModuleSlack = createBackendModule({ catalog: catalogServiceRef, notifications: notificationsProcessingExtensionPoint, metrics: metricsServiceRef, + database: coreServices.database, + scheduler: coreServices.scheduler, }, - async init({ auth, config, logger, catalog, notifications, metrics }) { - notifications.addProcessor( - SlackNotificationProcessor.fromConfig(config, { - auth, - logger, - catalog, - metrics, - blockKitRenderer, - }), - ); + async init({ + auth, + config, + logger, + catalog, + notifications, + metrics, + database, + scheduler, + }) { + const processors = SlackNotificationProcessor.fromConfig(config, { + auth, + logger, + catalog, + metrics, + blockKitRenderer, + }); + + if (processors.length === 0) { + return; + } + + const db = await database.getClient(); + + if (!database.migrations?.skip) { + await db.migrate.latest({ + directory: MIGRATIONS_DIR, + tableName: DB_MIGRATIONS_TABLE, + }); + } + + // Attach the DB to each processor now that migrations have run. + for (const processor of processors) { + processor.setDatabase(db); + } + + notifications.addProcessor(processors); + + // Clean up old message timestamp records daily. These records are only + // needed for the short window between initial send and scope-based + // update (typically minutes), so a 24-hour retention is sufficient. + await scheduler.scheduleTask({ + id: 'slack-message-timestamps-cleanup', + frequency: { hours: 24 }, + timeout: { minutes: 5 }, + initialDelay: { hours: 2 }, + scope: 'global', + fn: async () => { + const deleted = await db('slack_message_timestamps') + .where( + 'created_at', + '<=', + nowMinus(db, CLEANUP_RETENTION_SECONDS), + ) + .delete(); + logger.info('Cleaned up old Slack message timestamps', { + deleted, + }); + }, + }); }, }); }, diff --git a/yarn.lock b/yarn.lock index ade399ee0b..9af93c96f6 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6245,6 +6245,7 @@ __metadata: "@slack/types": "npm:^2.14.0" "@slack/web-api": "npm:^7.5.0" dataloader: "npm:^2.0.0" + knex: "npm:^3.0.0" msw: "npm:^2.0.0" p-throttle: "npm:^4.1.1" languageName: unknown