
Commit b8e5ba8

FLOW-7464 Direct mapping from SinkRecord to Map
1 parent b31c9b8 commit b8e5ba8

37 files changed, +1834 -2507 lines changed

src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java

Lines changed: 1 addition & 3 deletions
@@ -210,9 +210,7 @@ public enum SnowflakeErrors {
       "Failed to put records",
       "SinkTask hasn't been initialized before calling PUT function"),
   ERROR_5015(
-      "5015",
-      "Invalid SinkRecord received",
-      "Error parsing SinkRecord of native converter or SinkRecord header"),
+      "5015", "Invalid SinkRecord received", "Error parsing SinkRecord value or SinkRecord header"),
   ERROR_5020("5020", "Failed to register MBean in MbeanServer", "Object Name is invalid"),
   ERROR_5022("5022", "Invalid column name", "Failed to find column in the schema"),
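Not part of this commit: a minimal sketch of how ERROR_5015 might be raised and checked. It assumes SnowflakeErrors exposes a getException(Exception) helper; the checkErrorCode(...) pattern is taken from the code removed from StreamingRecordService later in this diff, and parseOrThrow is a hypothetical call site.

import com.snowflake.kafka.connector.internal.SnowflakeErrors;
import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException;
import org.apache.kafka.connect.sink.SinkRecord;

class Error5015Sketch {
  // Hypothetical call site: wrap a value/header parsing failure in the connector's
  // exception type. getException(Exception) is assumed to exist on SnowflakeErrors.
  static Object parseOrThrow(SinkRecord record) {
    try {
      return record.value(); // placeholder for the real value/header parsing
    } catch (RuntimeException e) {
      throw SnowflakeErrors.ERROR_5015.getException(e);
    }
  }

  // Callers can branch on the error code, mirroring the ERROR_0010 check removed below.
  static boolean isInvalidSinkRecord(SnowflakeKafkaConnectorException ex) {
    return ex.checkErrorCode(SnowflakeErrors.ERROR_5015);
  }
}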

src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java

Lines changed: 20 additions & 8 deletions
@@ -3,7 +3,6 @@
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams.NAME;
 import static com.snowflake.kafka.connector.Utils.getTableName;
-import static com.snowflake.kafka.connector.Utils.isIcebergEnabled;
 import static com.snowflake.kafka.connector.internal.streaming.channel.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
 import static com.snowflake.kafka.connector.internal.streaming.v2.PipeNameProvider.buildDefaultPipeName;
 import static com.snowflake.kafka.connector.internal.streaming.v2.PipeNameProvider.buildPipeName;
@@ -22,8 +21,7 @@
 import com.snowflake.kafka.connector.internal.streaming.channel.TopicPartitionChannel;
 import com.snowflake.kafka.connector.internal.streaming.v2.SnowpipeStreamingPartitionChannel;
 import com.snowflake.kafka.connector.internal.streaming.v2.StreamingClientManager;
-import com.snowflake.kafka.connector.records.RecordService;
-import com.snowflake.kafka.connector.records.RecordServiceFactory;
+import com.snowflake.kafka.connector.records.SnowflakeMetadataConfig;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -58,7 +56,7 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService {
   // Used to connect to Snowflake, could be null during testing
   private final SnowflakeConnectionService conn;
 
-  private final RecordService recordService;
+  private final SnowflakeMetadataConfig metadataConfig;
 
   private final Map<String, String> topicToTableMap;
 
@@ -101,8 +99,7 @@ public SnowflakeSinkServiceV2(
     this.sinkTaskContext = sinkTaskContext;
     this.enableCustomJMXMonitoring = enableCustomJMXMonitoring;
     this.topicToTableMap = topicToTableMap;
-    this.recordService =
-        RecordServiceFactory.createRecordService(isIcebergEnabled(connectorConfig));
+    this.metadataConfig = new SnowflakeMetadataConfig(connectorConfig);
     this.behaviorOnNullValues = behaviorOnNullValues;
     this.partitionsToChannel = new HashMap<>();
 
@@ -214,7 +211,7 @@ private void createStreamingChannelForTopicPartition(
         channelName);
 
     StreamingRecordService streamingRecordService =
-        new StreamingRecordService(this.recordService, this.kafkaRecordErrorReporter);
+        new StreamingRecordService(this.kafkaRecordErrorReporter, this.metadataConfig);
 
     StreamingErrorHandler streamingErrorHandler =
         new StreamingErrorHandler(
@@ -269,7 +266,7 @@ public void insert(final Collection<SinkRecord> records) {
     channelsVisitedPerBatch.clear();
     for (SinkRecord record : records) {
       // check if it needs to handle null value records
-      if (recordService.shouldSkipNullValue(record, behaviorOnNullValues)) {
+      if (shouldSkipNullValue(record)) {
        continue;
      }
 
@@ -305,6 +302,21 @@ public void insert(SinkRecord record) {
     channelPartition.insertRecord(record, isFirstRowPerPartitionInBatch);
   }
 
+  private boolean shouldSkipNullValue(SinkRecord record) {
+    if (behaviorOnNullValues == ConnectorConfigTools.BehaviorOnNullValues.DEFAULT) {
+      return false;
+    }
+    if (record.value() == null) {
+      LOGGER.debug(
+          "Null valued record from topic '{}', partition {} and offset {} was skipped.",
+          record.topic(),
+          record.kafkaPartition(),
+          record.kafkaOffset());
+      return true;
+    }
+    return false;
+  }
+
   @Override
   public long getOffset(TopicPartition topicPartition) {
     String partitionChannelKey =
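Not part of this commit: a self-contained sketch of the tombstone rule that the new shouldSkipNullValue helper encodes. Under the default behavior.on.null.values setting a null-valued record flows through; otherwise it is skipped. The BehaviorOnNullValues enum below is a stand-in for the connector's ConnectorConfigTools.BehaviorOnNullValues.

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

class NullValueSkipSketch {
  // Stand-in for ConnectorConfigTools.BehaviorOnNullValues in the connector.
  enum BehaviorOnNullValues { DEFAULT, IGNORE }

  // Mirrors the logic added to SnowflakeSinkServiceV2 above (minus the debug log).
  static boolean shouldSkip(SinkRecord record, BehaviorOnNullValues behavior) {
    if (behavior == BehaviorOnNullValues.DEFAULT) {
      return false; // DEFAULT: tombstones flow through and are ingested downstream
    }
    return record.value() == null; // otherwise: drop null-valued (tombstone) records
  }

  public static void main(String[] args) {
    SinkRecord tombstone =
        new SinkRecord("topic", 0, Schema.STRING_SCHEMA, "key", null, null, 42L);
    System.out.println(shouldSkip(tombstone, BehaviorOnNullValues.DEFAULT)); // false
    System.out.println(shouldSkip(tombstone, BehaviorOnNullValues.IGNORE)); // true
  }
}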
src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingRecordService.java

Lines changed: 17 additions & 133 deletions
@@ -1,160 +1,44 @@
 package com.snowflake.kafka.connector.internal.streaming;
 
-import static org.apache.kafka.common.record.TimestampType.NO_TIMESTAMP_TYPE;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.collect.ImmutableMap;
 import com.snowflake.kafka.connector.dlq.KafkaRecordErrorReporter;
 import com.snowflake.kafka.connector.internal.KCLogger;
-import com.snowflake.kafka.connector.internal.SnowflakeErrors;
-import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException;
-import com.snowflake.kafka.connector.records.RecordService;
-import com.snowflake.kafka.connector.records.SnowflakeJsonSchema;
-import com.snowflake.kafka.connector.records.SnowflakeRecordContent;
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectOutputStream;
+import com.snowflake.kafka.connector.records.SnowflakeMetadataConfig;
+import com.snowflake.kafka.connector.records.SnowflakeSinkRecord;
 import java.util.Map;
-import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.sink.SinkRecord;
 
-/** Service to transform data from Kafka format into a map that is accepted by ingest sdk. */
+/**
+ * Service to transform data from Kafka format into a map that is accepted by the Snowflake
+ * Streaming Ingest SDK.
+ */
 public class StreamingRecordService {
+
   private static final KCLogger LOGGER = new KCLogger(StreamingRecordService.class.getName());
 
-  private final RecordService recordService;
   private final KafkaRecordErrorReporter kafkaRecordErrorReporter;
+  private final SnowflakeMetadataConfig metadataConfig;
 
   public StreamingRecordService(
-      RecordService recordService, KafkaRecordErrorReporter kafkaRecordErrorReporter) {
-    this.recordService = recordService;
+      KafkaRecordErrorReporter kafkaRecordErrorReporter, SnowflakeMetadataConfig metadataConfig) {
     this.kafkaRecordErrorReporter = kafkaRecordErrorReporter;
+    this.metadataConfig = metadataConfig;
   }
 
-  /**
-   * @param kafkaSinkRecord a record in Kafka format
-   * @return a map that format depends on the schematization settings
-   */
   public Map<String, Object> transformData(SinkRecord kafkaSinkRecord) {
-    SinkRecord snowflakeSinkRecord = getSnowflakeSinkRecordFromKafkaRecord(kafkaSinkRecord);
-    // broken record
-    if (isRecordBroken(snowflakeSinkRecord)) {
-      // check for error tolerance and log tolerance values
-      // errors.log.enable and errors.tolerance
+    SnowflakeSinkRecord record = SnowflakeSinkRecord.from(kafkaSinkRecord, metadataConfig);
+
+    if (record.isBroken()) {
       LOGGER.debug(
           "Broken record offset:{}, topic:{}",
           kafkaSinkRecord.kafkaOffset(),
           kafkaSinkRecord.topic());
       kafkaRecordErrorReporter.reportError(kafkaSinkRecord, new DataException("Broken Record"));
-    } else {
-      // lag telemetry, note that sink record timestamp might be null
-      if (kafkaSinkRecord.timestamp() != null
-          && kafkaSinkRecord.timestampType() != NO_TIMESTAMP_TYPE) {
-        // TODO:SNOW-529751 telemetry
-      }
-
-      // Convert this records into Json Schema which has content and metadata, add it to DLQ if
-      // there is an exception
-      try {
-        return recordService.getProcessedRecordForStreamingIngest(snowflakeSinkRecord);
-      } catch (JsonProcessingException e) {
-        LOGGER.warn(
-            "Record has JsonProcessingException offset:{}, topic:{}",
-            kafkaSinkRecord.kafkaOffset(),
-            kafkaSinkRecord.topic());
-        kafkaRecordErrorReporter.reportError(kafkaSinkRecord, e);
-      } catch (SnowflakeKafkaConnectorException e) {
-        if (e.checkErrorCode(SnowflakeErrors.ERROR_0010)) {
-          LOGGER.warn(
-              "Cannot parse record offset:{}, topic:{}. Sending to DLQ.",
-              kafkaSinkRecord.kafkaOffset(),
-              kafkaSinkRecord.topic());
-          kafkaRecordErrorReporter.reportError(kafkaSinkRecord, e);
-        } else {
-          throw e;
-        }
-      }
-    }
-
-    // return empty
-    return ImmutableMap.of();
-  }
-
-  /**
-   * Converts the original kafka sink record into a Json Record. i.e key and values are converted
-   * into Json so that it can be used to insert into variant column of Snowflake Table.
-   *
-   * <p>TODO: SNOW-630885 - When schematization is enabled, we should create the map directly from
-   * the SinkRecord instead of first turning it into json
-   */
-  private SinkRecord getSnowflakeSinkRecordFromKafkaRecord(final SinkRecord kafkaSinkRecord) {
-    SinkRecord snowflakeRecord = kafkaSinkRecord;
-    if (shouldConvertContent(kafkaSinkRecord.value())) {
-      snowflakeRecord = handleNativeRecord(kafkaSinkRecord, false);
-    }
-    if (shouldConvertContent(kafkaSinkRecord.key())) {
-      snowflakeRecord = handleNativeRecord(snowflakeRecord, true);
+      return Map.of();
     }
 
-    return snowflakeRecord;
-  }
-
-  private boolean shouldConvertContent(final Object content) {
-    return content != null && !(content instanceof SnowflakeRecordContent);
-  }
-
-  /**
-   * This would always return false for streaming ingest use case since isBroken field is never set.
-   * isBroken is set only when using Custom snowflake converters and the content was not json
-   * serializable.
-   *
-   * <p>For Community converters, the kafka record will not be sent to Kafka connector if the record
-   * is not serializable.
-   */
-  private boolean isRecordBroken(final SinkRecord record) {
-    return isContentBroken(record.value()) || isContentBroken(record.key());
-  }
-
-  private boolean isContentBroken(final Object content) {
-    return content != null && ((SnowflakeRecordContent) content).isBroken();
-  }
-
-  private SinkRecord handleNativeRecord(SinkRecord record, boolean isKey) {
-    SnowflakeRecordContent newSFContent;
-    Schema schema = isKey ? record.keySchema() : record.valueSchema();
-    Object content = isKey ? record.key() : record.value();
-    try {
-      newSFContent = new SnowflakeRecordContent(schema, content);
-    } catch (Exception e) {
-      LOGGER.error("Native content parser error:\n{}", e.getMessage());
-      try {
-        // try to serialize this object and send that as broken record
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        ObjectOutputStream os = new ObjectOutputStream(out);
-        os.writeObject(content);
-        newSFContent = new SnowflakeRecordContent(out.toByteArray());
-      } catch (Exception serializeError) {
-        LOGGER.error(
-            "Failed to convert broken native record to byte data:\n{}",
-            serializeError.getMessage());
-        throw e;
-      }
-    }
-    // create new sinkRecord
-    Schema keySchema = isKey ? new SnowflakeJsonSchema() : record.keySchema();
-    Object keyContent = isKey ? newSFContent : record.key();
-    Schema valueSchema = isKey ? record.valueSchema() : new SnowflakeJsonSchema();
-    Object valueContent = isKey ? record.value() : newSFContent;
-    return new SinkRecord(
-        record.topic(),
-        record.kafkaPartition(),
-        keySchema,
-        keyContent,
-        valueSchema,
-        valueContent,
-        record.kafkaOffset(),
-        record.timestamp(),
-        record.timestampType(),
-        record.headers());
+    // Tombstone records are handled by the caller (shouldSkipNullValue check)
+    // If we reach here, it means we should ingest an empty record
+    return record.getContentWithMetadata(metadataConfig.shouldIncludeAllMetadata());
   }
 }
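Not part of this commit: a hedged usage sketch of the new wiring. It assumes KafkaRecordErrorReporter is a single-method interface (reportError(SinkRecord, Exception), as called in transformData above) implementable as a lambda, and that connectorConfig is the connector's Map<String, String> configuration passed to new SnowflakeMetadataConfig(...) as in SnowflakeSinkServiceV2.

import com.snowflake.kafka.connector.dlq.KafkaRecordErrorReporter;
import com.snowflake.kafka.connector.internal.streaming.StreamingRecordService;
import com.snowflake.kafka.connector.records.SnowflakeMetadataConfig;
import java.util.Map;
import org.apache.kafka.connect.sink.SinkRecord;

class StreamingRecordServiceUsageSketch {
  static Map<String, Object> toRow(Map<String, String> connectorConfig, SinkRecord record) {
    // DLQ hook: here it only logs; assumes reportError(SinkRecord, Exception) is the
    // interface's single method, matching the calls in transformData above.
    KafkaRecordErrorReporter reporter =
        (failedRecord, error) ->
            System.err.printf(
                "DLQ offset=%d topic=%s: %s%n",
                failedRecord.kafkaOffset(), failedRecord.topic(), error.getMessage());

    // Metadata settings are derived from the connector config, mirroring
    // new SnowflakeMetadataConfig(connectorConfig) in SnowflakeSinkServiceV2.
    SnowflakeMetadataConfig metadataConfig = new SnowflakeMetadataConfig(connectorConfig);

    StreamingRecordService service = new StreamingRecordService(reporter, metadataConfig);

    // An empty map means the record was broken and has already been sent to the DLQ hook.
    return service.transformData(record);
  }
}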

src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnJsonValuePair.java

Lines changed: 0 additions & 26 deletions
This file was deleted.

src/main/java/com/snowflake/kafka/connector/internal/streaming/v2/SnowpipeStreamingPartitionChannel.java

Lines changed: 4 additions & 2 deletions
@@ -224,9 +224,11 @@ private void transformAndSend(SinkRecord kafkaSinkRecord) {
       Map<String, Object> transformedRecord = streamingRecordService.transformData(kafkaSinkRecord);
       if (!transformedRecord.isEmpty()) {
         insertRowWithFallback(transformedRecord, kafkaOffset);
-        this.processedOffset.set(kafkaOffset);
-        LOGGER.trace("Setting processedOffset=[{}], channel=[{}]", kafkaOffset, channelName);
       }
+      // Always update processedOffset after transformData, even for empty/broken records
+      // Empty records are already reported to DLQ in transformData if needed
+      this.processedOffset.set(kafkaOffset);
+      LOGGER.trace("Setting processedOffset=[{}], channel=[{}]", kafkaOffset, channelName);
     } catch (TopicPartitionChannelInsertionException ex) {
       // Suppressing the exception because other channels might still continue to ingest
       LOGGER.warn(
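Not part of this commit: a minimal sketch of the changed offset bookkeeping. processedOffset now advances for every record returned by transformData, including broken records that come back as an empty map after being reported to the DLQ, so the channel does not re-read them. The AtomicLong and the insert stub below are stand-ins for the channel's internals.

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

class OffsetAdvanceSketch {
  private final AtomicLong processedOffset = new AtomicLong(-1L);

  // Stand-in for insertRowWithFallback(...) in SnowpipeStreamingPartitionChannel.
  private void insertRow(Map<String, Object> row, long offset) {
    // ... hand the row to the streaming ingest SDK ...
  }

  void transformAndSend(Map<String, Object> transformedRecord, long kafkaOffset) {
    if (!transformedRecord.isEmpty()) {
      insertRow(transformedRecord, kafkaOffset);
    }
    // Advance even for empty/broken records (already reported to the DLQ upstream),
    // matching the hunk above: the offset is no longer gated on a successful insert.
    processedOffset.set(kafkaOffset);
  }
}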

src/main/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapper.java

Lines changed: 0 additions & 44 deletions
This file was deleted.
