Commit 3a25fc6

FLOW-7395 Use channel status error count
1 parent 128048b

8 files changed: +374 -9 lines changed


src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java

Lines changed: 18 additions & 0 deletions
@@ -25,6 +25,7 @@
 import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
 import com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory;
 import com.snowflake.kafka.connector.internal.SnowflakeErrors;
+import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException;
 import com.snowflake.kafka.connector.internal.SnowflakeSinkService;
 import com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2;
 import java.util.Arrays;
@@ -77,6 +78,9 @@ public class SnowflakeSinkTask extends SinkTask {
   private final SnowflakeSinkTaskAuthorizationExceptionTracker authorizationExceptionTracker =
       new SnowflakeSinkTaskAuthorizationExceptionTracker();

+  // Stores channel error exception detected in preCommit to fail on next put() call
+  private volatile SnowflakeKafkaConnectorException channelErrorToFailOn = null;
+
   /** default constructor, invoked by kafka connect framework */
   public SnowflakeSinkTask() {
     DYNAMIC_LOGGER = new KCLogger(this.getClass().getName());
@@ -266,6 +270,13 @@ public void close(final Collection<TopicPartition> partitions) {
   public void put(final Collection<SinkRecord> records) {
     this.authorizationExceptionTracker.throwExceptionIfAuthorizationFailed();

+    // Check for channel errors detected in preCommit and fail the task
+    if (this.channelErrorToFailOn != null) {
+      SnowflakeKafkaConnectorException error = this.channelErrorToFailOn;
+      this.channelErrorToFailOn = null; // Clear so we don't throw again on restart
+      throw new ConnectException(error.getMessage(), error);
+    }
+
     final long recordSize = records.size();
     DYNAMIC_LOGGER.debug("Calling PUT with {} records", recordSize);
@@ -319,6 +330,13 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(
             committedOffsets.put(topicPartition, new OffsetAndMetadata(offset));
           }
         });
+    } catch (SnowflakeKafkaConnectorException e) {
+      this.authorizationExceptionTracker.reportPrecommitException(e);
+      this.DYNAMIC_LOGGER.error("PreCommit error: {} ", e.getMessage());
+      // Channel error count exceeded - store to fail on next put() call
+      if (e.checkErrorCode(SnowflakeErrors.ERROR_5030)) {
+        this.channelErrorToFailOn = e;
+      }
     } catch (Exception e) {
       this.authorizationExceptionTracker.reportPrecommitException(e);
       this.DYNAMIC_LOGGER.error("PreCommit error: {} ", e.getMessage());
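Why stash the exception instead of throwing it from preCommit() directly? The Kafka Connect worker treats a throw from preCommit() as a failed offset commit: it logs the error and carries on, so the task keeps running. A throw from put(), by contrast, fails the task. Below is a minimal sketch of that defer-to-put() pattern in isolation; the class and field names are illustrative, not part of this commit.

import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class DeferFailureSinkTask extends SinkTask {
  // Written by preCommit(), consumed by put(); volatile mirrors the field above.
  private volatile RuntimeException pendingFailure = null;

  @Override
  public void put(Collection<SinkRecord> records) {
    if (pendingFailure != null) {
      RuntimeException e = pendingFailure;
      pendingFailure = null; // clear so a restarted task is not poisoned
      throw new ConnectException(e.getMessage(), e); // this actually fails the task
    }
    // ... deliver records to the sink ...
  }

  @Override
  public Map<TopicPartition, OffsetAndMetadata> preCommit(
      Map<TopicPartition, OffsetAndMetadata> offsets) {
    try {
      // ... compute offsets that are safe to commit ...
      return offsets;
    } catch (RuntimeException e) {
      pendingFailure = e; // a throw from here would only be logged by the worker
      return offsets;
    }
  }

  @Override
  public void start(Map<String, String> props) {}

  @Override
  public void stop() {}

  @Override
  public String version() {
    return "sketch";
  }
}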

src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java

Lines changed: 5 additions & 1 deletion
@@ -233,7 +233,11 @@ public enum SnowflakeErrors {
       "5029",
       "Destination table does not exist",
       "Destination table does not exist. Please ensure the destination table exists in Snowflake"
-          + " before starting the connector.");
+          + " before starting the connector."),
+  ERROR_5030(
+      "5030",
+      "Channel error count threshold exceeded",
+      "Channel has reported errors during data ingestion. Check the channel history for details.");

   // properties

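For context, each SnowflakeErrors entry pairs a numeric code with a short and a detailed message, and can be materialized as a SnowflakeKafkaConnectorException. Both calls in this sketch appear elsewhere in this commit; only the message argument is made up.

// Raising the new error (as SnowpipeStreamingPartitionChannel does below):
SnowflakeKafkaConnectorException e = ERROR_5030.getException("channel has 3 error rows");

// Matching on the error code (as SnowflakeSinkTask.preCommit does above):
if (e.checkErrorCode(SnowflakeErrors.ERROR_5030)) {
  // handle the channel-error case specifically
}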
src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java

Lines changed: 10 additions & 4 deletions
@@ -81,6 +81,8 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService {
   // default is true unless the configuration provided is false;
   // If this is true, we will enable Mbean for required classes and emit JMX metrics for monitoring
   private boolean enableCustomJMXMonitoring = KafkaConnectorConfigParams.JMX_OPT_DEFAULT;
+  // Whether to tolerate errors during ingestion (based on errors.tolerance config)
+  private final boolean tolerateErrors;

   public SnowflakeSinkServiceV2(
       SnowflakeConnectionService conn,
@@ -118,11 +120,13 @@ public SnowflakeSinkServiceV2(
     }

     this.metricsJmxReporter = new MetricsJmxReporter(new MetricRegistry(), this.connectorName);
+    this.tolerateErrors = StreamingUtils.tolerateErrors(connectorConfig);

     LOGGER.info(
-        "SnowflakeSinkServiceV2 initialized for connector: {}, task: {}",
+        "SnowflakeSinkServiceV2 initialized for connector: {}, task: {}, tolerateErrors: {}",
         this.connectorName,
-        this.taskId);
+        this.taskId,
+        this.tolerateErrors);
   }

   /** Gets a unique identifier consisting of connector name, topic name and partition number. */
@@ -314,8 +318,10 @@ public long getOffset(TopicPartition topicPartition) {
     String partitionChannelKey =
         makeChannelName(this.connectorName, topicPartition.topic(), topicPartition.partition());
     if (partitionsToChannel.containsKey(partitionChannelKey)) {
-      long offset = partitionsToChannel.get(partitionChannelKey).getOffsetSafeToCommitToKafka();
-      partitionsToChannel.get(partitionChannelKey).setLatestConsumerGroupOffset(offset);
+      TopicPartitionChannel channel = partitionsToChannel.get(partitionChannelKey);
+      channel.checkChannelStatusAndLogErrors(tolerateErrors);
+      long offset = channel.getOffsetSafeToCommitToKafka();
+      channel.setLatestConsumerGroupOffset(offset);
       LOGGER.info(
           "Fetched snowflake commited offset: [{}] for channel [{}]", offset, partitionChannelKey);
       return offset;
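StreamingUtils.tolerateErrors is not part of this diff. A plausible sketch, assuming it wraps Kafka Connect's standard errors.tolerance property ("none" by default, "all" to keep going past bad records); this is a hypothetical reconstruction, not the actual source:

import java.util.Map;

final class StreamingUtilsSketch {
  // "errors.tolerance" is the standard Kafka Connect sink property name.
  private static final String ERRORS_TOLERANCE_CONFIG = "errors.tolerance";

  // Hypothetical reconstruction of StreamingUtils.tolerateErrors.
  static boolean tolerateErrors(Map<String, String> connectorConfig) {
    String tolerance = connectorConfig.getOrDefault(ERRORS_TOLERANCE_CONFIG, "none");
    return "all".equalsIgnoreCase(tolerance);
  }
}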

src/main/java/com/snowflake/kafka/connector/internal/streaming/channel/TopicPartitionChannel.java

Lines changed: 2 additions & 0 deletions
@@ -68,6 +68,8 @@ default long getOffsetSafeToCommitToKafka() {

   void setLatestConsumerGroupOffset(long consumerOffset);

+  void checkChannelStatusAndLogErrors(boolean tolerateErrors);
+
   default CompletableFuture<Void> waitForLastProcessedRecordCommitted() {
     return CompletableFuture.completedFuture(null);
   }
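Because the method is declared without a default body, every TopicPartitionChannel implementation must now provide it. For channel types with no status API (for example, in the changed files not shown here, or in test fakes), a no-op would satisfy the contract; this stub is hypothetical:

@Override
public void checkChannelStatusAndLogErrors(boolean tolerateErrors) {
  // No channel status API for this channel type; nothing to check or log.
}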

src/main/java/com/snowflake/kafka/connector/internal/streaming/v2/SnowpipeStreamingPartitionChannel.java

Lines changed: 57 additions & 0 deletions
@@ -2,8 +2,10 @@

 import static com.snowflake.kafka.connector.internal.SnowflakeErrors.ERROR_5027;
 import static com.snowflake.kafka.connector.internal.SnowflakeErrors.ERROR_5028;
+import static com.snowflake.kafka.connector.internal.SnowflakeErrors.ERROR_5030;

 import com.google.common.annotations.VisibleForTesting;
+import com.snowflake.ingest.streaming.ChannelStatus;
 import com.snowflake.ingest.streaming.OpenChannelResult;
 import com.snowflake.ingest.streaming.SFException;
 import com.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
@@ -597,6 +599,61 @@ public void setLatestConsumerGroupOffset(long consumerOffset) {
     }
   }

+  @Override
+  public void checkChannelStatusAndLogErrors(final boolean tolerateErrors) {
+    final ChannelStatus status = channel.getChannelStatus();
+    logChannelStatus(status);
+    handleChannelErrors(status, tolerateErrors);
+  }
+
+  private void logChannelStatus(final ChannelStatus status) {
+    LOGGER.info(
+        "Channel status for channel=[{}]: databaseName=[{}], schemaName=[{}], pipeName=[{}],"
+            + " channelName=[{}], statusCode=[{}], latestCommittedOffsetToken=[{}],"
+            + " createdOn=[{}], rowsInsertedCount=[{}], rowsParsedCount=[{}],"
+            + " rowsErrorCount=[{}], lastErrorOffsetTokenUpperBound=[{}],"
+            + " lastErrorMessage=[{}], lastErrorTimestamp=[{}],"
+            + " serverAvgProcessingLatency=[{}], lastRefreshedOn=[{}]",
+        this.getChannelNameFormatV1(),
+        status.getDatabaseName(),
+        status.getSchemaName(),
+        status.getPipeName(),
+        status.getChannelName(),
+        status.getStatusCode(),
+        status.getLatestCommittedOffsetToken(),
+        status.getCreatedOn(),
+        status.getRowsInsertedCount(),
+        status.getRowsParsedCount(),
+        status.getRowsErrorCount(),
+        status.getLastErrorOffsetTokenUpperBound(),
+        status.getLastErrorMessage(),
+        status.getLastErrorTimestamp(),
+        status.getServerAvgProcessingLatency(),
+        status.getLastRefreshedOn());
+  }
+
+  private void handleChannelErrors(final ChannelStatus status, final boolean tolerateErrors) {
+    final long rowsErrorCount = status.getRowsErrorCount();
+    if (rowsErrorCount > 0) {
+      final String errorMessage =
+          String.format(
+              "Channel [%s] has %d errors. Last error message: %s, last error timestamp: %s,"
+                  + " last error offset token upper bound: %s",
+              this.getChannelNameFormatV1(),
+              rowsErrorCount,
+              status.getLastErrorMessage(),
+              status.getLastErrorTimestamp(),
+              status.getLastErrorOffsetTokenUpperBound());
+
+      if (tolerateErrors) {
+        LOGGER.warn(errorMessage);
+      } else {
+        this.telemetryServiceV2.reportKafkaConnectFatalError(errorMessage);
+        throw ERROR_5030.getException(errorMessage);
+      }
+    }
+  }
+
   /**
    * Enum representing which Streaming API is invoking the fallback supplier. ({@link
    * #streamingApiFallbackSupplier(StreamingApiFallbackInvoker)})
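End to end, the pieces chain like this: SnowflakeSinkServiceV2.getOffset(), called on the preCommit path, invokes checkChannelStatusAndLogErrors(); if the channel reports rowsErrorCount > 0 and errors are not tolerated, ERROR_5030 is thrown, caught in SnowflakeSinkTask.preCommit(), stashed, and rethrown as a ConnectException on the next put(), failing the task. With errors.tolerance=all the same condition only logs a warning. A minimal connector config opting into the warn-only behavior might look like this (name and topics are illustrative):

name=my-snowflake-sink
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
topics=my-topic
# "all" downgrades channel errors to warnings; the default "none" fails the task
errors.tolerance=all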
