Skip to content

Commit 01bc9c4

Browse files
FLOW-7155 Adapt iceberg tests for ssv2
1 parent b8e5ba8 commit 01bc9c4

19 files changed

+496
-357
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -594,7 +594,7 @@
594594
<dependency>
595595
<groupId>org.testcontainers</groupId>
596596
<artifactId>testcontainers</artifactId>
597-
<version>1.19.3</version>
597+
<version>2.0.2</version>
598598
<scope>test</scope>
599599
</dependency>
600600

src/main/java/com/snowflake/kafka/connector/SemanticVersion.java

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@
55
import java.util.regex.Pattern;
66

77
/** Represents a parsed semantic version. */
8-
class SemanticVersion implements Comparable<SemanticVersion> {
9-
final int major;
10-
final int minor;
11-
final int patch;
12-
final boolean isReleaseCandidate;
13-
final String originalVersion;
14-
15-
SemanticVersion(String version) {
8+
public class SemanticVersion implements Comparable<SemanticVersion> {
9+
private final int major;
10+
private final int minor;
11+
private final int patch;
12+
private final boolean isReleaseCandidate;
13+
private final String originalVersion;
14+
15+
public SemanticVersion(String version) {
1616
this.originalVersion = version;
1717
// Pattern to match versions like "3.1.0" or "4.0.0-rc" or "4.0.0-RC1"
1818
Pattern pattern = Pattern.compile("(\\d+)\\.(\\d+)\\.(\\d+)(?:-[rR][cC]\\d*)?");
@@ -26,6 +26,26 @@ class SemanticVersion implements Comparable<SemanticVersion> {
2626
this.isReleaseCandidate = version.toLowerCase().contains("-rc");
2727
}
2828

29+
public String originalVersion() {
30+
return originalVersion;
31+
}
32+
33+
public boolean isReleaseCandidate() {
34+
return isReleaseCandidate;
35+
}
36+
37+
public int major() {
38+
return major;
39+
}
40+
41+
public int minor() {
42+
return minor;
43+
}
44+
45+
public int patch() {
46+
return patch;
47+
}
48+
2949
@Override
3050
public int compareTo(SemanticVersion other) {
3151
if (this.major != other.major) {

src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ public String version() {
346346
* @param config connector config file
347347
* @return result map
348348
*/
349-
static Map<String, String> getTopicToTableMap(Map<String, String> config) {
349+
public static Map<String, String> getTopicToTableMap(Map<String, String> config) {
350350
if (config.containsKey(KafkaConnectorConfigParams.SNOWFLAKE_TOPICS2TABLE_MAP)) {
351351
Map<String, String> result =
352352
Utils.parseTopicToTableMap(

src/main/java/com/snowflake/kafka/connector/Utils.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public static String quoteNameIfNeeded(String name) {
9494
* <li>Never suggest RC (release candidate) versions
9595
* </ul>
9696
*/
97-
static boolean checkConnectorVersion() {
97+
public static boolean checkConnectorVersion() {
9898
return checkConnectorVersion(VERSION, fetchAvailableVersionsFromMaven());
9999
}
100100

@@ -172,7 +172,7 @@ static String findRecommendedVersion(
172172
SemanticVersion version = new SemanticVersion(versionString);
173173

174174
// Skip RC versions
175-
if (version.isReleaseCandidate) {
175+
if (version.isReleaseCandidate()) {
176176
continue;
177177
}
178178

@@ -198,7 +198,7 @@ static String findRecommendedVersion(
198198
*
199199
* @param config connector configuration
200200
*/
201-
static ImmutableMap<String, String> validateProxySettings(Map<String, String> config) {
201+
public static ImmutableMap<String, String> validateProxySettings(Map<String, String> config) {
202202
Map<String, String> invalidConfigParams = new HashMap<String, String>();
203203

204204
String host =
@@ -238,7 +238,7 @@ static ImmutableMap<String, String> validateProxySettings(Map<String, String> co
238238
*
239239
* @param config connector configuration
240240
*/
241-
static void enableJVMProxy(Map<String, String> config) {
241+
public static void enableJVMProxy(Map<String, String> config) {
242242
String host =
243243
ConnectorConfigTools.getProperty(config, KafkaConnectorConfigParams.JVM_PROXY_HOST);
244244
String port =
@@ -307,7 +307,7 @@ static boolean isValidSnowflakeObjectIdentifier(String objName) {
307307
* @param appName snowflake application name
308308
* @return true if given application name is valid
309309
*/
310-
static boolean isValidSnowflakeApplicationName(String appName) {
310+
public static boolean isValidSnowflakeApplicationName(String appName) {
311311
return appName.matches("^[-_a-zA-Z]{1}[-_$a-zA-Z0-9]+$");
312312
}
313313

src/main/java/com/snowflake/kafka/connector/internal/DescribeTableRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,6 @@ public int hashCode() {
3030

3131
@Override
3232
public String toString() {
33-
return "DescribeTableRow{" + "column='" + column + '\'' + ", type='" + type + '\'' + '}';
33+
return " " + column + " " + type;
3434
}
3535
}

src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorTest.java

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import static com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams.SNOWFLAKE_PRIVATE_KEY;
1818
import static com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams.SNOWFLAKE_ROLE_NAME;
1919
import static com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams.SNOWFLAKE_SCHEMA_NAME;
20-
import static com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams.SNOWFLAKE_STREAMING_ICEBERG_ENABLED;
2120
import static com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams.SNOWFLAKE_STREAMING_MAX_CLIENT_LAG;
2221
import static com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams.SNOWFLAKE_TOPICS2TABLE_MAP;
2322
import static com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams.SNOWFLAKE_URL_NAME;
@@ -62,31 +61,13 @@ public static Stream<Arguments> validConfigs() {
6261
Arguments.of(SnowflakeSinkConnectorConfigBuilder.streamingConfig().build()));
6362
}
6463

65-
static Stream<Arguments> invalidConfigs() {
66-
return Stream.of(
67-
Arguments.of(
68-
SnowflakeSinkConnectorConfigBuilder.icebergConfig().withIcebergEnabled().build(),
69-
"snowflake.streaming.iceberg.enabled"),
70-
Arguments.of(
71-
SnowflakeSinkConnectorConfigBuilder.icebergConfig().build(),
72-
"snowflake.streaming.iceberg.enabled"));
73-
}
74-
7564
@ParameterizedTest(name = "Valid config: {0}")
7665
@MethodSource("validConfigs")
7766
public void shouldValidateCorrectConfig(Map<String, String> config) {
7867
// no exception thrown
7968
connectorConfigValidator.validateConfig(config);
8069
}
8170

82-
@ParameterizedTest(name = "Invalid config: {0}")
83-
@MethodSource("invalidConfigs")
84-
void shouldReturnErrorOnInvalidConfig(final Map<String, String> config, String errorKey) {
85-
assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
86-
.isInstanceOf(SnowflakeKafkaConnectorException.class)
87-
.hasMessageContaining(errorKey);
88-
}
89-
9071
@Test
9172
public void testConfig() {
9273
Map<String, String> config = SnowflakeSinkConnectorConfigBuilder.streamingConfig().build();
@@ -480,25 +461,6 @@ public void shouldThrowExceptionWhenRoleNotDefinedForSSv2() {
480461
.hasMessageContaining(SNOWFLAKE_ROLE_NAME);
481462
}
482463

483-
@Test
484-
public void shouldThrowExceptionWhenBothSSv2AndIcebergEnabled() {
485-
Map<String, String> config =
486-
SnowflakeSinkConnectorConfigBuilder.streamingConfig().withIcebergEnabled().build();
487-
488-
assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
489-
.isInstanceOf(SnowflakeKafkaConnectorException.class)
490-
.hasMessageContaining("Ingestion to Iceberg table is currently unsupported")
491-
.hasMessageContaining(SNOWFLAKE_STREAMING_ICEBERG_ENABLED);
492-
}
493-
494-
@Test
495-
public void shouldValidateSSv2WithoutIceberg() {
496-
Map<String, String> config = SnowflakeSinkConnectorConfigBuilder.streamingConfig().build();
497-
498-
assertThatCode(() -> connectorConfigValidator.validateConfig(config))
499-
.doesNotThrowAnyException();
500-
}
501-
502464
private void invalidConfigRunner(List<String> paramsToRemove) {
503465
Map<String, String> config = getConfig();
504466
for (String configParam : paramsToRemove) {

0 commit comments

Comments
 (0)