implement filtering in pubsub events module
Signed-off-by: Fredrik Adelöw <freben@gmail.com>
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/plugin-events-backend-module-google-pubsub': minor
|
||||
---
|
||||
|
||||
Added an optional `filter` property to PubSub consumers/publishers
|
||||
@@ -67,6 +67,37 @@ export interface Config {
|
||||
*/
|
||||
targetTopic: string;
|
||||
|
||||
/**
|
||||
* Message filter predicate expression.
|
||||
*
|
||||
* @remarks
|
||||
*
|
||||
* The value should be a JSON object that represents a filter predicate expression.
|
||||
* The object being passed to the filter is on the following form:
|
||||
*
|
||||
* ```js
|
||||
* {
|
||||
* message: {
|
||||
* // The raw JSON parsed message data
|
||||
* data: { ... },
|
||||
* // The message attributes as key-value pairs
|
||||
* attributes: { key: 'value', ... },
|
||||
* }
|
||||
* }
|
||||
* ```
|
||||
*
|
||||
* @example
|
||||
*
|
||||
* ```yaml
|
||||
* filter:
|
||||
* $any:
|
||||
* - 'message.attributes.x-github-event': 'push'
|
||||
* - 'message.attributes.x-github-event': 'repository'
|
||||
* 'message.data.action': { $in: ['created', 'deleted'] }
|
||||
* ```
|
||||
*/
|
||||
filter?: object;
|
||||
|
||||
/**
|
||||
* Pub/Sub message attributes are by default copied to the event
|
||||
* metadata field. This setting allows you to override or amend
|
||||
@@ -128,6 +159,39 @@ export interface Config {
|
||||
*/
|
||||
targetTopicName: string;
|
||||
|
||||
/**
|
||||
* Event filter predicate expression.
|
||||
*
|
||||
* @remarks
|
||||
*
|
||||
* The value should be a JSON object that represents a filter predicate expression.
|
||||
* The object being passed to the filter is on the following form:
|
||||
*
|
||||
* ```js
|
||||
* {
|
||||
* event: {
|
||||
* // The event topic
|
||||
* topic: '...',
|
||||
* // The raw event payload
|
||||
* eventPayload: { ... },
|
||||
* // The event metadata as key-value pairs
|
||||
* metadata: { key: 'value', ... },
|
||||
* }
|
||||
* }
|
||||
* ```
|
||||
*
|
||||
* @example
|
||||
*
|
||||
* ```yaml
|
||||
* filter:
|
||||
* $any:
|
||||
* - 'event.topic': 'github.push'
|
||||
* - 'event.topic': 'github.repository'
|
||||
* 'event.eventPayload.action': { $in: ['created', 'deleted'] }
|
||||
* ```
|
||||
*/
|
||||
filter?: object;
|
||||
|
||||
/**
|
||||
* Event metadata fields are by default copied to the Pub/Sub
|
||||
* message attribute. This setting allows you to override or amend
|
||||
|
||||
@@ -37,6 +37,7 @@
|
||||
"@backstage/backend-plugin-api": "workspace:^",
|
||||
"@backstage/config": "workspace:^",
|
||||
"@backstage/errors": "workspace:^",
|
||||
"@backstage/filter-predicates": "workspace:^",
|
||||
"@backstage/plugin-events-node": "workspace:^",
|
||||
"@backstage/types": "workspace:^",
|
||||
"@google-cloud/pubsub": "^4.10.0",
|
||||
|
||||
+5
-1
@@ -52,8 +52,12 @@ describe('EventConsumingGooglePubSubPublisher', () => {
|
||||
id: 'my-id',
|
||||
sourceTopics: ['my-topic'],
|
||||
targetTopicPattern: 'projects/my-project/topics/my-topic',
|
||||
filter: () => true,
|
||||
mapToTopic: () => ({ project: 'my-project', topic: 'my-topic' }),
|
||||
mapToAttributes: m => ({ ...m.metadata, more: 'yes' }),
|
||||
mapToAttributes: context => ({
|
||||
...context.event.metadata,
|
||||
more: 'yes',
|
||||
}),
|
||||
},
|
||||
],
|
||||
pubSubFactory,
|
||||
|
||||
+17
-3
@@ -23,7 +23,8 @@ import { EventsService } from '@backstage/plugin-events-node';
|
||||
import { PubSub } from '@google-cloud/pubsub';
|
||||
import { Counter, metrics } from '@opentelemetry/api';
|
||||
import { readSubscriptionTasksFromConfig } from './config';
|
||||
import { SubscriptionTask } from './types';
|
||||
import { EventContext, SubscriptionTask } from './types';
|
||||
import { JsonValue } from '@backstage/types';
|
||||
|
||||
/**
|
||||
* Reads messages off of the events system and forwards them into Google Pub/Sub
|
||||
@@ -103,7 +104,20 @@ export class EventConsumingGooglePubSubPublisher {
|
||||
onEvent: async event => {
|
||||
let status: 'success' | 'failed' | 'ignored' = 'failed';
|
||||
try {
|
||||
const topic = task.mapToTopic(event);
|
||||
const context: EventContext = {
|
||||
event: {
|
||||
topic: event.topic,
|
||||
eventPayload: event.eventPayload as JsonValue,
|
||||
metadata: event.metadata,
|
||||
},
|
||||
};
|
||||
|
||||
if (!task.filter(context)) {
|
||||
status = 'ignored';
|
||||
return;
|
||||
}
|
||||
|
||||
const topic = task.mapToTopic(context);
|
||||
if (!topic) {
|
||||
status = 'ignored';
|
||||
return;
|
||||
@@ -117,7 +131,7 @@ export class EventConsumingGooglePubSubPublisher {
|
||||
|
||||
await pubsub.topic(topic.topic).publishMessage({
|
||||
json: event.eventPayload,
|
||||
attributes: task.mapToAttributes(event),
|
||||
attributes: task.mapToAttributes(context),
|
||||
});
|
||||
|
||||
status = 'success';
|
||||
|
||||
+116
-18
@@ -16,6 +16,7 @@
|
||||
|
||||
import { mockServices } from '@backstage/backend-test-utils';
|
||||
import { readSubscriptionTasksFromConfig } from './config';
|
||||
import { FilterPredicate } from '@backstage/filter-predicates';
|
||||
|
||||
describe('readSubscriptionTasksFromConfig', () => {
|
||||
it('reads with basic targetTopic', () => {
|
||||
@@ -49,6 +50,7 @@ describe('readSubscriptionTasksFromConfig', () => {
|
||||
id: 'subKey1',
|
||||
sourceTopics: ['my-topic'],
|
||||
targetTopicPattern: 'projects/pid/topics/tid',
|
||||
filter: expect.any(Function),
|
||||
mapToTopic: expect.any(Function),
|
||||
mapToAttributes: expect.any(Function),
|
||||
},
|
||||
@@ -56,6 +58,7 @@ describe('readSubscriptionTasksFromConfig', () => {
|
||||
id: 'subKey2',
|
||||
sourceTopics: ['my-topic-1', 'my-topic-2'],
|
||||
targetTopicPattern: 'projects/pid/topics/tid.{{ event.topic }}',
|
||||
filter: expect.any(Function),
|
||||
mapToTopic: expect.any(Function),
|
||||
mapToAttributes: expect.any(Function),
|
||||
},
|
||||
@@ -63,16 +66,20 @@ describe('readSubscriptionTasksFromConfig', () => {
|
||||
|
||||
expect(
|
||||
result[0].mapToTopic({
|
||||
topic: 'a',
|
||||
eventPayload: { foo: 'bar' },
|
||||
metadata: { attr: 'yes' },
|
||||
event: {
|
||||
topic: 'a',
|
||||
eventPayload: { foo: 'bar' },
|
||||
metadata: { attr: 'yes' },
|
||||
},
|
||||
}),
|
||||
).toEqual({ project: 'pid', topic: 'tid' });
|
||||
expect(
|
||||
result[0].mapToAttributes({
|
||||
topic: 'a',
|
||||
eventPayload: { foo: 'bar' },
|
||||
metadata: { attr: 'yes' },
|
||||
event: {
|
||||
topic: 'a',
|
||||
eventPayload: { foo: 'bar' },
|
||||
metadata: { attr: 'yes' },
|
||||
},
|
||||
}),
|
||||
).toEqual({ attr: 'yes' });
|
||||
});
|
||||
@@ -116,6 +123,7 @@ describe('readSubscriptionTasksFromConfig', () => {
|
||||
id: 'sub1',
|
||||
sourceTopics: ['my-topic'],
|
||||
targetTopicPattern: 'projects/pid/topics/tid.{{ event.topic }}',
|
||||
filter: expect.any(Function),
|
||||
mapToTopic: expect.any(Function),
|
||||
mapToAttributes: expect.any(Function),
|
||||
},
|
||||
@@ -124,6 +132,7 @@ describe('readSubscriptionTasksFromConfig', () => {
|
||||
sourceTopics: ['my-topic'],
|
||||
targetTopicPattern:
|
||||
'projects/pid/topics/tid.{{ event.metadata.missing }}',
|
||||
filter: expect.any(Function),
|
||||
mapToTopic: expect.any(Function),
|
||||
mapToAttributes: expect.any(Function),
|
||||
},
|
||||
@@ -131,16 +140,28 @@ describe('readSubscriptionTasksFromConfig', () => {
|
||||
|
||||
expect(
|
||||
result[0].mapToTopic({
|
||||
topic: 'a',
|
||||
eventPayload: { foo: 'bar' },
|
||||
metadata: { exists: 'exists', attr1: 'original1', attr2: 'original2' },
|
||||
event: {
|
||||
topic: 'a',
|
||||
eventPayload: { foo: 'bar' },
|
||||
metadata: {
|
||||
exists: 'exists',
|
||||
attr1: 'original1',
|
||||
attr2: 'original2',
|
||||
},
|
||||
},
|
||||
}),
|
||||
).toEqual({ project: 'pid', topic: 'tid.a' }); // Message attribute existed, successfully routed
|
||||
expect(
|
||||
result[0].mapToAttributes({
|
||||
topic: 'a',
|
||||
eventPayload: { foo: 'bar' },
|
||||
metadata: { exists: 'exists', attr1: 'original1', attr2: 'original2' },
|
||||
event: {
|
||||
topic: 'a',
|
||||
eventPayload: { foo: 'bar' },
|
||||
metadata: {
|
||||
exists: 'exists',
|
||||
attr1: 'original1',
|
||||
attr2: 'original2',
|
||||
},
|
||||
},
|
||||
}),
|
||||
).toEqual({
|
||||
exists: 'exists',
|
||||
@@ -150,16 +171,28 @@ describe('readSubscriptionTasksFromConfig', () => {
|
||||
|
||||
expect(
|
||||
result[1].mapToTopic({
|
||||
topic: 'a',
|
||||
eventPayload: { foo: 'bar' },
|
||||
metadata: { exists: 'exists', attr1: 'original1', attr2: 'original2' },
|
||||
event: {
|
||||
topic: 'a',
|
||||
eventPayload: { foo: 'bar' },
|
||||
metadata: {
|
||||
exists: 'exists',
|
||||
attr1: 'original1',
|
||||
attr2: 'original2',
|
||||
},
|
||||
},
|
||||
}),
|
||||
).toBeUndefined(); // Message attribute did not exist, could not be routed
|
||||
expect(
|
||||
result[1].mapToAttributes({
|
||||
topic: 'a',
|
||||
eventPayload: { foo: 'bar' },
|
||||
metadata: { exists: 'exists', attr1: 'original1', attr2: 'original2' },
|
||||
event: {
|
||||
topic: 'a',
|
||||
eventPayload: { foo: 'bar' },
|
||||
metadata: {
|
||||
exists: 'exists',
|
||||
attr1: 'original1',
|
||||
attr2: 'original2',
|
||||
},
|
||||
},
|
||||
}),
|
||||
).toEqual({
|
||||
exists: 'exists',
|
||||
@@ -169,6 +202,71 @@ describe('readSubscriptionTasksFromConfig', () => {
|
||||
});
|
||||
});
|
||||
|
||||
it('reads with filter', () => {
|
||||
const exampleFilter: FilterPredicate = {
|
||||
'event.topic': 'push',
|
||||
};
|
||||
|
||||
const data = {
|
||||
events: {
|
||||
modules: {
|
||||
googlePubSub: {
|
||||
eventConsumingGooglePubSubPublisher: {
|
||||
subscriptions: {
|
||||
subKey: {
|
||||
sourceTopic: 'my-topic',
|
||||
targetTopicName: 'projects/pid/topics/tid.{{ event.topic }}',
|
||||
filter: exampleFilter,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
const result = readSubscriptionTasksFromConfig(
|
||||
mockServices.rootConfig({ data }),
|
||||
);
|
||||
expect(result).toEqual([
|
||||
{
|
||||
id: 'subKey',
|
||||
sourceTopics: ['my-topic'],
|
||||
targetTopicPattern: 'projects/pid/topics/tid.{{ event.topic }}',
|
||||
filter: expect.any(Function),
|
||||
mapToTopic: expect.any(Function),
|
||||
mapToAttributes: expect.any(Function),
|
||||
},
|
||||
]);
|
||||
|
||||
expect(
|
||||
result[0].filter({
|
||||
event: {
|
||||
topic: 'push',
|
||||
eventPayload: { foo: 'bar' },
|
||||
metadata: {
|
||||
exists: 'exists',
|
||||
attr1: 'original1',
|
||||
attr2: 'original2',
|
||||
},
|
||||
},
|
||||
}),
|
||||
).toBe(true);
|
||||
expect(
|
||||
result[0].filter({
|
||||
event: {
|
||||
topic: 'pull_request',
|
||||
eventPayload: { foo: 'bar' },
|
||||
metadata: {
|
||||
exists: 'exists',
|
||||
attr1: 'original1',
|
||||
attr2: 'original2',
|
||||
},
|
||||
},
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it('rejects malformed subscription name', () => {
|
||||
const data = {
|
||||
events: {
|
||||
|
||||
+31
-12
@@ -17,9 +17,12 @@
|
||||
import { RootConfigService } from '@backstage/backend-plugin-api';
|
||||
import { Config } from '@backstage/config';
|
||||
import { InputError } from '@backstage/errors';
|
||||
import { EventParams } from '@backstage/plugin-events-node';
|
||||
import {
|
||||
readOptionalFilterPredicateFromConfig,
|
||||
filterPredicateToFilterFunction,
|
||||
} from '@backstage/filter-predicates';
|
||||
import { createPatternResolver } from '../util/createPatternResolver';
|
||||
import { SubscriptionTask } from './types';
|
||||
import { EventContext, SubscriptionTask } from './types';
|
||||
|
||||
export function readSubscriptionTasksFromConfig(
|
||||
rootConfig: RootConfigService,
|
||||
@@ -40,6 +43,7 @@ export function readSubscriptionTasksFromConfig(
|
||||
|
||||
const config = subscriptionsConfig.getConfig(subscriptionId);
|
||||
const sourceTopics = readSourceTopics(config);
|
||||
const filter = readFilter(config);
|
||||
const mapToTopic = readTopicMapper(config);
|
||||
const mapToAttributes = readAttributeMapper(config);
|
||||
|
||||
@@ -47,6 +51,7 @@ export function readSubscriptionTasksFromConfig(
|
||||
id: subscriptionId,
|
||||
sourceTopics: sourceTopics,
|
||||
targetTopicPattern: config.getString('targetTopicName'),
|
||||
filter,
|
||||
mapToTopic,
|
||||
mapToAttributes,
|
||||
};
|
||||
@@ -60,12 +65,26 @@ function readSourceTopics(config: Config): string[] {
|
||||
return [config.getString('sourceTopic')];
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles the `filter` configuration field.
|
||||
*/
|
||||
function readFilter(config: Config): (context: EventContext) => boolean {
|
||||
const predicate = readOptionalFilterPredicateFromConfig(config, {
|
||||
key: 'filter',
|
||||
});
|
||||
if (!predicate) {
|
||||
return () => true;
|
||||
}
|
||||
|
||||
return filterPredicateToFilterFunction(predicate);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles the `targetTopicName` configuration field.
|
||||
*/
|
||||
function readTopicMapper(
|
||||
config: Config,
|
||||
): (event: EventParams) => { project: string; topic: string } | undefined {
|
||||
): (context: EventContext) => { project: string; topic: string } | undefined {
|
||||
const regex = /^projects\/([^/]+)\/topics\/(.+)$/;
|
||||
|
||||
const targetTopicPattern = config.getString('targetTopicName');
|
||||
@@ -78,9 +97,9 @@ function readTopicMapper(
|
||||
|
||||
const patternResolver = createPatternResolver(targetTopicPattern);
|
||||
|
||||
return event => {
|
||||
return context => {
|
||||
try {
|
||||
parts = patternResolver({ event }).match(regex);
|
||||
parts = patternResolver(context).match(regex);
|
||||
if (!parts) {
|
||||
return undefined;
|
||||
}
|
||||
@@ -100,10 +119,10 @@ function readTopicMapper(
|
||||
*/
|
||||
function readAttributeMapper(
|
||||
config: Config,
|
||||
): (event: EventParams) => Record<string, string> {
|
||||
): (context: EventContext) => Record<string, string> {
|
||||
const setters = new Array<
|
||||
(options: {
|
||||
event: EventParams;
|
||||
context: EventContext;
|
||||
attributes: Record<string, string>;
|
||||
}) => void
|
||||
>();
|
||||
@@ -113,9 +132,9 @@ function readAttributeMapper(
|
||||
for (const key of eventMetadata?.keys() ?? []) {
|
||||
const valuePattern = eventMetadata.getString(key);
|
||||
const patternResolver = createPatternResolver(valuePattern);
|
||||
setters.push(({ event, attributes }) => {
|
||||
setters.push(({ context, attributes }) => {
|
||||
try {
|
||||
const value = patternResolver({ event });
|
||||
const value = patternResolver(context);
|
||||
if (value) {
|
||||
attributes[key] = value;
|
||||
}
|
||||
@@ -126,9 +145,9 @@ function readAttributeMapper(
|
||||
}
|
||||
}
|
||||
|
||||
return event => {
|
||||
return context => {
|
||||
const result: Record<string, string> = {};
|
||||
for (const [key, value] of Object.entries(event.metadata ?? {})) {
|
||||
for (const [key, value] of Object.entries(context.event.metadata ?? {})) {
|
||||
if (value) {
|
||||
if (typeof value === 'string') {
|
||||
result[key] = value;
|
||||
@@ -139,7 +158,7 @@ function readAttributeMapper(
|
||||
}
|
||||
}
|
||||
for (const setter of setters) {
|
||||
setter({ event, attributes: result });
|
||||
setter({ context, attributes: result });
|
||||
}
|
||||
return result;
|
||||
};
|
||||
|
||||
+18
-3
@@ -14,7 +14,21 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { EventParams } from '@backstage/plugin-events-node';
|
||||
import { JsonObject, JsonValue } from '@backstage/types';
|
||||
|
||||
/**
|
||||
* The contextual information about an event, as given to
|
||||
* configured filters and mappers.
|
||||
*/
|
||||
export interface EventContext extends JsonObject {
|
||||
// Actually the raw EventParams; slightly modified payload type to avoid type
|
||||
// inference issues.
|
||||
event: {
|
||||
topic: string;
|
||||
eventPayload: JsonValue;
|
||||
metadata?: Record<string, string | string[] | undefined>;
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* A configured subscription task.
|
||||
@@ -23,8 +37,9 @@ export interface SubscriptionTask {
|
||||
id: string;
|
||||
sourceTopics: string[];
|
||||
targetTopicPattern: string;
|
||||
filter: (context: EventContext) => boolean;
|
||||
mapToTopic: (
|
||||
event: EventParams,
|
||||
context: EventContext,
|
||||
) => { project: string; topic: string } | undefined;
|
||||
mapToAttributes: (event: EventParams) => Record<string, string>;
|
||||
mapToAttributes: (context: EventContext) => Record<string, string>;
|
||||
}
|
||||
|
||||
+2
-1
@@ -67,8 +67,9 @@ describe('GooglePubSubConsumingEventPublisher', () => {
|
||||
id: 'my-id',
|
||||
project: 'my-project',
|
||||
subscription: 'my-subscription',
|
||||
filter: () => true,
|
||||
mapToTopic: () => 'my-topic',
|
||||
mapToMetadata: m => ({ ...m.attributes, more: 'yes' }),
|
||||
mapToMetadata: m => ({ ...m.message.attributes, more: 'yes' }),
|
||||
},
|
||||
],
|
||||
pubSubFactory,
|
||||
|
||||
+20
-4
@@ -23,7 +23,7 @@ import { EventParams, EventsService } from '@backstage/plugin-events-node';
|
||||
import { Message, PubSub, Subscription } from '@google-cloud/pubsub';
|
||||
import { Counter, metrics } from '@opentelemetry/api';
|
||||
import { readSubscriptionTasksFromConfig } from './config';
|
||||
import { SubscriptionTask } from './types';
|
||||
import { MessageContext, SubscriptionTask } from './types';
|
||||
|
||||
/**
|
||||
* Reads messages off of Google Pub/Sub subscriptions and forwards them into the
|
||||
@@ -197,14 +197,30 @@ export class GooglePubSubConsumingEventPublisher {
|
||||
message: Message,
|
||||
task: SubscriptionTask,
|
||||
): EventParams | undefined {
|
||||
const topic = task.mapToTopic(message);
|
||||
const eventPayload = JSON.parse(message.data.toString());
|
||||
const attributes = message.attributes;
|
||||
|
||||
const context: MessageContext = {
|
||||
message: {
|
||||
data: eventPayload,
|
||||
attributes,
|
||||
},
|
||||
};
|
||||
|
||||
if (!task.filter(context)) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const topic = task.mapToTopic(context);
|
||||
if (!topic) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const metadata = task.mapToMetadata(context);
|
||||
return {
|
||||
topic,
|
||||
eventPayload: JSON.parse(message.data.toString()),
|
||||
metadata: task.mapToMetadata(message),
|
||||
eventPayload,
|
||||
metadata,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
+67
-15
@@ -16,20 +16,20 @@
|
||||
|
||||
import { mockServices } from '@backstage/backend-test-utils';
|
||||
import { JsonObject } from '@backstage/types';
|
||||
import { Message } from '@google-cloud/pubsub';
|
||||
import { FilterPredicate } from '@backstage/filter-predicates';
|
||||
import { readSubscriptionTasksFromConfig } from './config';
|
||||
import { MessageContext } from './types';
|
||||
|
||||
function makeMessage(
|
||||
function makeContext(
|
||||
data: JsonObject,
|
||||
attributes: Record<string, string>,
|
||||
): Message {
|
||||
const message: Partial<Message> = {
|
||||
attributes,
|
||||
data: Buffer.from(JSON.stringify(data), 'utf-8'),
|
||||
ack: jest.fn(),
|
||||
nack: jest.fn(),
|
||||
): MessageContext {
|
||||
return {
|
||||
message: {
|
||||
data,
|
||||
attributes,
|
||||
},
|
||||
};
|
||||
return message as Message;
|
||||
}
|
||||
|
||||
describe('readSubscriptionTasksFromConfig', () => {
|
||||
@@ -60,16 +60,17 @@ describe('readSubscriptionTasksFromConfig', () => {
|
||||
id: 'subKey',
|
||||
project: 'pid',
|
||||
subscription: 'sid',
|
||||
filter: expect.any(Function),
|
||||
mapToTopic: expect.any(Function),
|
||||
mapToMetadata: expect.any(Function),
|
||||
},
|
||||
]);
|
||||
|
||||
expect(
|
||||
result[0].mapToTopic(makeMessage({ foo: 'bar' }, { attr: 'yes' })),
|
||||
result[0].mapToTopic(makeContext({ foo: 'bar' }, { attr: 'yes' })),
|
||||
).toBe('my-topic');
|
||||
expect(
|
||||
result[0].mapToMetadata(makeMessage({ foo: 'bar' }, { attr: 'yes' })),
|
||||
result[0].mapToMetadata(makeContext({ foo: 'bar' }, { attr: 'yes' })),
|
||||
).toEqual({ attr: 'yes' });
|
||||
});
|
||||
|
||||
@@ -111,6 +112,7 @@ describe('readSubscriptionTasksFromConfig', () => {
|
||||
id: 'sub1',
|
||||
project: 'pid',
|
||||
subscription: 'sid',
|
||||
filter: expect.any(Function),
|
||||
mapToTopic: expect.any(Function),
|
||||
mapToMetadata: expect.any(Function),
|
||||
},
|
||||
@@ -118,6 +120,7 @@ describe('readSubscriptionTasksFromConfig', () => {
|
||||
id: 'sub2',
|
||||
project: 'pid',
|
||||
subscription: 'sid',
|
||||
filter: expect.any(Function),
|
||||
mapToTopic: expect.any(Function),
|
||||
mapToMetadata: expect.any(Function),
|
||||
},
|
||||
@@ -125,7 +128,7 @@ describe('readSubscriptionTasksFromConfig', () => {
|
||||
|
||||
expect(
|
||||
result[0].mapToTopic(
|
||||
makeMessage(
|
||||
makeContext(
|
||||
{ foo: 'bar' },
|
||||
{ exists: 'exists', meta1: 'original1', meta2: 'original2' },
|
||||
),
|
||||
@@ -133,7 +136,7 @@ describe('readSubscriptionTasksFromConfig', () => {
|
||||
).toBe('t.exists'); // Message attribute existed, successfully routed
|
||||
expect(
|
||||
result[0].mapToMetadata(
|
||||
makeMessage(
|
||||
makeContext(
|
||||
{ foo: 'bar' },
|
||||
{ exists: 'exists', meta1: 'original1', meta2: 'original2' },
|
||||
),
|
||||
@@ -146,7 +149,7 @@ describe('readSubscriptionTasksFromConfig', () => {
|
||||
|
||||
expect(
|
||||
result[1].mapToTopic(
|
||||
makeMessage(
|
||||
makeContext(
|
||||
{ foo: 'bar' },
|
||||
{ exists: 'exists', meta1: 'original1', meta2: 'original2' },
|
||||
),
|
||||
@@ -154,7 +157,7 @@ describe('readSubscriptionTasksFromConfig', () => {
|
||||
).toBeUndefined(); // Message attribute did not exist, could not be routed
|
||||
expect(
|
||||
result[1].mapToMetadata(
|
||||
makeMessage(
|
||||
makeContext(
|
||||
{ foo: 'bar' },
|
||||
{ exists: 'exists', meta1: 'original1', meta2: 'original2' },
|
||||
),
|
||||
@@ -167,6 +170,55 @@ describe('readSubscriptionTasksFromConfig', () => {
|
||||
});
|
||||
});
|
||||
|
||||
const exampleFilter: FilterPredicate = {
|
||||
'message.attributes.x-github-event': 'push',
|
||||
};
|
||||
|
||||
it('reads with filter', () => {
|
||||
const data = {
|
||||
events: {
|
||||
modules: {
|
||||
googlePubSub: {
|
||||
googlePubSubConsumingEventPublisher: {
|
||||
subscriptions: {
|
||||
subKey: {
|
||||
subscriptionName: 'projects/pid/subscriptions/sid',
|
||||
targetTopic: 'github.{{ message.attributes.x-github-event }}',
|
||||
filter: exampleFilter,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
const result = readSubscriptionTasksFromConfig(
|
||||
mockServices.rootConfig({ data }),
|
||||
);
|
||||
expect(result).toEqual([
|
||||
{
|
||||
id: 'subKey',
|
||||
project: 'pid',
|
||||
subscription: 'sid',
|
||||
filter: expect.any(Function),
|
||||
mapToTopic: expect.any(Function),
|
||||
mapToMetadata: expect.any(Function),
|
||||
},
|
||||
]);
|
||||
|
||||
expect(
|
||||
result[0].filter(
|
||||
makeContext({ foo: 'bar' }, { 'x-github-event': 'push' }),
|
||||
),
|
||||
).toBe(true);
|
||||
expect(
|
||||
result[0].filter(
|
||||
makeContext({ foo: 'bar' }, { 'x-github-event': 'pull_request' }),
|
||||
),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it('rejects malformed subscription name', () => {
|
||||
const data = {
|
||||
events: {
|
||||
|
||||
+30
-11
@@ -17,9 +17,12 @@
|
||||
import { RootConfigService } from '@backstage/backend-plugin-api';
|
||||
import { Config } from '@backstage/config';
|
||||
import { InputError } from '@backstage/errors';
|
||||
import { Message } from '@google-cloud/pubsub';
|
||||
import {
|
||||
readOptionalFilterPredicateFromConfig,
|
||||
filterPredicateToFilterFunction,
|
||||
} from '@backstage/filter-predicates';
|
||||
import { createPatternResolver } from '../util/createPatternResolver';
|
||||
import { SubscriptionTask } from './types';
|
||||
import { MessageContext, SubscriptionTask } from './types';
|
||||
|
||||
export function readSubscriptionTasksFromConfig(
|
||||
rootConfig: RootConfigService,
|
||||
@@ -40,6 +43,7 @@ export function readSubscriptionTasksFromConfig(
|
||||
|
||||
const config = subscriptionsConfig.getConfig(subscriptionId);
|
||||
const { project, subscription } = readSubscriptionName(config);
|
||||
const filter = readFilter(config);
|
||||
const mapToTopic = readTopicMapper(config);
|
||||
const mapToMetadata = readMetadataMapper(config);
|
||||
|
||||
@@ -47,12 +51,24 @@ export function readSubscriptionTasksFromConfig(
|
||||
id: subscriptionId,
|
||||
project,
|
||||
subscription,
|
||||
filter,
|
||||
mapToTopic,
|
||||
mapToMetadata,
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
function readFilter(config: Config): (message: MessageContext) => boolean {
|
||||
const predicate = readOptionalFilterPredicateFromConfig(config, {
|
||||
key: 'filter',
|
||||
});
|
||||
if (!predicate) {
|
||||
return () => true;
|
||||
}
|
||||
|
||||
return filterPredicateToFilterFunction(predicate);
|
||||
}
|
||||
|
||||
function readSubscriptionName(config: Config): {
|
||||
project: string;
|
||||
subscription: string;
|
||||
@@ -77,12 +93,12 @@ function readSubscriptionName(config: Config): {
|
||||
*/
|
||||
function readTopicMapper(
|
||||
config: Config,
|
||||
): (message: Message) => string | undefined {
|
||||
): (message: MessageContext) => string | undefined {
|
||||
const targetTopicPattern = config.getString('targetTopic');
|
||||
const patternResolver = createPatternResolver(targetTopicPattern);
|
||||
return message => {
|
||||
try {
|
||||
return patternResolver({ message });
|
||||
return patternResolver(message);
|
||||
} catch {
|
||||
// could not map to a topic
|
||||
return undefined;
|
||||
@@ -95,9 +111,12 @@ function readTopicMapper(
|
||||
*/
|
||||
function readMetadataMapper(
|
||||
config: Config,
|
||||
): (message: Message) => Record<string, string> {
|
||||
): (message: MessageContext) => Record<string, string> {
|
||||
const setters = new Array<
|
||||
(options: { message: Message; metadata: Record<string, string> }) => void
|
||||
(options: {
|
||||
context: MessageContext;
|
||||
metadata: Record<string, string>;
|
||||
}) => void
|
||||
>();
|
||||
|
||||
const eventMetadata = config.getOptionalConfig('eventMetadata');
|
||||
@@ -105,9 +124,9 @@ function readMetadataMapper(
|
||||
for (const key of eventMetadata?.keys() ?? []) {
|
||||
const valuePattern = eventMetadata.getString(key);
|
||||
const patternResolver = createPatternResolver(valuePattern);
|
||||
setters.push(({ message, metadata }) => {
|
||||
setters.push(({ context, metadata }) => {
|
||||
try {
|
||||
const value = patternResolver({ message });
|
||||
const value = patternResolver(context);
|
||||
if (value) {
|
||||
metadata[key] = value;
|
||||
}
|
||||
@@ -118,12 +137,12 @@ function readMetadataMapper(
|
||||
}
|
||||
}
|
||||
|
||||
return message => {
|
||||
return context => {
|
||||
const result: Record<string, string> = {
|
||||
...message.attributes,
|
||||
...context.message.attributes,
|
||||
};
|
||||
for (const setter of setters) {
|
||||
setter({ message, metadata: result });
|
||||
setter({ context, metadata: result });
|
||||
}
|
||||
return result;
|
||||
};
|
||||
|
||||
+20
-3
@@ -14,7 +14,23 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { Message } from '@google-cloud/pubsub';
|
||||
import { JsonObject, JsonValue } from '@backstage/types';
|
||||
|
||||
/**
|
||||
* The contextual information about a Google Pub/Sub message, as given to
|
||||
* configured filters and mappers.
|
||||
*/
|
||||
export interface MessageContext extends JsonObject {
|
||||
/**
|
||||
* Parsed subset of the original message.
|
||||
*/
|
||||
message: {
|
||||
data: JsonValue;
|
||||
attributes: {
|
||||
[key in string]: string;
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* A configured subscription task.
|
||||
@@ -23,6 +39,7 @@ export interface SubscriptionTask {
|
||||
id: string;
|
||||
project: string;
|
||||
subscription: string;
|
||||
mapToTopic: (message: Message) => string | undefined;
|
||||
mapToMetadata: (message: Message) => Record<string, string>;
|
||||
filter: (context: MessageContext) => boolean;
|
||||
mapToTopic: (context: MessageContext) => string | undefined;
|
||||
mapToMetadata: (context: MessageContext) => Record<string, string>;
|
||||
}
|
||||
|
||||
@@ -40,11 +40,6 @@ describe('createPatternResolver', () => {
|
||||
a: { b: ['first', 'second'] },
|
||||
}),
|
||||
).toEqual('-second-');
|
||||
expect(
|
||||
createPatternResolver('-{{ a.b.0 }}-')({
|
||||
a: { b: ['first', 'second'] },
|
||||
}),
|
||||
).toEqual('-first-');
|
||||
});
|
||||
|
||||
it('throws on bad / missing context values', () => {
|
||||
|
||||
@@ -15,16 +15,20 @@
|
||||
*/
|
||||
|
||||
import { InputError } from '@backstage/errors';
|
||||
import { JsonObject, JsonValue } from '@backstage/types';
|
||||
import { getJsonValueAtPath } from '@backstage/filter-predicates';
|
||||
|
||||
/**
|
||||
* Takes a pattern string that may contain `{{ path.to.value }}` placeholders,
|
||||
* and returns a function that accepts a context object and returns strings that
|
||||
* and returns a function that accepts an input object and returns strings that
|
||||
* have had its placeholders filled in by following the dot separated path of
|
||||
* properties accordingly on the context.
|
||||
* properties accordingly on the input.
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
export function createPatternResolver<TContext extends object = object>(
|
||||
export function createPatternResolver(
|
||||
pattern: string,
|
||||
): (context: TContext) => string {
|
||||
): (input: JsonObject) => string {
|
||||
// This split results in an array where even elements are static strings
|
||||
// between placeholders, and odd elements are the contents inside
|
||||
// placeholders.
|
||||
@@ -35,7 +39,7 @@ export function createPatternResolver<TContext extends object = object>(
|
||||
// ['', 'foo', '-', 'bar', '', 'baz', '.']
|
||||
const patternParts = pattern.split(/{{\s*([\w\[\]'"_.-]*)\s*}}/g);
|
||||
|
||||
const resolvers = new Array<(context: TContext) => string>();
|
||||
const resolvers = new Array<(input: JsonObject) => string>();
|
||||
|
||||
for (let i = 0; i < patternParts.length; i += 2) {
|
||||
const staticPart = patternParts[i];
|
||||
@@ -46,9 +50,9 @@ export function createPatternResolver<TContext extends object = object>(
|
||||
}
|
||||
|
||||
if (placeholderPart) {
|
||||
const getter = createGetter<TContext>(placeholderPart);
|
||||
resolvers.push(context => {
|
||||
const value = getter(context);
|
||||
const getter = createGetter(placeholderPart);
|
||||
resolvers.push(input => {
|
||||
const value = getter(input);
|
||||
if (typeof value === 'string' || Number.isFinite(value)) {
|
||||
return String(value);
|
||||
} else if (!value) {
|
||||
@@ -62,50 +66,61 @@ export function createPatternResolver<TContext extends object = object>(
|
||||
}
|
||||
}
|
||||
|
||||
return context => resolvers.map(resolver => resolver(context)).join('');
|
||||
return input => resolvers.map(resolver => resolver(input)).join('');
|
||||
}
|
||||
|
||||
function createGetter<TContext extends object = object>(
|
||||
/**
|
||||
* Takes a path string that indexes into an object, and returns a function that
|
||||
* fetches values out of such objects.
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
export function createGetter(
|
||||
path: string,
|
||||
): (context: TContext) => unknown | undefined {
|
||||
// The resulti of the split contains quads:
|
||||
): (input: JsonObject) => JsonValue | undefined {
|
||||
// The result of the split contains pairs (with maybe no last element):
|
||||
//
|
||||
// - any "regular" part
|
||||
// - pure digits that were within brackets, if applicable
|
||||
// - contents of a single quoted string that was within brackets, if applicable
|
||||
// - contents of a double quoted string that was within brackets, if applicable
|
||||
// - any "regular" dot separated parts, if applicable
|
||||
// - any "exact" match parts within square brackets, if applicable
|
||||
//
|
||||
// For example, the path:
|
||||
// foo.bar[0].baz["qux.e"]a
|
||||
//
|
||||
// will result in:
|
||||
// [
|
||||
// 'foo', undefined, undefined, undefined,
|
||||
// 'bar', '0', undefined, undefined,
|
||||
// 'baz', undefined, 'qux.e', undefined,
|
||||
// 'a'
|
||||
// 'foo.bar', '0',
|
||||
// 'baz', 'qux.e',
|
||||
// 'a',
|
||||
// ]
|
||||
// and then the empty elements are stripped away
|
||||
const parts = path
|
||||
.split(/\.|\[(?:(\d+)|'([^']+)'|"([^"]+)")\]\.?/g)
|
||||
.split(/\[(?:(\d+)|'([^']+)'|"([^"]+)")\]\.?/g)
|
||||
.filter(Boolean);
|
||||
|
||||
return (context: TContext): unknown | undefined => {
|
||||
let current = context;
|
||||
for (const part of parts) {
|
||||
if (typeof current !== 'object' || !current) {
|
||||
return undefined;
|
||||
return input => {
|
||||
let current: JsonValue | undefined = input;
|
||||
for (let i = 0; i < parts.length; i += 2) {
|
||||
const regularPart = parts[i];
|
||||
const exactPart = parts[i + 1];
|
||||
|
||||
if (regularPart) {
|
||||
current = getJsonValueAtPath(current, regularPart);
|
||||
}
|
||||
|
||||
if (Array.isArray(current)) {
|
||||
if (!part.match(/^\d+$/)) {
|
||||
if (exactPart) {
|
||||
if (typeof current !== 'object' || !current) {
|
||||
return undefined;
|
||||
} else if (Array.isArray(current)) {
|
||||
if (exactPart.match(/^\d+$/)) {
|
||||
current = current[Number(exactPart)];
|
||||
} else {
|
||||
return undefined;
|
||||
}
|
||||
} else {
|
||||
if (!Object.hasOwn(current, exactPart)) {
|
||||
return undefined;
|
||||
}
|
||||
current = current[exactPart];
|
||||
}
|
||||
current = (current as any[])[Number(part)];
|
||||
} else {
|
||||
if (!Object.hasOwn(current, part)) {
|
||||
return undefined;
|
||||
}
|
||||
current = (current as any)[part];
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -5806,6 +5806,7 @@ __metadata:
|
||||
"@backstage/cli": "workspace:^"
|
||||
"@backstage/config": "workspace:^"
|
||||
"@backstage/errors": "workspace:^"
|
||||
"@backstage/filter-predicates": "workspace:^"
|
||||
"@backstage/plugin-events-backend": "workspace:^"
|
||||
"@backstage/plugin-events-node": "workspace:^"
|
||||
"@backstage/types": "workspace:^"
|
||||
|
||||
Reference in New Issue
Block a user