Skip to content

Commit 7e1b773

Browse files
FLOW-7395 Use channel status error count
1 parent be5834f commit 7e1b773

File tree

8 files changed

+374
-9
lines changed

8 files changed

+374
-9
lines changed

src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
2626
import com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory;
2727
import com.snowflake.kafka.connector.internal.SnowflakeErrors;
28+
import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException;
2829
import com.snowflake.kafka.connector.internal.SnowflakeSinkService;
2930
import com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2;
3031
import java.util.Arrays;
@@ -77,6 +78,9 @@ public class SnowflakeSinkTask extends SinkTask {
7778
private final SnowflakeSinkTaskAuthorizationExceptionTracker authorizationExceptionTracker =
7879
new SnowflakeSinkTaskAuthorizationExceptionTracker();
7980

81+
// Stores channel error exception detected in preCommit to fail on next put() call
82+
private volatile SnowflakeKafkaConnectorException channelErrorToFailOn = null;
83+
8084
/** default constructor, invoked by kafka connect framework */
8185
public SnowflakeSinkTask() {
8286
DYNAMIC_LOGGER = new KCLogger(this.getClass().getName());
@@ -266,6 +270,13 @@ public void close(final Collection<TopicPartition> partitions) {
266270
public void put(final Collection<SinkRecord> records) {
267271
this.authorizationExceptionTracker.throwExceptionIfAuthorizationFailed();
268272

273+
// Check for channel errors detected in preCommit and fail the task
274+
if (this.channelErrorToFailOn != null) {
275+
SnowflakeKafkaConnectorException error = this.channelErrorToFailOn;
276+
this.channelErrorToFailOn = null; // Clear so we don't throw again on restart
277+
throw new ConnectException(error.getMessage(), error);
278+
}
279+
269280
final long recordSize = records.size();
270281
DYNAMIC_LOGGER.debug("Calling PUT with {} records", recordSize);
271282

@@ -319,6 +330,13 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(
319330
committedOffsets.put(topicPartition, new OffsetAndMetadata(offset));
320331
}
321332
});
333+
} catch (SnowflakeKafkaConnectorException e) {
334+
this.authorizationExceptionTracker.reportPrecommitException(e);
335+
this.DYNAMIC_LOGGER.error("PreCommit error: {} ", e.getMessage());
336+
// Channel error count exceeded - store to fail on next put() call
337+
if (e.checkErrorCode(SnowflakeErrors.ERROR_5030)) {
338+
this.channelErrorToFailOn = e;
339+
}
322340
} catch (Exception e) {
323341
this.authorizationExceptionTracker.reportPrecommitException(e);
324342
this.DYNAMIC_LOGGER.error("PreCommit error: {} ", e.getMessage());

src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,11 @@ public enum SnowflakeErrors {
233233
"5029",
234234
"Destination table does not exist",
235235
"Destination table does not exist. Please ensure the destination table exists in Snowflake"
236-
+ " before starting the connector.");
236+
+ " before starting the connector."),
237+
ERROR_5030(
238+
"5030",
239+
"Channel error count threshold exceeded",
240+
"Channel has reported errors during data ingestion. Check the channel history for details.");
237241

238242
// properties
239243

src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService {
8181
// default is true unless the configuration provided is false;
8282
// If this is true, we will enable Mbean for required classes and emit JMX metrics for monitoring
8383
private boolean enableCustomJMXMonitoring = KafkaConnectorConfigParams.JMX_OPT_DEFAULT;
84+
// Whether to tolerate errors during ingestion (based on errors.tolerance config)
85+
private final boolean tolerateErrors;
8486

8587
public SnowflakeSinkServiceV2(
8688
SnowflakeConnectionService conn,
@@ -118,11 +120,13 @@ public SnowflakeSinkServiceV2(
118120
}
119121

120122
this.metricsJmxReporter = new MetricsJmxReporter(new MetricRegistry(), this.connectorName);
123+
this.tolerateErrors = StreamingUtils.tolerateErrors(connectorConfig);
121124

122125
LOGGER.info(
123-
"SnowflakeSinkServiceV2 initialized for connector: {}, task: {}",
126+
"SnowflakeSinkServiceV2 initialized for connector: {}, task: {}, tolerateErrors: {}",
124127
this.connectorName,
125-
this.taskId);
128+
this.taskId,
129+
this.tolerateErrors);
126130
}
127131

128132
/** Gets a unique identifier consisting of connector name, topic name and partition number. */
@@ -322,8 +326,10 @@ public long getOffset(TopicPartition topicPartition) {
322326
String partitionChannelKey =
323327
makeChannelName(this.connectorName, topicPartition.topic(), topicPartition.partition());
324328
if (partitionsToChannel.containsKey(partitionChannelKey)) {
325-
long offset = partitionsToChannel.get(partitionChannelKey).getOffsetSafeToCommitToKafka();
326-
partitionsToChannel.get(partitionChannelKey).setLatestConsumerGroupOffset(offset);
329+
TopicPartitionChannel channel = partitionsToChannel.get(partitionChannelKey);
330+
channel.checkChannelStatusAndLogErrors(tolerateErrors);
331+
long offset = channel.getOffsetSafeToCommitToKafka();
332+
channel.setLatestConsumerGroupOffset(offset);
327333
LOGGER.info(
328334
"Fetched Snowflake committed offset: [{}] for channel [{}]", offset, partitionChannelKey);
329335
return offset;

src/main/java/com/snowflake/kafka/connector/internal/streaming/channel/TopicPartitionChannel.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ default long getOffsetSafeToCommitToKafka() {
6868

6969
void setLatestConsumerGroupOffset(long consumerOffset);
7070

71+
void checkChannelStatusAndLogErrors(boolean tolerateErrors);
72+
7173
default CompletableFuture<Void> waitForLastProcessedRecordCommitted() {
7274
return CompletableFuture.completedFuture(null);
7375
}

src/main/java/com/snowflake/kafka/connector/internal/streaming/v2/SnowpipeStreamingPartitionChannel.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22

33
import static com.snowflake.kafka.connector.internal.SnowflakeErrors.ERROR_5027;
44
import static com.snowflake.kafka.connector.internal.SnowflakeErrors.ERROR_5028;
5+
import static com.snowflake.kafka.connector.internal.SnowflakeErrors.ERROR_5030;
56

67
import com.google.common.annotations.VisibleForTesting;
8+
import com.snowflake.ingest.streaming.ChannelStatus;
79
import com.snowflake.ingest.streaming.OpenChannelResult;
810
import com.snowflake.ingest.streaming.SFException;
911
import com.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
@@ -583,6 +585,61 @@ public void setLatestConsumerGroupOffset(long consumerOffset) {
583585
}
584586
}
585587

588+
@Override
589+
public void checkChannelStatusAndLogErrors(final boolean tolerateErrors) {
590+
final ChannelStatus status = channel.getChannelStatus();
591+
logChannelStatus(status);
592+
handleChannelErrors(status, tolerateErrors);
593+
}
594+
595+
private void logChannelStatus(final ChannelStatus status) {
596+
LOGGER.info(
597+
"Channel status for channel=[{}]: databaseName=[{}], schemaName=[{}], pipeName=[{}],"
598+
+ " channelName=[{}], statusCode=[{}], latestCommittedOffsetToken=[{}],"
599+
+ " createdOn=[{}], rowsInsertedCount=[{}], rowsParsedCount=[{}],"
600+
+ " rowsErrorCount=[{}], lastErrorOffsetTokenUpperBound=[{}],"
601+
+ " lastErrorMessage=[{}], lastErrorTimestamp=[{}],"
602+
+ " serverAvgProcessingLatency=[{}], lastRefreshedOn=[{}]",
603+
this.getChannelNameFormatV1(),
604+
status.getDatabaseName(),
605+
status.getSchemaName(),
606+
status.getPipeName(),
607+
status.getChannelName(),
608+
status.getStatusCode(),
609+
status.getLatestCommittedOffsetToken(),
610+
status.getCreatedOn(),
611+
status.getRowsInsertedCount(),
612+
status.getRowsParsedCount(),
613+
status.getRowsErrorCount(),
614+
status.getLastErrorOffsetTokenUpperBound(),
615+
status.getLastErrorMessage(),
616+
status.getLastErrorTimestamp(),
617+
status.getServerAvgProcessingLatency(),
618+
status.getLastRefreshedOn());
619+
}
620+
621+
private void handleChannelErrors(final ChannelStatus status, final boolean tolerateErrors) {
622+
final long rowsErrorCount = status.getRowsErrorCount();
623+
if (rowsErrorCount > 0) {
624+
final String errorMessage =
625+
String.format(
626+
"Channel [%s] has %d errors. Last error message: %s, last error timestamp: %s,"
627+
+ " last error offset token upper bound: %s",
628+
this.getChannelNameFormatV1(),
629+
rowsErrorCount,
630+
status.getLastErrorMessage(),
631+
status.getLastErrorTimestamp(),
632+
status.getLastErrorOffsetTokenUpperBound());
633+
634+
if (tolerateErrors) {
635+
LOGGER.warn(errorMessage);
636+
} else {
637+
this.telemetryServiceV2.reportKafkaConnectFatalError(errorMessage);
638+
throw ERROR_5030.getException(errorMessage);
639+
}
640+
}
641+
}
642+
586643
/**
587644
* Enum representing which Streaming API is invoking the fallback supplier. ({@link
588645
* #streamingApiFallbackSupplier(StreamingApiFallbackInvoker)})

0 commit comments

Comments
 (0)