Description
Details:
I'm using the Kafka Connect S3 Sink Connector (Confluent 5.3.1) to write data to S3. I'm facing an intermittent issue where some records are not flushed to S3 even though they are present in the Kafka topic.
In my case, I only observe this when a record key is set, either via transforms (createKey, extractInt) or via Debezium, which sets the key to the table's primary key (see the sketch below). When no key is set, flushing works as expected.
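For reference, the transform chain I am referring to is the standard ValueToKey/ExtractField pair. A minimal sketch of those keys inside the connector configuration, assuming the key column is named ID as in the record keys below (the actual field name may differ per table):
"transforms": "createKey,extractInt",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "ID",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "ID"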
As one example, two records from different partitions were produced around the same time, both keyed on primary-key fields. One made it to S3 and the other did not, even though the consumer group offset had advanced past the missing record.
Configuration of the S3 Sink connector I am using:
{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.region": "us-east-1",
"partition.duration.ms": "3600000",
"flush.size": "100000",
"tasks.max": "5",
"topics": "topic",
"s3.part.size": "5242880",
"timezone": "Etc/UTC",
"locale": "en",
"s3.compression.type": "gzip",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
"name": "name",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"path.format": "'time'=YYYYMMddHH",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"s3.bucket.name": "s3_bucket",
"rotate.schedule.interval.ms": "1800000"
}
Record (Missing from S3):
Partition: 4
Key: {"ID":2403563} // set by Debezium as PK
Value: {...}
CreateTime: 2025-06-02T01:41:17Z
Record (Properly flushed to S3):
Partition: 2
Key: {"ID":2384617}
Value: {...}
CreateTime: 2025-06-02T01:40:39Z
Since the issue is intermittent, I have not been able to reproduce it on demand, but it keeps occurring in the production setup. I also checked the logs (log level set to ERROR) and found nothing related to this issue.
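If more detail would help, I can raise logging for the connector and the sink task and wait for the next occurrence; assuming the stock connect-log4j.properties layout, the override would be roughly:
log4j.logger.io.confluent.connect.s3=DEBUG
log4j.logger.org.apache.kafka.connect.runtime.WorkerSinkTask=DEBUG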
Could this be related to a known offset-commit race condition or rebalance issue in the S3 Sink connector?
Do you have any idea how to prevent silent data loss in such scenarios?