Commit 614b2cf

Check the tests
1 parent 3beb03e commit 614b2cf

5 files changed: +294 −201 lines changed

src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java

Lines changed: 18 additions & 0 deletions
@@ -25,6 +25,7 @@
 import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
 import com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory;
 import com.snowflake.kafka.connector.internal.SnowflakeErrors;
+import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException;
 import com.snowflake.kafka.connector.internal.SnowflakeSinkService;
 import com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2;
 import java.util.Arrays;
@@ -77,6 +78,9 @@ public class SnowflakeSinkTask extends SinkTask {
   private final SnowflakeSinkTaskAuthorizationExceptionTracker authorizationExceptionTracker =
       new SnowflakeSinkTaskAuthorizationExceptionTracker();
 
+  // Stores channel error exception detected in preCommit to fail on next put() call
+  private volatile SnowflakeKafkaConnectorException channelErrorToFailOn = null;
+
   /** default constructor, invoked by kafka connect framework */
   public SnowflakeSinkTask() {
     DYNAMIC_LOGGER = new KCLogger(this.getClass().getName());
@@ -266,6 +270,13 @@ public void close(final Collection<TopicPartition> partitions) {
   public void put(final Collection<SinkRecord> records) {
     this.authorizationExceptionTracker.throwExceptionIfAuthorizationFailed();
 
+    // Check for channel errors detected in preCommit and fail the task
+    if (this.channelErrorToFailOn != null) {
+      SnowflakeKafkaConnectorException error = this.channelErrorToFailOn;
+      this.channelErrorToFailOn = null; // Clear so we don't throw again on restart
+      throw new ConnectException(error.getMessage(), error);
+    }
+
     final long recordSize = records.size();
     DYNAMIC_LOGGER.debug("Calling PUT with {} records", recordSize);
@@ -319,6 +330,13 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(
             committedOffsets.put(topicPartition, new OffsetAndMetadata(offset));
           }
         });
+    } catch (SnowflakeKafkaConnectorException e) {
+      this.authorizationExceptionTracker.reportPrecommitException(e);
+      this.DYNAMIC_LOGGER.error("PreCommit error: {} ", e.getMessage());
+      // Channel error count exceeded - store to fail on next put() call
+      if (e.checkErrorCode(SnowflakeErrors.ERROR_5030)) {
+        this.channelErrorToFailOn = e;
+      }
     } catch (Exception e) {
       this.authorizationExceptionTracker.reportPrecommitException(e);
       this.DYNAMIC_LOGGER.error("PreCommit error: {} ", e.getMessage());
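
Taken together, these hunks implement a defer-and-fail pattern: `preCommit()` cannot fail a Kafka Connect task through its return value, so the ERROR_5030 exception is stashed in a volatile field and rethrown from the next `put()`, which the framework does treat as fatal. A minimal sketch of the pattern (a hypothetical class, not the connector's actual code):

```java
import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical sketch of the defer-and-fail pattern used by SnowflakeSinkTask above.
abstract class DeferAndFailSinkTask extends SinkTask {
  // volatile: the framework may invoke preCommit() and put() from different threads
  private volatile RuntimeException deferredError = null;

  @Override
  public void put(Collection<SinkRecord> records) {
    RuntimeException error = deferredError;
    if (error != null) {
      deferredError = null; // clear so a restarted task starts clean
      throw new ConnectException(error.getMessage(), error); // fails the task
    }
    // ... normal ingestion path ...
  }

  @Override
  public Map<TopicPartition, OffsetAndMetadata> preCommit(
      Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    try {
      return computeSafeOffsets(currentOffsets); // stand-in for the channel-status check
    } catch (RuntimeException e) {
      deferredError = e; // throwing here would only disrupt the offset commit
      return currentOffsets; // commit what is already known to be safe
    }
  }

  // Stand-in for the real offset computation that can surface channel errors.
  abstract Map<TopicPartition, OffsetAndMetadata> computeSafeOffsets(
      Map<TopicPartition, OffsetAndMetadata> currentOffsets);
}
```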
src/test/java/com/snowflake/kafka/connector/internal/streaming/ChannelStatusCheckIT.java

Lines changed: 272 additions & 0 deletions (new file)
package com.snowflake.kafka.connector.internal.streaming;

import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.snowflake.ingest.streaming.ChannelStatus;
import com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams;
import com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector;
import com.snowflake.kafka.connector.internal.TestUtils;
import com.snowflake.kafka.connector.internal.streaming.v2.StreamingClientManager;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

/**
 * Integration tests for channel status error handling using an embedded Kafka Connect cluster with
 * fake streaming ingest clients.
 */
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class ChannelStatusCheckIT {

  private EmbeddedConnectCluster connectCluster;
  private final FakeIngestClientSupplier fakeClientSupplier = new FakeIngestClientSupplier();

  @BeforeAll
  void beforeAll() {
    Map<String, String> workerConfig = new HashMap<>();
    workerConfig.put("plugin.discovery", "hybrid_warn");
    // Set a short offset flush interval for faster preCommit calls
    workerConfig.put("offset.flush.interval.ms", "5000");
    connectCluster =
        new EmbeddedConnectCluster.Builder()
            .name("channel-status-check-cluster")
            .numWorkers(5)
            .workerProps(workerConfig)
            .build();
    connectCluster.start();
  }

  @AfterAll
  void afterAll() {
    if (connectCluster != null) {
      connectCluster.stop();
      connectCluster = null;
    }
  }

  private static final int PARTITIONS_NUMBER = 10;

  private String topicName;
  private String connectorName;
  private final ObjectMapper mapper = new ObjectMapper();

  @BeforeEach
  void setUp() {
    topicName = TestUtils.randomTableName();
    connectorName = topicName + "_connector";
    connectCluster.kafka().createTopic(topicName, PARTITIONS_NUMBER);
    TestUtils.getConnectionService().createTableWithMetadataColumn(topicName);
    StreamingClientManager.setIngestClientSupplier(fakeClientSupplier);
  }

  @AfterEach
  void tearDown() {
    connectCluster.deleteConnector(connectorName);
    waitForConnectorStopped(connectorName);
    connectCluster.kafka().deleteTopic(topicName);
    StreamingClientManager.resetIngestClientSupplier();
    TestUtils.dropTable(topicName);
  }

  @Test
  void shouldContinueWorkingWhenNoChannelErrors() throws JsonProcessingException {
    // Given: connector with default config (errors.tolerance=none)
    Map<String, String> config = defaultProperties(topicName, connectorName);
    connectCluster.configureConnector(connectorName, config);
    waitForConnectorRunning(connectorName);
    waitForOpenedFakeIngestClient(connectorName);

    // When: produce messages
    produceMessages(3000);

    // Then: connector should remain running (no errors to cause failure)
    await("Messages processed")
        .atMost(Duration.ofSeconds(30))
        .until(() -> waitForConnectorToOpenChannels(connectorName).getAppendedRowCount() >= 3);

    ConnectorStateInfo connectorState = connectCluster.connectorStatus(connectorName);
    assertTrue(
        connectorState.tasks().stream()
            .allMatch(task -> "RUNNING".equals(task.state())),
        "All tasks should be running when there are no channel errors");
  }

  @Test
  void shouldFailConnectorWhenChannelHasErrorsAndToleranceIsNone() throws JsonProcessingException {
    // Given: connector with errors.tolerance=none (default)
    Map<String, String> config = defaultProperties(topicName, connectorName);
    connectCluster.configureConnector(connectorName, config);
    waitForConnectorRunning(connectorName);

    FakeSnowflakeStreamingIngestClient fakeClient = waitForConnectorToOpenChannels(connectorName);

    // Produce initial message to ensure channel is set up
    produceMessages(3000);
    await("Initial message processed")
        .atMost(Duration.ofSeconds(30))
        .until(() -> fakeClient.getAppendedRowCount() >= 1);

    // When: set channel status with errors on all channels
    for (FakeSnowflakeStreamingIngestChannel channel : fakeClient.getOpenedChannels()) {
      ChannelStatus statusWithErrors = createChannelStatusWithErrors(channel.getChannelName(), 5);
      channel.setChannelStatus(statusWithErrors);
    }

    // Produce more messages to trigger preCommit which checks channel status
    produceMessages(5);

    // Then: connector task should fail due to channel errors
    await("Connector task failed")
        .atMost(Duration.ofMinutes(2))
        .pollInterval(Duration.ofSeconds(4))
        .until(
            () -> {
              ConnectorStateInfo state = connectCluster.connectorStatus(connectorName);
              return state.tasks().stream().anyMatch(task -> "FAILED".equals(task.state()));
            });
  }

  @Test
  void shouldContinueWorkingWhenChannelHasErrorsAndToleranceIsAll()
      throws JsonProcessingException {
    // Given: connector with errors.tolerance=all
    Map<String, String> config = defaultProperties(topicName, connectorName);
    config.put(KafkaConnectorConfigParams.ERRORS_TOLERANCE_CONFIG, "all");
    connectCluster.configureConnector(connectorName, config);
    waitForConnectorRunning(connectorName);

    FakeSnowflakeStreamingIngestClient fakeClient = waitForConnectorToOpenChannels(connectorName);

    // Produce initial message
    produceMessages(1);
    await("Initial message processed")
        .atMost(Duration.ofSeconds(30))
        .until(() -> fakeClient.getAppendedRowCount() >= 1);

    // When: set channel status with errors on all channels
    for (FakeSnowflakeStreamingIngestChannel channel : fakeClient.getOpenedChannels()) {
      ChannelStatus statusWithErrors = createChannelStatusWithErrors(channel.getChannelName(), 5);
      channel.setChannelStatus(statusWithErrors);
    }

    // Produce more messages
    produceMessages(2);

    // Then: connector should continue running (errors are tolerated)
    await("Messages processed despite errors")
        .atMost(Duration.ofSeconds(30))
        .until(() -> fakeClient.getAppendedRowCount() >= 3);

    ConnectorStateInfo connectorState = connectCluster.connectorStatus(connectorName);
    assertTrue(
        connectorState.tasks().stream()
            .allMatch(task -> "RUNNING".equals(task.state())),
        "All tasks should remain running when errors.tolerance=all");
  }

  private void produceMessages(int count) throws JsonProcessingException {
    Map<String, String> payload = Map.of("key1", "value1", "key2", "value2");
    for (int i = 0; i < count; i++) {
      connectCluster
          .kafka()
          .produce(topicName, i % PARTITIONS_NUMBER, "key-" + i, mapper.writeValueAsString(payload));
    }
  }

  private ChannelStatus createChannelStatusWithErrors(String channelName, long errorCount) {
    return new ChannelStatus(
        "db",
        "schema",
        topicName, // pipeName
        channelName,
        "SUCCESS",
        "100", // latestCommittedOffsetToken
        Instant.now(), // createdOn
        100, // rowsInsertedCount
        105, // rowsParsedCount
        errorCount, // rowsErrorCount
        "95", // lastErrorOffsetTokenUpperBound
        "Test error message", // lastErrorMessage
        Instant.now(), // lastErrorTimestamp
        Duration.ofMillis(50), // serverAvgProcessingLatency
        Instant.now()); // lastRefreshedOn
  }

  // Helper methods

  private FakeSnowflakeStreamingIngestClient waitForConnectorToOpenChannels(String connectorName) {
    await("channelsCreated")
        .atMost(Duration.ofSeconds(30))
        .ignoreExceptions()
        .until(
            () ->
                !getFakeSnowflakeStreamingIngestClient(connectorName)
                    .getOpenedChannels()
                    .isEmpty());
    return getFakeSnowflakeStreamingIngestClient(connectorName);
  }

  private void waitForOpenedFakeIngestClient(String connectorName) {
    waitForConnectorToOpenChannels(connectorName);
  }

  private FakeSnowflakeStreamingIngestClient getFakeSnowflakeStreamingIngestClient(
      String connectorName) {
    return fakeClientSupplier.getFakeIngestClients().stream()
        .filter((client) -> client.getConnectorName().equals(connectorName))
        .findFirst()
        .orElseThrow();
  }

  private Map<String, String> defaultProperties(String topicName, String connectorName) {
    Map<String, String> config = TestUtils.transformProfileFileToConnectorConfiguration(false);
    config.put(SinkConnector.TOPICS_CONFIG, topicName);
    config.put(
        ConnectorConfig.CONNECTOR_CLASS_CONFIG, SnowflakeStreamingSinkConnector.class.getName());
    config.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
    config.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
    config.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
    config.put(KafkaConnectorConfigParams.NAME, connectorName);
    config.put(KafkaConnectorConfigParams.SNOWFLAKE_STREAMING_MAX_CLIENT_LAG, "1");
    config.put(KafkaConnectorConfigParams.VALUE_CONVERTER_SCHEMAS_ENABLE, "false");
    return config;
  }

  private void waitForConnectorRunning(String connectorName) {
    try {
      connectCluster
          .assertions()
          .assertConnectorAndAtLeastNumTasksAreRunning(
              connectorName, 1, "The connector did not start.");
    } catch (InterruptedException e) {
      throw new IllegalStateException("The connector is not running");
    }
  }

  private void waitForConnectorStopped(String connectorName) {
    try {
      connectCluster
          .assertions()
          .assertConnectorDoesNotExist(connectorName, "Failed to stop the connector");
    } catch (InterruptedException e) {
      throw new IllegalStateException("Interrupted while waiting for connector to stop");
    }
  }
}
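
The failing-tolerance test above hinges on timing: with `offset.flush.interval.ms` set to 5000 in the worker config, the framework calls `preCommit()` roughly every five seconds, the injected `rowsErrorCount` trips ERROR_5030 there, and the stored exception then surfaces on the next `put()`. A compressed sketch of that call sequence (a hypothetical driver; in the real test the embedded Connect worker issues these calls):

```java
import java.util.List;
import java.util.Map;
import com.snowflake.kafka.connector.SnowflakeSinkTask;

// Hypothetical driver showing the call sequence the failing test relies on.
class ChannelErrorSequenceSketch {
  static void drive(SnowflakeSinkTask task) {
    task.put(List.of());      // 1. healthy put(): no deferred error yet
    // ... the test injects a ChannelStatus with rowsErrorCount > 0 into the fake channel ...
    task.preCommit(Map.of()); // 2. status check raises ERROR_5030; the task stashes it
    task.put(List.of());      // 3. stashed error is rethrown; the worker marks the task FAILED
  }
}
```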

src/test/java/com/snowflake/kafka/connector/internal/streaming/FakeSnowflakeStreamingIngestClient.java

Lines changed: 4 additions & 3 deletions
@@ -61,16 +61,17 @@ public OpenChannelResult openChannel(final String channelName, final String offs
         "SUCCESS",
         offsetToken,
         Instant.now(),
-        1,
-        1,
-        1,
+        0, // rowsInsertedCount
+        0, // rowsParsedCount
+        0, // rowsErrorCount - default to 0 (no errors)
         null,
         null,
         null,
         null,
         Instant.now());
     final FakeSnowflakeStreamingIngestChannel channel =
         new FakeSnowflakeStreamingIngestChannel(pipeName, channelName);
+    channel.setChannelStatus(channelStatus); // Set default channel status
     openedChannels.add(channel);
     return new OpenChannelResult(channel, channelStatus);
   }
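
The net effect: a freshly opened fake channel now carries an explicit error-free status, so error-free tests need no extra setup and error scenarios must opt in by overriding the status. A minimal sketch, assuming it lives in the same test package as the fake classes shown in ChannelStatusCheckIT above (`injectErrors` is a hypothetical helper):

```java
import com.snowflake.ingest.streaming.ChannelStatus;

// Hypothetical helper: override the error-free default that openChannel() now sets,
// which is the only way a test reaches the ERROR_5030 failure path.
final class ChannelErrorInjection {
  static void injectErrors(FakeSnowflakeStreamingIngestClient fakeClient, ChannelStatus errorStatus) {
    for (FakeSnowflakeStreamingIngestChannel channel : fakeClient.getOpenedChannels()) {
      channel.setChannelStatus(errorStatus); // replaces the zero-error default
    }
  }
}
```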

src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java

Lines changed: 0 additions & 38 deletions
@@ -856,42 +856,4 @@ public void testStreamingIngestionWithExactlyOnceSemanticsOverlappingOffsets() t
 
     service2.closeAll();
   }
-
-  /**
-   * Test that getOffset works correctly when there are no channel errors. This test verifies that
-   * the channel status check in getOffset does not throw an exception when no errors are present.
-   */
-  @Test
-  public void testGetOffsetWithNoChannelErrors() throws Exception {
-    SnowflakeSinkService service =
-        StreamingSinkServiceBuilder.builder(conn, config)
-            .withSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
-            .build();
-    service.startPartition(topicPartition);
-
-    Converter converter = buildJsonConverter();
-    SchemaAndValue input =
-        converter.toConnectData(topic, "{\"name\":\"test\"}".getBytes(StandardCharsets.UTF_8));
-
-    List<SinkRecord> records = new ArrayList<>();
-    for (int i = 0; i < 5; i++) {
-      records.add(
-          new SinkRecord(
-              topic, partition, Schema.STRING_SCHEMA, "key" + i, input.schema(), input.value(), i));
-    }
-
-    service.insert(records);
-
-    // getOffset should work without throwing exceptions when there are no channel errors
-    // The channel status check happens inside getOffset
-    TestUtils.assertWithRetry(
-        () -> {
-          long offset = service.getOffset(topicPartition);
-          return offset == 5;
-        },
-        5,
-        20);
-
-    service.closeAll();
-  }
 }
