diff --git a/.changeset/thin-flies-travel.md b/.changeset/thin-flies-travel.md new file mode 100644 index 0000000000..4dc29933e8 --- /dev/null +++ b/.changeset/thin-flies-travel.md @@ -0,0 +1,5 @@ +--- +'@backstage/plugin-events-backend-module-kafka': minor +--- + +Add support for Kafka offset configuration (`fromBeginning`) and `autoCommit` diff --git a/plugins/events-backend-module-kafka/README.md b/plugins/events-backend-module-kafka/README.md index f77b5d2f21..ebc4b5f292 100644 --- a/plugins/events-backend-module-kafka/README.md +++ b/plugins/events-backend-module-kafka/README.md @@ -33,6 +33,10 @@ events: topics: # (Required) The Kafka topics to subscribe to. - topic1 groupId: your-group-id # (Required) The GroupId to be used by the topic consumers. + # Optional offset management settings (these can be omitted to use defaults): + # fromBeginning: false # Start from earliest offset when no committed offset exists. Default: not set (latest) + # autoCommit: true # Enable auto-commit. Default: true (for backward compatibility) + # pauseOnError: false # Pause consumer on error. Default: false (for backward compatibility) ``` ### KafkaPublishingEventConsumer Configuration @@ -57,6 +61,77 @@ events: For a complete list of all available fields that can be configured, refer to the [config.d.ts file](./config.d.ts). +### Offset Management + +The plugin supports configurable offset management to control message delivery semantics: + +#### Auto Commit (Default - Backward Compatible) + +By default (`autoCommit: true` or not specified), Kafka automatically commits offsets at regular intervals. This is the original behavior and ensures backward compatibility. + +#### Manual Commit (Opt-in for Reliability) + +When you explicitly set `autoCommit: false`, the plugin will: + +1. Start consuming from the last committed offset for the consumer group +2. Process each message by publishing it to the Backstage events system +3. Only commit the offset after successful processing +4. If processing fails, pause the consumer and do not commit the offset + +**Example configuration for manual commit:** + +```yaml +kafka: + topics: + - topic1 + groupId: my-group + autoCommit: false # Enable manual commit +``` + +#### Error Handling + +The `pauseOnError` option controls how the consumer behaves when message processing fails: + +**Skip Failed Messages (Default - Backward Compatible)** + +By default (`pauseOnError: false` or not specified), the consumer will skip failed messages and continue processing: + +- The consumer logs the error but continues processing subsequent messages +- If `autoCommit: false`, the offset is still committed to skip the failed message +- If `autoCommit: true`, Kafka's auto-commit handles the offset +- Recommended when occasional message failures are acceptable and should not block processing + +**Pause on Error (Opt-in)** + +When you explicitly set `pauseOnError: true`, the consumer will pause when an error occurs during message processing: + +- The consumer pauses and stops processing new messages +- The failed message offset is not committed +- The error is re-thrown and logged +- Recommended when you want to investigate and fix issues before continuing + +**Example configuration to pause on error:** + +```yaml +kafka: + topics: + - topic1 + groupId: my-group + autoCommit: false + pauseOnError: true # Pause consumer when a message fails +``` + +**Note:** When using the default behavior (`pauseOnError: false`) with `autoCommit: false`, failed messages will have their offsets committed, meaning they will be skipped and not reprocessed. Use this configuration carefully based on your application's requirements. + +#### Starting Position + +The `fromBeginning` option controls where the consumer starts when no committed offset exists: + +- `fromBeginning: true` - Start from the earliest available message +- `fromBeginning: false` (default) - Start from the latest message (only new messages) + +Once the consumer group has committed an offset, it will always resume from that position, regardless of the `fromBeginning` setting. + ### Optional SSL Configuration If your Kafka cluster requires SSL, you can configure it for both `kafkaConsumingEventPublisher` and `kafkaPublishingEventConsumer` instances: diff --git a/plugins/events-backend-module-kafka/config.d.ts b/plugins/events-backend-module-kafka/config.d.ts index 47ebae0653..1413f98a49 100644 --- a/plugins/events-backend-module-kafka/config.d.ts +++ b/plugins/events-backend-module-kafka/config.d.ts @@ -200,6 +200,29 @@ export interface Config { * Default: 5000 */ maxWaitTime?: HumanDuration | string; + + /** + * (Optional) If true, the consumer group will start from the earliest offset when no committed offset is found. + * If false or not specified, it will start from the latest offset. + * Default: undefined (start from latest) + */ + fromBeginning?: boolean; + + /** + * (Optional) Enable auto-commit of offsets. + * When true (default), offsets are automatically committed at regular intervals (at-most-once delivery). + * When false, offsets are only committed after successful message processing (at-least-once delivery). + * Default: true (auto-commit enabled for backward compatibility) + */ + autoCommit?: boolean; + + /** + * (Optional) When true, the consumer will pause on error and stop processing messages. + * When false (default), the consumer will skip failed messages and continue processing. + * Note: When pauseOnError is false and autoCommit is also false, failed messages will still have their offsets committed. + * Default: false (skip errors for backward compatibility) + */ + pauseOnError?: boolean; }; }>; } @@ -374,6 +397,29 @@ export interface Config { * Default: 5000 */ maxWaitTime?: HumanDuration | string; + + /** + * (Optional) If true, the consumer group will start from the earliest offset when no committed offset is found. + * If false or not specified, it will start from the latest offset. + * Default: undefined (start from latest) + */ + fromBeginning?: boolean; + + /** + * (Optional) Enable auto-commit of offsets. + * When true (default), offsets are automatically committed at regular intervals (at-most-once delivery). + * When false, offsets are only committed after successful message processing (at-least-once delivery). + * Default: true (auto-commit enabled for backward compatibility) + */ + autoCommit?: boolean; + + /** + * (Optional) When true, the consumer will pause on error and stop processing messages. + * When false (default), the consumer will skip failed messages and continue processing. + * Note: When pauseOnError is false and autoCommit is also false, failed messages will still have their offsets committed. + * Default: false (skip errors for backward compatibility) + */ + pauseOnError?: boolean; }; }>; }; diff --git a/plugins/events-backend-module-kafka/src/KafkaConsumingEventPublisher/KafkaConsumingEventPublisher.test.ts b/plugins/events-backend-module-kafka/src/KafkaConsumingEventPublisher/KafkaConsumingEventPublisher.test.ts index 98781936b0..b0070166e1 100644 --- a/plugins/events-backend-module-kafka/src/KafkaConsumingEventPublisher/KafkaConsumingEventPublisher.test.ts +++ b/plugins/events-backend-module-kafka/src/KafkaConsumingEventPublisher/KafkaConsumingEventPublisher.test.ts @@ -29,6 +29,7 @@ describe('KafkaConsumingEventPublisher', () => { disconnect: jest.fn(), subscribe: jest.fn(), run: jest.fn(), + commitOffsets: jest.fn(), }; const mockKafkaClient = { @@ -160,4 +161,344 @@ describe('KafkaConsumingEventPublisher', () => { expect(consumers).toHaveLength(1); expect(mockKafkaClient.consumer).toHaveBeenCalledTimes(2); }); + + describe('Offset Management', () => { + it('should commit offset after successful message processing when autoCommit is false', async () => { + const configWithManualCommit = new ConfigReader({ + events: { + modules: { + kafka: { + kafkaConsumingEventPublisher: { + dev: { + clientId: 'backstage-events', + brokers: ['kafka1:9092'], + topics: [ + { + topic: 'backstage-topic', + kafka: { + topics: ['test-topic'], + groupId: 'test-group', + autoCommit: false, + }, + }, + ], + }, + }, + }, + }, + }, + }); + + const consumers = KafkaConsumingEventPublisher.fromConfig({ + config: configWithManualCommit, + events: mockEvents, + logger: mockLogger, + }); + + mockConsumer.run.mockImplementation(async ({ eachMessage }: any) => { + await eachMessage({ + topic: 'test-kafka-topic', + partition: 2, + message: { + key: Buffer.from('test-key'), + value: Buffer.from(JSON.stringify({ data: 'test-data' })), + offset: '12345', + timestamp: '1234567890', + headers: { + 'custom-header': Buffer.from('header-value'), + }, + }, + heartbeat: jest.fn(), + pause: jest.fn(), + }); + }); + + await consumers[0].start(); + + expect(mockEvents.publish).toHaveBeenCalledWith({ + topic: 'backstage-topic', + eventPayload: { data: 'test-data' }, + metadata: { + 'custom-header': 'header-value', + }, + }); + + // Verify offset + 1 + expect(mockConsumer.commitOffsets).toHaveBeenCalledWith([ + { topic: 'test-kafka-topic', partition: 2, offset: '12346' }, + ]); + }); + + it('should not commit offset when autoCommit is true (default)', async () => { + const configWithAutoCommit = new ConfigReader({ + events: { + modules: { + kafka: { + kafkaConsumingEventPublisher: { + dev: { + clientId: 'backstage-events', + brokers: ['kafka1:9092'], + topics: [ + { + topic: 'backstage-topic', + kafka: { + topics: ['test-topic'], + groupId: 'test-group', + autoCommit: true, + }, + }, + ], + }, + }, + }, + }, + }, + }); + + const consumers = KafkaConsumingEventPublisher.fromConfig({ + config: configWithAutoCommit, + events: mockEvents, + logger: mockLogger, + }); + + mockConsumer.run.mockImplementation(async ({ eachMessage }: any) => { + await eachMessage({ + topic: 'test-kafka-topic', + partition: 2, + message: { + value: Buffer.from(JSON.stringify({ data: 'test-data' })), + offset: '12345', + }, + heartbeat: jest.fn(), + pause: jest.fn(), + }); + }); + + await consumers[0].start(); + + expect(mockEvents.publish).toHaveBeenCalled(); + expect(mockConsumer.commitOffsets).not.toHaveBeenCalled(); + }); + + it('should not commit offset when message processing fails and pauseOnError is true', async () => { + const pauseMock = jest.fn(); + const failingEvents = { + ...mockEvents, + publish: jest.fn().mockRejectedValue(new Error('Processing failed')), + }; + + const configWithPauseOnError = new ConfigReader({ + events: { + modules: { + kafka: { + kafkaConsumingEventPublisher: { + dev: { + clientId: 'backstage-events', + brokers: ['kafka1:9092'], + topics: [ + { + topic: 'backstage-topic', + kafka: { + topics: ['test-topic'], + groupId: 'test-group', + autoCommit: false, + pauseOnError: true, + }, + }, + ], + }, + }, + }, + }, + }, + }); + + const consumers = KafkaConsumingEventPublisher.fromConfig({ + config: configWithPauseOnError, + events: failingEvents, + logger: mockLogger, + }); + + mockConsumer.run.mockImplementation(async ({ eachMessage }: any) => { + await expect( + eachMessage({ + topic: 'test-kafka-topic', + partition: 2, + message: { + value: Buffer.from(JSON.stringify({ data: 'test-data' })), + offset: '12345', + }, + heartbeat: jest.fn(), + pause: pauseMock, + }), + ).rejects.toThrow('Processing failed'); + }); + + await consumers[0].start(); + + expect(mockConsumer.commitOffsets).not.toHaveBeenCalled(); + expect(pauseMock).toHaveBeenCalled(); + }); + + it('should skip failed message and commit offset when pauseOnError is false and autoCommit is false', async () => { + const pauseMock = jest.fn(); + const failingEvents = { + ...mockEvents, + publish: jest.fn().mockRejectedValue(new Error('Processing failed')), + }; + + const configWithSkipOnError = new ConfigReader({ + events: { + modules: { + kafka: { + kafkaConsumingEventPublisher: { + dev: { + clientId: 'backstage-events', + brokers: ['kafka1:9092'], + topics: [ + { + topic: 'backstage-topic', + kafka: { + topics: ['test-topic'], + groupId: 'test-group', + autoCommit: false, + pauseOnError: false, + }, + }, + ], + }, + }, + }, + }, + }, + }); + + const consumers = KafkaConsumingEventPublisher.fromConfig({ + config: configWithSkipOnError, + events: failingEvents, + logger: mockLogger, + }); + + mockConsumer.run.mockImplementation(async ({ eachMessage }: any) => { + await eachMessage({ + topic: 'test-kafka-topic', + partition: 2, + message: { + value: Buffer.from(JSON.stringify({ data: 'test-data' })), + offset: '12345', + }, + heartbeat: jest.fn(), + pause: pauseMock, + }); + }); + + await consumers[0].start(); + + // Should commit offset to skip the failed message + expect(mockConsumer.commitOffsets).toHaveBeenCalledWith([ + { topic: 'test-kafka-topic', partition: 2, offset: '12346' }, + ]); + expect(pauseMock).not.toHaveBeenCalled(); + }); + + it('should skip failed message without committing when pauseOnError is false and autoCommit is true', async () => { + const pauseMock = jest.fn(); + const failingEvents = { + ...mockEvents, + publish: jest.fn().mockRejectedValue(new Error('Processing failed')), + }; + + const configWithAutoCommitSkipOnError = new ConfigReader({ + events: { + modules: { + kafka: { + kafkaConsumingEventPublisher: { + dev: { + clientId: 'backstage-events', + brokers: ['kafka1:9092'], + topics: [ + { + topic: 'backstage-topic', + kafka: { + topics: ['test-topic'], + groupId: 'test-group', + autoCommit: true, + pauseOnError: false, + }, + }, + ], + }, + }, + }, + }, + }, + }); + + const consumers = KafkaConsumingEventPublisher.fromConfig({ + config: configWithAutoCommitSkipOnError, + events: failingEvents, + logger: mockLogger, + }); + + mockConsumer.run.mockImplementation(async ({ eachMessage }: any) => { + await eachMessage({ + topic: 'test-kafka-topic', + partition: 2, + message: { + value: Buffer.from(JSON.stringify({ data: 'test-data' })), + offset: '12345', + }, + heartbeat: jest.fn(), + pause: pauseMock, + }); + }); + + await consumers[0].start(); + + // Should not commit offset (autoCommit handles it) + expect(mockConsumer.commitOffsets).not.toHaveBeenCalled(); + expect(pauseMock).not.toHaveBeenCalled(); + }); + + it('should pass autoCommit setting to consumer.run()', async () => { + const configWithManualCommit = new ConfigReader({ + events: { + modules: { + kafka: { + kafkaConsumingEventPublisher: { + dev: { + clientId: 'backstage-events', + brokers: ['kafka1:9092'], + topics: [ + { + topic: 'backstage-topic', + kafka: { + topics: ['test-topic'], + groupId: 'test-group', + autoCommit: false, + }, + }, + ], + }, + }, + }, + }, + }, + }); + + const consumers = KafkaConsumingEventPublisher.fromConfig({ + config: configWithManualCommit, + events: mockEvents, + logger: mockLogger, + }); + + await consumers[0].start(); + + expect(mockConsumer.run).toHaveBeenCalledWith( + expect.objectContaining({ + autoCommit: false, + }), + ); + }); + }); }); diff --git a/plugins/events-backend-module-kafka/src/KafkaConsumingEventPublisher/KafkaConsumingEventPublisher.ts b/plugins/events-backend-module-kafka/src/KafkaConsumingEventPublisher/KafkaConsumingEventPublisher.ts index b996032575..cfb9af8a4c 100644 --- a/plugins/events-backend-module-kafka/src/KafkaConsumingEventPublisher/KafkaConsumingEventPublisher.ts +++ b/plugins/events-backend-module-kafka/src/KafkaConsumingEventPublisher/KafkaConsumingEventPublisher.ts @@ -86,12 +86,55 @@ export class KafkaConsumingEventPublisher { await consumer.subscribe(config.consumerSubscribeTopics); await consumer.run({ - eachMessage: async ({ message }) => { - this.events.publish({ - topic: config.backstageTopic, - eventPayload: JSON.parse(message.value?.toString()!), - metadata: convertHeadersToMetadata(message.headers), - }); + autoCommit: config.autoCommit, + eachMessage: async ({ + topic, + partition, + message, + heartbeat, + pause, + }) => { + try { + await this.events.publish({ + topic: config.backstageTopic, + eventPayload: JSON.parse(message.value?.toString()!), + metadata: convertHeadersToMetadata(message.headers), + }); + + // Only commit offset manually if autoCommit is disabled + if (!config.autoCommit) { + await consumer.commitOffsets([ + { + topic, + partition, + offset: (parseInt(message.offset, 10) + 1).toString(), + }, + ]); + } + + await heartbeat(); + } catch (error: any) { + consumerLogger.error( + `Failed to process message at offset ${message.offset} on partition ${partition} of topic ${topic}`, + error, + ); + + if (config.pauseOnError) { + pause(); + throw error; + } + + // Skip the failed message by committing its offset if autoCommit is disabled + if (!config.autoCommit) { + await consumer.commitOffsets([ + { + topic, + partition, + offset: (parseInt(message.offset, 10) + 1).toString(), + }, + ]); + } + } }, }); } catch (error: any) { diff --git a/plugins/events-backend-module-kafka/src/KafkaConsumingEventPublisher/config.test.ts b/plugins/events-backend-module-kafka/src/KafkaConsumingEventPublisher/config.test.ts index 16ee9e0a4c..0f7bff4dbe 100644 --- a/plugins/events-backend-module-kafka/src/KafkaConsumingEventPublisher/config.test.ts +++ b/plugins/events-backend-module-kafka/src/KafkaConsumingEventPublisher/config.test.ts @@ -86,6 +86,8 @@ describe('readConsumerConfig', () => { consumerSubscribeTopics: { topics: ['topic-A'], }, + autoCommit: true, + pauseOnError: false, }, { backstageTopic: 'fake2', @@ -95,6 +97,8 @@ describe('readConsumerConfig', () => { consumerSubscribeTopics: { topics: ['topic-B'], }, + autoCommit: true, + pauseOnError: false, }, ]); }); @@ -209,6 +213,8 @@ describe('readConsumerConfig', () => { consumerSubscribeTopics: { topics: ['topic-A'], }, + autoCommit: true, + pauseOnError: false, }, { backstageTopic: 'fake2', @@ -218,6 +224,8 @@ describe('readConsumerConfig', () => { consumerSubscribeTopics: { topics: ['topic-B'], }, + autoCommit: true, + pauseOnError: false, }, ]); }); @@ -334,6 +342,8 @@ describe('readConsumerConfig', () => { consumerSubscribeTopics: { topics: ['topic-A'], }, + autoCommit: true, + pauseOnError: false, }, { backstageTopic: 'fake2', @@ -343,6 +353,8 @@ describe('readConsumerConfig', () => { consumerSubscribeTopics: { topics: ['topic-B'], }, + autoCommit: true, + pauseOnError: false, }, ]); @@ -351,4 +363,54 @@ describe('readConsumerConfig', () => { 'Legacy single config format detected at events.modules.kafka.kafkaConsumingEventPublisher.', ); }); + + it('offset management fields (autoCommit, pauseOnError, fromBeginning)', () => { + const config = new ConfigReader({ + events: { + modules: { + kafka: { + kafkaConsumingEventPublisher: { + dev: { + clientId: 'backstage-events', + brokers: ['kafka1:9092'], + topics: [ + { + topic: 'fake1', + kafka: { + topics: ['topic-A'], + groupId: 'my-group', + autoCommit: false, + pauseOnError: true, + fromBeginning: true, + }, + }, + ], + }, + }, + }, + }, + }, + }); + + const publisherConfigs = readConsumerConfig(config, mockLogger); + + expect(publisherConfigs).toBeDefined(); + expect(publisherConfigs).toHaveLength(1); + + const devConfig = publisherConfigs[0]; + expect(devConfig.kafkaConsumerConfigs).toEqual([ + { + backstageTopic: 'fake1', + consumerConfig: { + groupId: 'my-group', + }, + consumerSubscribeTopics: { + topics: ['topic-A'], + fromBeginning: true, + }, + autoCommit: false, + pauseOnError: true, + }, + ]); + }); }); diff --git a/plugins/events-backend-module-kafka/src/KafkaConsumingEventPublisher/config.ts b/plugins/events-backend-module-kafka/src/KafkaConsumingEventPublisher/config.ts index b07378a1d6..327bc57255 100644 --- a/plugins/events-backend-module-kafka/src/KafkaConsumingEventPublisher/config.ts +++ b/plugins/events-backend-module-kafka/src/KafkaConsumingEventPublisher/config.ts @@ -25,6 +25,8 @@ export interface KafkaConsumerConfig { backstageTopic: string; consumerConfig: ConsumerConfig; consumerSubscribeTopics: ConsumerSubscribeTopics; + autoCommit: boolean; + pauseOnError: boolean; } export interface KafkaConsumingEventPublisherConfig { @@ -78,7 +80,16 @@ const processSinglePublisher = ( }, consumerSubscribeTopics: { topics: topicConfig.getStringArray('kafka.topics'), + fromBeginning: topicConfig.getOptionalBoolean( + 'kafka.fromBeginning', + ), }, + // Default autoCommit to true to match KafkaJS default and ensure consistency + // between KafkaJS's auto-commit behavior and our manual commit logic + autoCommit: + topicConfig.getOptionalBoolean('kafka.autoCommit') ?? true, + pauseOnError: + topicConfig.getOptionalBoolean('kafka.pauseOnError') ?? false, }; }), };