add support for Kafka offset configuration (fromBeginning) and autoCommit to plugin-events-backend-module

Signed-off-by: Dominik Bartholdi <domi@fortysix.ch>
This commit is contained in:
Dominik Bartholdi
2025-11-24 07:10:20 +01:00
committed by Dominik Bartholdi
parent 06e1270eaa
commit ef5bbd8217
7 changed files with 589 additions and 6 deletions
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/plugin-events-backend-module-kafka': minor
---
Add support for Kafka offset configuration (`fromBeginning`) and `autoCommit`
@@ -33,6 +33,10 @@ events:
topics: # (Required) The Kafka topics to subscribe to.
- topic1
groupId: your-group-id # (Required) The GroupId to be used by the topic consumers.
# Optional offset management settings (these can be omitted to use defaults):
# fromBeginning: false # Start from earliest offset when no committed offset exists. Default: not set (latest)
# autoCommit: true # Enable auto-commit. Default: true (for backward compatibility)
# pauseOnError: false # Pause consumer on error. Default: false (for backward compatibility)
```
### KafkaPublishingEventConsumer Configuration
@@ -57,6 +61,77 @@ events:
For a complete list of all available fields that can be configured, refer to the [config.d.ts file](./config.d.ts).
### Offset Management
The plugin supports configurable offset management to control message delivery semantics:
#### Auto Commit (Default - Backward Compatible)
By default (`autoCommit: true` or not specified), Kafka automatically commits offsets at regular intervals. This is the original behavior and ensures backward compatibility.
#### Manual Commit (Opt-in for Reliability)
When you explicitly set `autoCommit: false`, the plugin will:
1. Start consuming from the last committed offset for the consumer group
2. Process each message by publishing it to the Backstage events system
3. Only commit the offset after successful processing
4. If processing fails, pause the consumer and do not commit the offset
**Example configuration for manual commit:**
```yaml
kafka:
topics:
- topic1
groupId: my-group
autoCommit: false # Enable manual commit
```
#### Error Handling
The `pauseOnError` option controls how the consumer behaves when message processing fails:
**Skip Failed Messages (Default - Backward Compatible)**
By default (`pauseOnError: false` or not specified), the consumer will skip failed messages and continue processing:
- The consumer logs the error but continues processing subsequent messages
- If `autoCommit: false`, the offset is still committed to skip the failed message
- If `autoCommit: true`, Kafka's auto-commit handles the offset
- Recommended when occasional message failures are acceptable and should not block processing
**Pause on Error (Opt-in)**
When you explicitly set `pauseOnError: true`, the consumer will pause when an error occurs during message processing:
- The consumer pauses and stops processing new messages
- The failed message offset is not committed
- The error is re-thrown and logged
- Recommended when you want to investigate and fix issues before continuing
**Example configuration to pause on error:**
```yaml
kafka:
topics:
- topic1
groupId: my-group
autoCommit: false
pauseOnError: true # Pause consumer when a message fails
```
**Note:** When using the default behavior (`pauseOnError: false`) with `autoCommit: false`, failed messages will have their offsets committed, meaning they will be skipped and not reprocessed. Use this configuration carefully based on your application's requirements.
#### Starting Position
The `fromBeginning` option controls where the consumer starts when no committed offset exists:
- `fromBeginning: true` - Start from the earliest available message
- `fromBeginning: false` (default) - Start from the latest message (only new messages)
Once the consumer group has committed an offset, it will always resume from that position, regardless of the `fromBeginning` setting.
### Optional SSL Configuration
If your Kafka cluster requires SSL, you can configure it for both `kafkaConsumingEventPublisher` and `kafkaPublishingEventConsumer` instances:
+46
View File
@@ -200,6 +200,29 @@ export interface Config {
* Default: 5000
*/
maxWaitTime?: HumanDuration | string;
/**
* (Optional) If true, the consumer group will start from the earliest offset when no committed offset is found.
* If false or not specified, it will start from the latest offset.
* Default: undefined (start from latest)
*/
fromBeginning?: boolean;
/**
* (Optional) Enable auto-commit of offsets.
* When true (default), offsets are automatically committed at regular intervals (at-most-once delivery).
* When false, offsets are only committed after successful message processing (at-least-once delivery).
* Default: true (auto-commit enabled for backward compatibility)
*/
autoCommit?: boolean;
/**
* (Optional) When true, the consumer will pause on error and stop processing messages.
* When false (default), the consumer will skip failed messages and continue processing.
* Note: When pauseOnError is false and autoCommit is also false, failed messages will still have their offsets committed.
* Default: false (skip errors for backward compatibility)
*/
pauseOnError?: boolean;
};
}>;
}
@@ -374,6 +397,29 @@ export interface Config {
* Default: 5000
*/
maxWaitTime?: HumanDuration | string;
/**
* (Optional) If true, the consumer group will start from the earliest offset when no committed offset is found.
* If false or not specified, it will start from the latest offset.
* Default: undefined (start from latest)
*/
fromBeginning?: boolean;
/**
* (Optional) Enable auto-commit of offsets.
* When true (default), offsets are automatically committed at regular intervals (at-most-once delivery).
* When false, offsets are only committed after successful message processing (at-least-once delivery).
* Default: true (auto-commit enabled for backward compatibility)
*/
autoCommit?: boolean;
/**
* (Optional) When true, the consumer will pause on error and stop processing messages.
* When false (default), the consumer will skip failed messages and continue processing.
* Note: When pauseOnError is false and autoCommit is also false, failed messages will still have their offsets committed.
* Default: false (skip errors for backward compatibility)
*/
pauseOnError?: boolean;
};
}>;
};
@@ -29,6 +29,7 @@ describe('KafkaConsumingEventPublisher', () => {
disconnect: jest.fn(),
subscribe: jest.fn(),
run: jest.fn(),
commitOffsets: jest.fn(),
};
const mockKafkaClient = {
@@ -160,4 +161,344 @@ describe('KafkaConsumingEventPublisher', () => {
expect(consumers).toHaveLength(1);
expect(mockKafkaClient.consumer).toHaveBeenCalledTimes(2);
});
describe('Offset Management', () => {
it('should commit offset after successful message processing when autoCommit is false', async () => {
const configWithManualCommit = new ConfigReader({
events: {
modules: {
kafka: {
kafkaConsumingEventPublisher: {
dev: {
clientId: 'backstage-events',
brokers: ['kafka1:9092'],
topics: [
{
topic: 'backstage-topic',
kafka: {
topics: ['test-topic'],
groupId: 'test-group',
autoCommit: false,
},
},
],
},
},
},
},
},
});
const consumers = KafkaConsumingEventPublisher.fromConfig({
config: configWithManualCommit,
events: mockEvents,
logger: mockLogger,
});
mockConsumer.run.mockImplementation(async ({ eachMessage }: any) => {
await eachMessage({
topic: 'test-kafka-topic',
partition: 2,
message: {
key: Buffer.from('test-key'),
value: Buffer.from(JSON.stringify({ data: 'test-data' })),
offset: '12345',
timestamp: '1234567890',
headers: {
'custom-header': Buffer.from('header-value'),
},
},
heartbeat: jest.fn(),
pause: jest.fn(),
});
});
await consumers[0].start();
expect(mockEvents.publish).toHaveBeenCalledWith({
topic: 'backstage-topic',
eventPayload: { data: 'test-data' },
metadata: {
'custom-header': 'header-value',
},
});
// Verify offset + 1
expect(mockConsumer.commitOffsets).toHaveBeenCalledWith([
{ topic: 'test-kafka-topic', partition: 2, offset: '12346' },
]);
});
it('should not commit offset when autoCommit is true (default)', async () => {
const configWithAutoCommit = new ConfigReader({
events: {
modules: {
kafka: {
kafkaConsumingEventPublisher: {
dev: {
clientId: 'backstage-events',
brokers: ['kafka1:9092'],
topics: [
{
topic: 'backstage-topic',
kafka: {
topics: ['test-topic'],
groupId: 'test-group',
autoCommit: true,
},
},
],
},
},
},
},
},
});
const consumers = KafkaConsumingEventPublisher.fromConfig({
config: configWithAutoCommit,
events: mockEvents,
logger: mockLogger,
});
mockConsumer.run.mockImplementation(async ({ eachMessage }: any) => {
await eachMessage({
topic: 'test-kafka-topic',
partition: 2,
message: {
value: Buffer.from(JSON.stringify({ data: 'test-data' })),
offset: '12345',
},
heartbeat: jest.fn(),
pause: jest.fn(),
});
});
await consumers[0].start();
expect(mockEvents.publish).toHaveBeenCalled();
expect(mockConsumer.commitOffsets).not.toHaveBeenCalled();
});
it('should not commit offset when message processing fails and pauseOnError is true', async () => {
const pauseMock = jest.fn();
const failingEvents = {
...mockEvents,
publish: jest.fn().mockRejectedValue(new Error('Processing failed')),
};
const configWithPauseOnError = new ConfigReader({
events: {
modules: {
kafka: {
kafkaConsumingEventPublisher: {
dev: {
clientId: 'backstage-events',
brokers: ['kafka1:9092'],
topics: [
{
topic: 'backstage-topic',
kafka: {
topics: ['test-topic'],
groupId: 'test-group',
autoCommit: false,
pauseOnError: true,
},
},
],
},
},
},
},
},
});
const consumers = KafkaConsumingEventPublisher.fromConfig({
config: configWithPauseOnError,
events: failingEvents,
logger: mockLogger,
});
mockConsumer.run.mockImplementation(async ({ eachMessage }: any) => {
await expect(
eachMessage({
topic: 'test-kafka-topic',
partition: 2,
message: {
value: Buffer.from(JSON.stringify({ data: 'test-data' })),
offset: '12345',
},
heartbeat: jest.fn(),
pause: pauseMock,
}),
).rejects.toThrow('Processing failed');
});
await consumers[0].start();
expect(mockConsumer.commitOffsets).not.toHaveBeenCalled();
expect(pauseMock).toHaveBeenCalled();
});
it('should skip failed message and commit offset when pauseOnError is false and autoCommit is false', async () => {
const pauseMock = jest.fn();
const failingEvents = {
...mockEvents,
publish: jest.fn().mockRejectedValue(new Error('Processing failed')),
};
const configWithSkipOnError = new ConfigReader({
events: {
modules: {
kafka: {
kafkaConsumingEventPublisher: {
dev: {
clientId: 'backstage-events',
brokers: ['kafka1:9092'],
topics: [
{
topic: 'backstage-topic',
kafka: {
topics: ['test-topic'],
groupId: 'test-group',
autoCommit: false,
pauseOnError: false,
},
},
],
},
},
},
},
},
});
const consumers = KafkaConsumingEventPublisher.fromConfig({
config: configWithSkipOnError,
events: failingEvents,
logger: mockLogger,
});
mockConsumer.run.mockImplementation(async ({ eachMessage }: any) => {
await eachMessage({
topic: 'test-kafka-topic',
partition: 2,
message: {
value: Buffer.from(JSON.stringify({ data: 'test-data' })),
offset: '12345',
},
heartbeat: jest.fn(),
pause: pauseMock,
});
});
await consumers[0].start();
// Should commit offset to skip the failed message
expect(mockConsumer.commitOffsets).toHaveBeenCalledWith([
{ topic: 'test-kafka-topic', partition: 2, offset: '12346' },
]);
expect(pauseMock).not.toHaveBeenCalled();
});
it('should skip failed message without committing when pauseOnError is false and autoCommit is true', async () => {
const pauseMock = jest.fn();
const failingEvents = {
...mockEvents,
publish: jest.fn().mockRejectedValue(new Error('Processing failed')),
};
const configWithAutoCommitSkipOnError = new ConfigReader({
events: {
modules: {
kafka: {
kafkaConsumingEventPublisher: {
dev: {
clientId: 'backstage-events',
brokers: ['kafka1:9092'],
topics: [
{
topic: 'backstage-topic',
kafka: {
topics: ['test-topic'],
groupId: 'test-group',
autoCommit: true,
pauseOnError: false,
},
},
],
},
},
},
},
},
});
const consumers = KafkaConsumingEventPublisher.fromConfig({
config: configWithAutoCommitSkipOnError,
events: failingEvents,
logger: mockLogger,
});
mockConsumer.run.mockImplementation(async ({ eachMessage }: any) => {
await eachMessage({
topic: 'test-kafka-topic',
partition: 2,
message: {
value: Buffer.from(JSON.stringify({ data: 'test-data' })),
offset: '12345',
},
heartbeat: jest.fn(),
pause: pauseMock,
});
});
await consumers[0].start();
// Should not commit offset (autoCommit handles it)
expect(mockConsumer.commitOffsets).not.toHaveBeenCalled();
expect(pauseMock).not.toHaveBeenCalled();
});
it('should pass autoCommit setting to consumer.run()', async () => {
const configWithManualCommit = new ConfigReader({
events: {
modules: {
kafka: {
kafkaConsumingEventPublisher: {
dev: {
clientId: 'backstage-events',
brokers: ['kafka1:9092'],
topics: [
{
topic: 'backstage-topic',
kafka: {
topics: ['test-topic'],
groupId: 'test-group',
autoCommit: false,
},
},
],
},
},
},
},
},
});
const consumers = KafkaConsumingEventPublisher.fromConfig({
config: configWithManualCommit,
events: mockEvents,
logger: mockLogger,
});
await consumers[0].start();
expect(mockConsumer.run).toHaveBeenCalledWith(
expect.objectContaining({
autoCommit: false,
}),
);
});
});
});
@@ -86,12 +86,55 @@ export class KafkaConsumingEventPublisher {
await consumer.subscribe(config.consumerSubscribeTopics);
await consumer.run({
eachMessage: async ({ message }) => {
this.events.publish({
topic: config.backstageTopic,
eventPayload: JSON.parse(message.value?.toString()!),
metadata: convertHeadersToMetadata(message.headers),
});
autoCommit: config.autoCommit,
eachMessage: async ({
topic,
partition,
message,
heartbeat,
pause,
}) => {
try {
await this.events.publish({
topic: config.backstageTopic,
eventPayload: JSON.parse(message.value?.toString()!),
metadata: convertHeadersToMetadata(message.headers),
});
// Only commit offset manually if autoCommit is disabled
if (!config.autoCommit) {
await consumer.commitOffsets([
{
topic,
partition,
offset: (parseInt(message.offset, 10) + 1).toString(),
},
]);
}
await heartbeat();
} catch (error: any) {
consumerLogger.error(
`Failed to process message at offset ${message.offset} on partition ${partition} of topic ${topic}`,
error,
);
if (config.pauseOnError) {
pause();
throw error;
}
// Skip the failed message by committing its offset if autoCommit is disabled
if (!config.autoCommit) {
await consumer.commitOffsets([
{
topic,
partition,
offset: (parseInt(message.offset, 10) + 1).toString(),
},
]);
}
}
},
});
} catch (error: any) {
@@ -86,6 +86,8 @@ describe('readConsumerConfig', () => {
consumerSubscribeTopics: {
topics: ['topic-A'],
},
autoCommit: true,
pauseOnError: false,
},
{
backstageTopic: 'fake2',
@@ -95,6 +97,8 @@ describe('readConsumerConfig', () => {
consumerSubscribeTopics: {
topics: ['topic-B'],
},
autoCommit: true,
pauseOnError: false,
},
]);
});
@@ -209,6 +213,8 @@ describe('readConsumerConfig', () => {
consumerSubscribeTopics: {
topics: ['topic-A'],
},
autoCommit: true,
pauseOnError: false,
},
{
backstageTopic: 'fake2',
@@ -218,6 +224,8 @@ describe('readConsumerConfig', () => {
consumerSubscribeTopics: {
topics: ['topic-B'],
},
autoCommit: true,
pauseOnError: false,
},
]);
});
@@ -334,6 +342,8 @@ describe('readConsumerConfig', () => {
consumerSubscribeTopics: {
topics: ['topic-A'],
},
autoCommit: true,
pauseOnError: false,
},
{
backstageTopic: 'fake2',
@@ -343,6 +353,8 @@ describe('readConsumerConfig', () => {
consumerSubscribeTopics: {
topics: ['topic-B'],
},
autoCommit: true,
pauseOnError: false,
},
]);
@@ -351,4 +363,54 @@ describe('readConsumerConfig', () => {
'Legacy single config format detected at events.modules.kafka.kafkaConsumingEventPublisher.',
);
});
it('offset management fields (autoCommit, pauseOnError, fromBeginning)', () => {
const config = new ConfigReader({
events: {
modules: {
kafka: {
kafkaConsumingEventPublisher: {
dev: {
clientId: 'backstage-events',
brokers: ['kafka1:9092'],
topics: [
{
topic: 'fake1',
kafka: {
topics: ['topic-A'],
groupId: 'my-group',
autoCommit: false,
pauseOnError: true,
fromBeginning: true,
},
},
],
},
},
},
},
},
});
const publisherConfigs = readConsumerConfig(config, mockLogger);
expect(publisherConfigs).toBeDefined();
expect(publisherConfigs).toHaveLength(1);
const devConfig = publisherConfigs[0];
expect(devConfig.kafkaConsumerConfigs).toEqual([
{
backstageTopic: 'fake1',
consumerConfig: {
groupId: 'my-group',
},
consumerSubscribeTopics: {
topics: ['topic-A'],
fromBeginning: true,
},
autoCommit: false,
pauseOnError: true,
},
]);
});
});
@@ -25,6 +25,8 @@ export interface KafkaConsumerConfig {
backstageTopic: string;
consumerConfig: ConsumerConfig;
consumerSubscribeTopics: ConsumerSubscribeTopics;
autoCommit: boolean;
pauseOnError: boolean;
}
export interface KafkaConsumingEventPublisherConfig {
@@ -78,7 +80,16 @@ const processSinglePublisher = (
},
consumerSubscribeTopics: {
topics: topicConfig.getStringArray('kafka.topics'),
fromBeginning: topicConfig.getOptionalBoolean(
'kafka.fromBeginning',
),
},
// Default autoCommit to true to match KafkaJS default and ensure consistency
// between KafkaJS's auto-commit behavior and our manual commit logic
autoCommit:
topicConfig.getOptionalBoolean('kafka.autoCommit') ?? true,
pauseOnError:
topicConfig.getOptionalBoolean('kafka.pauseOnError') ?? false,
};
}),
};