diff --git a/.changeset/tricky-beans-watch.md b/.changeset/tricky-beans-watch.md new file mode 100644 index 0000000000..770efffb48 --- /dev/null +++ b/.changeset/tricky-beans-watch.md @@ -0,0 +1,5 @@ +--- +'@backstage/plugin-events-backend-module-google-pubsub': minor +--- + +Added an optional `filter` property to PubSub consumers/publishers diff --git a/plugins/events-backend-module-google-pubsub/config.d.ts b/plugins/events-backend-module-google-pubsub/config.d.ts index 1097b71e00..b6d3df633d 100644 --- a/plugins/events-backend-module-google-pubsub/config.d.ts +++ b/plugins/events-backend-module-google-pubsub/config.d.ts @@ -67,6 +67,37 @@ export interface Config { */ targetTopic: string; + /** + * Message filter predicate expression. + * + * @remarks + * + * The value should be a JSON object that represents a filter predicate expression. + * The object being passed to the filter is on the following form: + * + * ```js + * { + * message: { + * // The raw JSON parsed message data + * data: { ... }, + * // The message attributes as key-value pairs + * attributes: { key: 'value', ... }, + * } + * } + * ``` + * + * @example + * + * ```yaml + * filter: + * $any: + * - 'message.attributes.x-github-event': 'push' + * - 'message.attributes.x-github-event': 'repository' + * 'message.data.action': { $in: ['created', 'deleted'] } + * ``` + */ + filter?: object; + /** * Pub/Sub message attributes are by default copied to the event * metadata field. This setting allows you to override or amend @@ -128,6 +159,39 @@ export interface Config { */ targetTopicName: string; + /** + * Event filter predicate expression. + * + * @remarks + * + * The value should be a JSON object that represents a filter predicate expression. + * The object being passed to the filter is on the following form: + * + * ```js + * { + * event: { + * // The event topic + * topic: '...', + * // The raw event payload + * eventPayload: { ... }, + * // The event metadata as key-value pairs + * metadata: { key: 'value', ... }, + * } + * } + * ``` + * + * @example + * + * ```yaml + * filter: + * $any: + * - 'event.topic': 'github.push' + * - 'event.topic': 'github.repository' + * 'event.eventPayload.action': { $in: ['created', 'deleted'] } + * ``` + */ + filter?: object; + /** * Event metadata fields are by default copied to the Pub/Sub * message attribute. This setting allows you to override or amend diff --git a/plugins/events-backend-module-google-pubsub/package.json b/plugins/events-backend-module-google-pubsub/package.json index 7e3d049f37..27bf5413ef 100644 --- a/plugins/events-backend-module-google-pubsub/package.json +++ b/plugins/events-backend-module-google-pubsub/package.json @@ -37,6 +37,7 @@ "@backstage/backend-plugin-api": "workspace:^", "@backstage/config": "workspace:^", "@backstage/errors": "workspace:^", + "@backstage/filter-predicates": "workspace:^", "@backstage/plugin-events-node": "workspace:^", "@backstage/types": "workspace:^", "@google-cloud/pubsub": "^4.10.0", diff --git a/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/EventConsumingGooglePubSubPublisher.test.ts b/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/EventConsumingGooglePubSubPublisher.test.ts index e5af68add7..96ef7d4551 100644 --- a/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/EventConsumingGooglePubSubPublisher.test.ts +++ b/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/EventConsumingGooglePubSubPublisher.test.ts @@ -52,8 +52,12 @@ describe('EventConsumingGooglePubSubPublisher', () => { id: 'my-id', sourceTopics: ['my-topic'], targetTopicPattern: 'projects/my-project/topics/my-topic', + filter: () => true, mapToTopic: () => ({ project: 'my-project', topic: 'my-topic' }), - mapToAttributes: m => ({ ...m.metadata, more: 'yes' }), + mapToAttributes: context => ({ + ...context.event.metadata, + more: 'yes', + }), }, ], pubSubFactory, diff --git a/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/EventConsumingGooglePubSubPublisher.ts b/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/EventConsumingGooglePubSubPublisher.ts index 4a88bc2e79..6847d944d5 100644 --- a/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/EventConsumingGooglePubSubPublisher.ts +++ b/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/EventConsumingGooglePubSubPublisher.ts @@ -23,7 +23,8 @@ import { EventsService } from '@backstage/plugin-events-node'; import { PubSub } from '@google-cloud/pubsub'; import { Counter, metrics } from '@opentelemetry/api'; import { readSubscriptionTasksFromConfig } from './config'; -import { SubscriptionTask } from './types'; +import { EventContext, SubscriptionTask } from './types'; +import { JsonValue } from '@backstage/types'; /** * Reads messages off of the events system and forwards them into Google Pub/Sub @@ -103,7 +104,20 @@ export class EventConsumingGooglePubSubPublisher { onEvent: async event => { let status: 'success' | 'failed' | 'ignored' = 'failed'; try { - const topic = task.mapToTopic(event); + const context: EventContext = { + event: { + topic: event.topic, + eventPayload: event.eventPayload as JsonValue, + metadata: event.metadata, + }, + }; + + if (!task.filter(context)) { + status = 'ignored'; + return; + } + + const topic = task.mapToTopic(context); if (!topic) { status = 'ignored'; return; @@ -117,7 +131,7 @@ export class EventConsumingGooglePubSubPublisher { await pubsub.topic(topic.topic).publishMessage({ json: event.eventPayload, - attributes: task.mapToAttributes(event), + attributes: task.mapToAttributes(context), }); status = 'success'; diff --git a/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/config.test.ts b/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/config.test.ts index c3a5c1a5bb..a71f40bc78 100644 --- a/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/config.test.ts +++ b/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/config.test.ts @@ -16,6 +16,7 @@ import { mockServices } from '@backstage/backend-test-utils'; import { readSubscriptionTasksFromConfig } from './config'; +import { FilterPredicate } from '@backstage/filter-predicates'; describe('readSubscriptionTasksFromConfig', () => { it('reads with basic targetTopic', () => { @@ -49,6 +50,7 @@ describe('readSubscriptionTasksFromConfig', () => { id: 'subKey1', sourceTopics: ['my-topic'], targetTopicPattern: 'projects/pid/topics/tid', + filter: expect.any(Function), mapToTopic: expect.any(Function), mapToAttributes: expect.any(Function), }, @@ -56,6 +58,7 @@ describe('readSubscriptionTasksFromConfig', () => { id: 'subKey2', sourceTopics: ['my-topic-1', 'my-topic-2'], targetTopicPattern: 'projects/pid/topics/tid.{{ event.topic }}', + filter: expect.any(Function), mapToTopic: expect.any(Function), mapToAttributes: expect.any(Function), }, @@ -63,16 +66,20 @@ describe('readSubscriptionTasksFromConfig', () => { expect( result[0].mapToTopic({ - topic: 'a', - eventPayload: { foo: 'bar' }, - metadata: { attr: 'yes' }, + event: { + topic: 'a', + eventPayload: { foo: 'bar' }, + metadata: { attr: 'yes' }, + }, }), ).toEqual({ project: 'pid', topic: 'tid' }); expect( result[0].mapToAttributes({ - topic: 'a', - eventPayload: { foo: 'bar' }, - metadata: { attr: 'yes' }, + event: { + topic: 'a', + eventPayload: { foo: 'bar' }, + metadata: { attr: 'yes' }, + }, }), ).toEqual({ attr: 'yes' }); }); @@ -116,6 +123,7 @@ describe('readSubscriptionTasksFromConfig', () => { id: 'sub1', sourceTopics: ['my-topic'], targetTopicPattern: 'projects/pid/topics/tid.{{ event.topic }}', + filter: expect.any(Function), mapToTopic: expect.any(Function), mapToAttributes: expect.any(Function), }, @@ -124,6 +132,7 @@ describe('readSubscriptionTasksFromConfig', () => { sourceTopics: ['my-topic'], targetTopicPattern: 'projects/pid/topics/tid.{{ event.metadata.missing }}', + filter: expect.any(Function), mapToTopic: expect.any(Function), mapToAttributes: expect.any(Function), }, @@ -131,16 +140,28 @@ describe('readSubscriptionTasksFromConfig', () => { expect( result[0].mapToTopic({ - topic: 'a', - eventPayload: { foo: 'bar' }, - metadata: { exists: 'exists', attr1: 'original1', attr2: 'original2' }, + event: { + topic: 'a', + eventPayload: { foo: 'bar' }, + metadata: { + exists: 'exists', + attr1: 'original1', + attr2: 'original2', + }, + }, }), ).toEqual({ project: 'pid', topic: 'tid.a' }); // Message attribute existed, successfully routed expect( result[0].mapToAttributes({ - topic: 'a', - eventPayload: { foo: 'bar' }, - metadata: { exists: 'exists', attr1: 'original1', attr2: 'original2' }, + event: { + topic: 'a', + eventPayload: { foo: 'bar' }, + metadata: { + exists: 'exists', + attr1: 'original1', + attr2: 'original2', + }, + }, }), ).toEqual({ exists: 'exists', @@ -150,16 +171,28 @@ describe('readSubscriptionTasksFromConfig', () => { expect( result[1].mapToTopic({ - topic: 'a', - eventPayload: { foo: 'bar' }, - metadata: { exists: 'exists', attr1: 'original1', attr2: 'original2' }, + event: { + topic: 'a', + eventPayload: { foo: 'bar' }, + metadata: { + exists: 'exists', + attr1: 'original1', + attr2: 'original2', + }, + }, }), ).toBeUndefined(); // Message attribute did not exist, could not be routed expect( result[1].mapToAttributes({ - topic: 'a', - eventPayload: { foo: 'bar' }, - metadata: { exists: 'exists', attr1: 'original1', attr2: 'original2' }, + event: { + topic: 'a', + eventPayload: { foo: 'bar' }, + metadata: { + exists: 'exists', + attr1: 'original1', + attr2: 'original2', + }, + }, }), ).toEqual({ exists: 'exists', @@ -169,6 +202,71 @@ describe('readSubscriptionTasksFromConfig', () => { }); }); + it('reads with filter', () => { + const exampleFilter: FilterPredicate = { + 'event.topic': 'push', + }; + + const data = { + events: { + modules: { + googlePubSub: { + eventConsumingGooglePubSubPublisher: { + subscriptions: { + subKey: { + sourceTopic: 'my-topic', + targetTopicName: 'projects/pid/topics/tid.{{ event.topic }}', + filter: exampleFilter, + }, + }, + }, + }, + }, + }, + }; + + const result = readSubscriptionTasksFromConfig( + mockServices.rootConfig({ data }), + ); + expect(result).toEqual([ + { + id: 'subKey', + sourceTopics: ['my-topic'], + targetTopicPattern: 'projects/pid/topics/tid.{{ event.topic }}', + filter: expect.any(Function), + mapToTopic: expect.any(Function), + mapToAttributes: expect.any(Function), + }, + ]); + + expect( + result[0].filter({ + event: { + topic: 'push', + eventPayload: { foo: 'bar' }, + metadata: { + exists: 'exists', + attr1: 'original1', + attr2: 'original2', + }, + }, + }), + ).toBe(true); + expect( + result[0].filter({ + event: { + topic: 'pull_request', + eventPayload: { foo: 'bar' }, + metadata: { + exists: 'exists', + attr1: 'original1', + attr2: 'original2', + }, + }, + }), + ).toBe(false); + }); + it('rejects malformed subscription name', () => { const data = { events: { diff --git a/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/config.ts b/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/config.ts index 7ccd78cfcb..fd39130636 100644 --- a/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/config.ts +++ b/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/config.ts @@ -17,9 +17,12 @@ import { RootConfigService } from '@backstage/backend-plugin-api'; import { Config } from '@backstage/config'; import { InputError } from '@backstage/errors'; -import { EventParams } from '@backstage/plugin-events-node'; +import { + readOptionalFilterPredicateFromConfig, + filterPredicateToFilterFunction, +} from '@backstage/filter-predicates'; import { createPatternResolver } from '../util/createPatternResolver'; -import { SubscriptionTask } from './types'; +import { EventContext, SubscriptionTask } from './types'; export function readSubscriptionTasksFromConfig( rootConfig: RootConfigService, @@ -40,6 +43,7 @@ export function readSubscriptionTasksFromConfig( const config = subscriptionsConfig.getConfig(subscriptionId); const sourceTopics = readSourceTopics(config); + const filter = readFilter(config); const mapToTopic = readTopicMapper(config); const mapToAttributes = readAttributeMapper(config); @@ -47,6 +51,7 @@ export function readSubscriptionTasksFromConfig( id: subscriptionId, sourceTopics: sourceTopics, targetTopicPattern: config.getString('targetTopicName'), + filter, mapToTopic, mapToAttributes, }; @@ -60,12 +65,26 @@ function readSourceTopics(config: Config): string[] { return [config.getString('sourceTopic')]; } +/** + * Handles the `filter` configuration field. + */ +function readFilter(config: Config): (context: EventContext) => boolean { + const predicate = readOptionalFilterPredicateFromConfig(config, { + key: 'filter', + }); + if (!predicate) { + return () => true; + } + + return filterPredicateToFilterFunction(predicate); +} + /** * Handles the `targetTopicName` configuration field. */ function readTopicMapper( config: Config, -): (event: EventParams) => { project: string; topic: string } | undefined { +): (context: EventContext) => { project: string; topic: string } | undefined { const regex = /^projects\/([^/]+)\/topics\/(.+)$/; const targetTopicPattern = config.getString('targetTopicName'); @@ -78,9 +97,9 @@ function readTopicMapper( const patternResolver = createPatternResolver(targetTopicPattern); - return event => { + return context => { try { - parts = patternResolver({ event }).match(regex); + parts = patternResolver(context).match(regex); if (!parts) { return undefined; } @@ -100,10 +119,10 @@ function readTopicMapper( */ function readAttributeMapper( config: Config, -): (event: EventParams) => Record { +): (context: EventContext) => Record { const setters = new Array< (options: { - event: EventParams; + context: EventContext; attributes: Record; }) => void >(); @@ -113,9 +132,9 @@ function readAttributeMapper( for (const key of eventMetadata?.keys() ?? []) { const valuePattern = eventMetadata.getString(key); const patternResolver = createPatternResolver(valuePattern); - setters.push(({ event, attributes }) => { + setters.push(({ context, attributes }) => { try { - const value = patternResolver({ event }); + const value = patternResolver(context); if (value) { attributes[key] = value; } @@ -126,9 +145,9 @@ function readAttributeMapper( } } - return event => { + return context => { const result: Record = {}; - for (const [key, value] of Object.entries(event.metadata ?? {})) { + for (const [key, value] of Object.entries(context.event.metadata ?? {})) { if (value) { if (typeof value === 'string') { result[key] = value; @@ -139,7 +158,7 @@ function readAttributeMapper( } } for (const setter of setters) { - setter({ event, attributes: result }); + setter({ context, attributes: result }); } return result; }; diff --git a/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/types.ts b/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/types.ts index 36b3aeaa78..a106866080 100644 --- a/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/types.ts +++ b/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/types.ts @@ -14,7 +14,21 @@ * limitations under the License. */ -import { EventParams } from '@backstage/plugin-events-node'; +import { JsonObject, JsonValue } from '@backstage/types'; + +/** + * The contextual information about an event, as given to + * configured filters and mappers. + */ +export interface EventContext extends JsonObject { + // Actually the raw EventParams; slightly modified payload type to avoid type + // inference issues. + event: { + topic: string; + eventPayload: JsonValue; + metadata?: Record; + }; +} /** * A configured subscription task. @@ -23,8 +37,9 @@ export interface SubscriptionTask { id: string; sourceTopics: string[]; targetTopicPattern: string; + filter: (context: EventContext) => boolean; mapToTopic: ( - event: EventParams, + context: EventContext, ) => { project: string; topic: string } | undefined; - mapToAttributes: (event: EventParams) => Record; + mapToAttributes: (context: EventContext) => Record; } diff --git a/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/GooglePubSubConsumingEventPublisher.test.ts b/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/GooglePubSubConsumingEventPublisher.test.ts index 60c6816729..8b2f98d8af 100644 --- a/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/GooglePubSubConsumingEventPublisher.test.ts +++ b/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/GooglePubSubConsumingEventPublisher.test.ts @@ -67,8 +67,9 @@ describe('GooglePubSubConsumingEventPublisher', () => { id: 'my-id', project: 'my-project', subscription: 'my-subscription', + filter: () => true, mapToTopic: () => 'my-topic', - mapToMetadata: m => ({ ...m.attributes, more: 'yes' }), + mapToMetadata: m => ({ ...m.message.attributes, more: 'yes' }), }, ], pubSubFactory, diff --git a/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/GooglePubSubConsumingEventPublisher.ts b/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/GooglePubSubConsumingEventPublisher.ts index 4844c216fb..ae7b7e57f5 100644 --- a/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/GooglePubSubConsumingEventPublisher.ts +++ b/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/GooglePubSubConsumingEventPublisher.ts @@ -23,7 +23,7 @@ import { EventParams, EventsService } from '@backstage/plugin-events-node'; import { Message, PubSub, Subscription } from '@google-cloud/pubsub'; import { Counter, metrics } from '@opentelemetry/api'; import { readSubscriptionTasksFromConfig } from './config'; -import { SubscriptionTask } from './types'; +import { MessageContext, SubscriptionTask } from './types'; /** * Reads messages off of Google Pub/Sub subscriptions and forwards them into the @@ -197,14 +197,30 @@ export class GooglePubSubConsumingEventPublisher { message: Message, task: SubscriptionTask, ): EventParams | undefined { - const topic = task.mapToTopic(message); + const eventPayload = JSON.parse(message.data.toString()); + const attributes = message.attributes; + + const context: MessageContext = { + message: { + data: eventPayload, + attributes, + }, + }; + + if (!task.filter(context)) { + return undefined; + } + + const topic = task.mapToTopic(context); if (!topic) { return undefined; } + + const metadata = task.mapToMetadata(context); return { topic, - eventPayload: JSON.parse(message.data.toString()), - metadata: task.mapToMetadata(message), + eventPayload, + metadata, }; } } diff --git a/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/config.test.ts b/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/config.test.ts index b4034123f0..8303a50d5b 100644 --- a/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/config.test.ts +++ b/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/config.test.ts @@ -16,20 +16,20 @@ import { mockServices } from '@backstage/backend-test-utils'; import { JsonObject } from '@backstage/types'; -import { Message } from '@google-cloud/pubsub'; +import { FilterPredicate } from '@backstage/filter-predicates'; import { readSubscriptionTasksFromConfig } from './config'; +import { MessageContext } from './types'; -function makeMessage( +function makeContext( data: JsonObject, attributes: Record, -): Message { - const message: Partial = { - attributes, - data: Buffer.from(JSON.stringify(data), 'utf-8'), - ack: jest.fn(), - nack: jest.fn(), +): MessageContext { + return { + message: { + data, + attributes, + }, }; - return message as Message; } describe('readSubscriptionTasksFromConfig', () => { @@ -60,16 +60,17 @@ describe('readSubscriptionTasksFromConfig', () => { id: 'subKey', project: 'pid', subscription: 'sid', + filter: expect.any(Function), mapToTopic: expect.any(Function), mapToMetadata: expect.any(Function), }, ]); expect( - result[0].mapToTopic(makeMessage({ foo: 'bar' }, { attr: 'yes' })), + result[0].mapToTopic(makeContext({ foo: 'bar' }, { attr: 'yes' })), ).toBe('my-topic'); expect( - result[0].mapToMetadata(makeMessage({ foo: 'bar' }, { attr: 'yes' })), + result[0].mapToMetadata(makeContext({ foo: 'bar' }, { attr: 'yes' })), ).toEqual({ attr: 'yes' }); }); @@ -111,6 +112,7 @@ describe('readSubscriptionTasksFromConfig', () => { id: 'sub1', project: 'pid', subscription: 'sid', + filter: expect.any(Function), mapToTopic: expect.any(Function), mapToMetadata: expect.any(Function), }, @@ -118,6 +120,7 @@ describe('readSubscriptionTasksFromConfig', () => { id: 'sub2', project: 'pid', subscription: 'sid', + filter: expect.any(Function), mapToTopic: expect.any(Function), mapToMetadata: expect.any(Function), }, @@ -125,7 +128,7 @@ describe('readSubscriptionTasksFromConfig', () => { expect( result[0].mapToTopic( - makeMessage( + makeContext( { foo: 'bar' }, { exists: 'exists', meta1: 'original1', meta2: 'original2' }, ), @@ -133,7 +136,7 @@ describe('readSubscriptionTasksFromConfig', () => { ).toBe('t.exists'); // Message attribute existed, successfully routed expect( result[0].mapToMetadata( - makeMessage( + makeContext( { foo: 'bar' }, { exists: 'exists', meta1: 'original1', meta2: 'original2' }, ), @@ -146,7 +149,7 @@ describe('readSubscriptionTasksFromConfig', () => { expect( result[1].mapToTopic( - makeMessage( + makeContext( { foo: 'bar' }, { exists: 'exists', meta1: 'original1', meta2: 'original2' }, ), @@ -154,7 +157,7 @@ describe('readSubscriptionTasksFromConfig', () => { ).toBeUndefined(); // Message attribute did not exist, could not be routed expect( result[1].mapToMetadata( - makeMessage( + makeContext( { foo: 'bar' }, { exists: 'exists', meta1: 'original1', meta2: 'original2' }, ), @@ -167,6 +170,55 @@ describe('readSubscriptionTasksFromConfig', () => { }); }); + const exampleFilter: FilterPredicate = { + 'message.attributes.x-github-event': 'push', + }; + + it('reads with filter', () => { + const data = { + events: { + modules: { + googlePubSub: { + googlePubSubConsumingEventPublisher: { + subscriptions: { + subKey: { + subscriptionName: 'projects/pid/subscriptions/sid', + targetTopic: 'github.{{ message.attributes.x-github-event }}', + filter: exampleFilter, + }, + }, + }, + }, + }, + }, + }; + + const result = readSubscriptionTasksFromConfig( + mockServices.rootConfig({ data }), + ); + expect(result).toEqual([ + { + id: 'subKey', + project: 'pid', + subscription: 'sid', + filter: expect.any(Function), + mapToTopic: expect.any(Function), + mapToMetadata: expect.any(Function), + }, + ]); + + expect( + result[0].filter( + makeContext({ foo: 'bar' }, { 'x-github-event': 'push' }), + ), + ).toBe(true); + expect( + result[0].filter( + makeContext({ foo: 'bar' }, { 'x-github-event': 'pull_request' }), + ), + ).toBe(false); + }); + it('rejects malformed subscription name', () => { const data = { events: { diff --git a/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/config.ts b/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/config.ts index d8dfb2ada2..d93a442df7 100644 --- a/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/config.ts +++ b/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/config.ts @@ -17,9 +17,12 @@ import { RootConfigService } from '@backstage/backend-plugin-api'; import { Config } from '@backstage/config'; import { InputError } from '@backstage/errors'; -import { Message } from '@google-cloud/pubsub'; +import { + readOptionalFilterPredicateFromConfig, + filterPredicateToFilterFunction, +} from '@backstage/filter-predicates'; import { createPatternResolver } from '../util/createPatternResolver'; -import { SubscriptionTask } from './types'; +import { MessageContext, SubscriptionTask } from './types'; export function readSubscriptionTasksFromConfig( rootConfig: RootConfigService, @@ -40,6 +43,7 @@ export function readSubscriptionTasksFromConfig( const config = subscriptionsConfig.getConfig(subscriptionId); const { project, subscription } = readSubscriptionName(config); + const filter = readFilter(config); const mapToTopic = readTopicMapper(config); const mapToMetadata = readMetadataMapper(config); @@ -47,12 +51,24 @@ export function readSubscriptionTasksFromConfig( id: subscriptionId, project, subscription, + filter, mapToTopic, mapToMetadata, }; }); } +function readFilter(config: Config): (message: MessageContext) => boolean { + const predicate = readOptionalFilterPredicateFromConfig(config, { + key: 'filter', + }); + if (!predicate) { + return () => true; + } + + return filterPredicateToFilterFunction(predicate); +} + function readSubscriptionName(config: Config): { project: string; subscription: string; @@ -77,12 +93,12 @@ function readSubscriptionName(config: Config): { */ function readTopicMapper( config: Config, -): (message: Message) => string | undefined { +): (message: MessageContext) => string | undefined { const targetTopicPattern = config.getString('targetTopic'); const patternResolver = createPatternResolver(targetTopicPattern); return message => { try { - return patternResolver({ message }); + return patternResolver(message); } catch { // could not map to a topic return undefined; @@ -95,9 +111,12 @@ function readTopicMapper( */ function readMetadataMapper( config: Config, -): (message: Message) => Record { +): (message: MessageContext) => Record { const setters = new Array< - (options: { message: Message; metadata: Record }) => void + (options: { + context: MessageContext; + metadata: Record; + }) => void >(); const eventMetadata = config.getOptionalConfig('eventMetadata'); @@ -105,9 +124,9 @@ function readMetadataMapper( for (const key of eventMetadata?.keys() ?? []) { const valuePattern = eventMetadata.getString(key); const patternResolver = createPatternResolver(valuePattern); - setters.push(({ message, metadata }) => { + setters.push(({ context, metadata }) => { try { - const value = patternResolver({ message }); + const value = patternResolver(context); if (value) { metadata[key] = value; } @@ -118,12 +137,12 @@ function readMetadataMapper( } } - return message => { + return context => { const result: Record = { - ...message.attributes, + ...context.message.attributes, }; for (const setter of setters) { - setter({ message, metadata: result }); + setter({ context, metadata: result }); } return result; }; diff --git a/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/types.ts b/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/types.ts index 08c1e11999..113212e038 100644 --- a/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/types.ts +++ b/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/types.ts @@ -14,7 +14,23 @@ * limitations under the License. */ -import { Message } from '@google-cloud/pubsub'; +import { JsonObject, JsonValue } from '@backstage/types'; + +/** + * The contextual information about a Google Pub/Sub message, as given to + * configured filters and mappers. + */ +export interface MessageContext extends JsonObject { + /** + * Parsed subset of the original message. + */ + message: { + data: JsonValue; + attributes: { + [key in string]: string; + }; + }; +} /** * A configured subscription task. @@ -23,6 +39,7 @@ export interface SubscriptionTask { id: string; project: string; subscription: string; - mapToTopic: (message: Message) => string | undefined; - mapToMetadata: (message: Message) => Record; + filter: (context: MessageContext) => boolean; + mapToTopic: (context: MessageContext) => string | undefined; + mapToMetadata: (context: MessageContext) => Record; } diff --git a/plugins/events-backend-module-google-pubsub/src/util/createPatternResolver.test.ts b/plugins/events-backend-module-google-pubsub/src/util/createPatternResolver.test.ts index 555ec40476..c2115dee00 100644 --- a/plugins/events-backend-module-google-pubsub/src/util/createPatternResolver.test.ts +++ b/plugins/events-backend-module-google-pubsub/src/util/createPatternResolver.test.ts @@ -40,11 +40,6 @@ describe('createPatternResolver', () => { a: { b: ['first', 'second'] }, }), ).toEqual('-second-'); - expect( - createPatternResolver('-{{ a.b.0 }}-')({ - a: { b: ['first', 'second'] }, - }), - ).toEqual('-first-'); }); it('throws on bad / missing context values', () => { diff --git a/plugins/events-backend-module-google-pubsub/src/util/createPatternResolver.ts b/plugins/events-backend-module-google-pubsub/src/util/createPatternResolver.ts index 01e0b74e2e..5fb414e832 100644 --- a/plugins/events-backend-module-google-pubsub/src/util/createPatternResolver.ts +++ b/plugins/events-backend-module-google-pubsub/src/util/createPatternResolver.ts @@ -15,16 +15,20 @@ */ import { InputError } from '@backstage/errors'; +import { JsonObject, JsonValue } from '@backstage/types'; +import { getJsonValueAtPath } from '@backstage/filter-predicates'; /** * Takes a pattern string that may contain `{{ path.to.value }}` placeholders, - * and returns a function that accepts a context object and returns strings that + * and returns a function that accepts an input object and returns strings that * have had its placeholders filled in by following the dot separated path of - * properties accordingly on the context. + * properties accordingly on the input. + * + * @internal */ -export function createPatternResolver( +export function createPatternResolver( pattern: string, -): (context: TContext) => string { +): (input: JsonObject) => string { // This split results in an array where even elements are static strings // between placeholders, and odd elements are the contents inside // placeholders. @@ -35,7 +39,7 @@ export function createPatternResolver( // ['', 'foo', '-', 'bar', '', 'baz', '.'] const patternParts = pattern.split(/{{\s*([\w\[\]'"_.-]*)\s*}}/g); - const resolvers = new Array<(context: TContext) => string>(); + const resolvers = new Array<(input: JsonObject) => string>(); for (let i = 0; i < patternParts.length; i += 2) { const staticPart = patternParts[i]; @@ -46,9 +50,9 @@ export function createPatternResolver( } if (placeholderPart) { - const getter = createGetter(placeholderPart); - resolvers.push(context => { - const value = getter(context); + const getter = createGetter(placeholderPart); + resolvers.push(input => { + const value = getter(input); if (typeof value === 'string' || Number.isFinite(value)) { return String(value); } else if (!value) { @@ -62,50 +66,61 @@ export function createPatternResolver( } } - return context => resolvers.map(resolver => resolver(context)).join(''); + return input => resolvers.map(resolver => resolver(input)).join(''); } -function createGetter( +/** + * Takes a path string that indexes into an object, and returns a function that + * fetches values out of such objects. + * + * @internal + */ +export function createGetter( path: string, -): (context: TContext) => unknown | undefined { - // The resulti of the split contains quads: +): (input: JsonObject) => JsonValue | undefined { + // The result of the split contains pairs (with maybe no last element): // - // - any "regular" part - // - pure digits that were within brackets, if applicable - // - contents of a single quoted string that was within brackets, if applicable - // - contents of a double quoted string that was within brackets, if applicable + // - any "regular" dot separated parts, if applicable + // - any "exact" match parts within square brackets, if applicable // // For example, the path: // foo.bar[0].baz["qux.e"]a + // // will result in: // [ - // 'foo', undefined, undefined, undefined, - // 'bar', '0', undefined, undefined, - // 'baz', undefined, 'qux.e', undefined, - // 'a' + // 'foo.bar', '0', + // 'baz', 'qux.e', + // 'a', // ] - // and then the empty elements are stripped away const parts = path - .split(/\.|\[(?:(\d+)|'([^']+)'|"([^"]+)")\]\.?/g) + .split(/\[(?:(\d+)|'([^']+)'|"([^"]+)")\]\.?/g) .filter(Boolean); - return (context: TContext): unknown | undefined => { - let current = context; - for (const part of parts) { - if (typeof current !== 'object' || !current) { - return undefined; + return input => { + let current: JsonValue | undefined = input; + for (let i = 0; i < parts.length; i += 2) { + const regularPart = parts[i]; + const exactPart = parts[i + 1]; + + if (regularPart) { + current = getJsonValueAtPath(current, regularPart); } - if (Array.isArray(current)) { - if (!part.match(/^\d+$/)) { + if (exactPart) { + if (typeof current !== 'object' || !current) { return undefined; + } else if (Array.isArray(current)) { + if (exactPart.match(/^\d+$/)) { + current = current[Number(exactPart)]; + } else { + return undefined; + } + } else { + if (!Object.hasOwn(current, exactPart)) { + return undefined; + } + current = current[exactPart]; } - current = (current as any[])[Number(part)]; - } else { - if (!Object.hasOwn(current, part)) { - return undefined; - } - current = (current as any)[part]; } } diff --git a/yarn.lock b/yarn.lock index 7a4b186507..36e05d2bac 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5806,6 +5806,7 @@ __metadata: "@backstage/cli": "workspace:^" "@backstage/config": "workspace:^" "@backstage/errors": "workspace:^" + "@backstage/filter-predicates": "workspace:^" "@backstage/plugin-events-backend": "workspace:^" "@backstage/plugin-events-node": "workspace:^" "@backstage/types": "workspace:^"