From 57503c96fe714223a6776e391ec6b8efa143301d Mon Sep 17 00:00:00 2001 From: Brandt Newton Date: Fri, 22 May 2026 15:47:53 -0400 Subject: [PATCH] feat: Adds Extract Timestamp SMT to Kafka Connect Sink --- kafka-connect-bigtable-sink/README.md | 58 +++- kafka-connect-bigtable-sink/doc/tests.md | 2 +- .../integration/ExtractTimestampIT.java | 211 +++++++++++++ .../bigtable/integration/InsertModeIT.java | 34 +- .../connect/bigtable/util/TestDataUtil.java | 15 + .../test/resources/json/applied-schema.json | 5 + .../test/resources/json/expanded-order.json | 1 + .../test/resources/json/order-no-schema.json | 3 +- .../json/order-with-null-product-schema.json | 6 +- .../resources/json/order-with-schema.json | 6 +- .../bigtable/config/BigtableSinkConfig.java | 12 +- .../connect/bigtable/mapping/KeyMapper.java | 92 ++---- .../transformations/ExtractTimestamp.java | 151 +++++++++ .../transformations/TimestampPrecision.java | 23 ++ .../connect/bigtable/util/ConfigUtils.java | 26 ++ .../bigtable/util/SchemaParsingUtils.java | 81 +++++ .../transformations/ExtractTimestampTest.java | 292 ++++++++++++++++++ .../bigtable/util/SchemaParsingUtilsTest.java | 150 +++++++++ 18 files changed, 1057 insertions(+), 111 deletions(-) create mode 100644 kafka-connect-bigtable-sink/integration-tests/src/test/java/com/google/cloud/kafka/connect/bigtable/integration/ExtractTimestampIT.java create mode 100644 kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/transformations/ExtractTimestamp.java create mode 100644 kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/transformations/TimestampPrecision.java create mode 100644 kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/util/ConfigUtils.java create mode 100644 kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/util/SchemaParsingUtils.java create mode 100644 
kafka-connect-bigtable-sink/sink/src/test/java/com/google/cloud/kafka/connect/bigtable/transformations/ExtractTimestampTest.java create mode 100644 kafka-connect-bigtable-sink/sink/src/test/java/com/google/cloud/kafka/connect/bigtable/util/SchemaParsingUtilsTest.java diff --git a/kafka-connect-bigtable-sink/README.md b/kafka-connect-bigtable-sink/README.md index 6f2ff3b6..4aebb6bc 100644 --- a/kafka-connect-bigtable-sink/README.md +++ b/kafka-connect-bigtable-sink/README.md @@ -35,7 +35,8 @@ type: #### Using Message Values for Row Keys -If you need to use fields from the message value, rather than the message key, use the `org.apache.kafka.connect.transforms.ValueToKey` SMT to map the value onto the key: +If you need to use fields from the message value, rather than the message key, use +the `org.apache.kafka.connect.transforms.ValueToKey` SMT to map the value onto the key: ```properties transforms=createKey @@ -66,11 +67,17 @@ optimizing configs for latency will reduce throughput and efficiency. When `value.null.mode` is set to `delete`, Kafka messages with a null value will result in the corresponding row being deleted. +### Cell Timestamps + +The Bigtable cell timestamps for your data are set to the timestamp of the Kafka message. If the message timestamp is +null, the Sink system time is used. If you need a message value to be used for the cell timestamp, try using the Extract +Timestamp SMT that's included in this package. + ## SMT This project includes SMTs that may be useful for preparing your data for Bigtable. -### Flatten Array Element +### Flatten Array Element SMT This SMT is used to flatten nested array fields that can be generated by some serialization libraries. @@ -105,7 +112,8 @@ The name of the field wrapping individual elements within the array. 
#### Example -Given the following input message value, with `array.field="products"` `array.inner.wrapper="list"` `array.element.wrapper="element"` +Given the following input message value, +with `array.field="products"` `array.inner.wrapper="list"` `array.element.wrapper="element"` ```json { @@ -137,6 +145,37 @@ The resulting output would be: } ``` +### Extract Timestamp SMT + +Extracts a timestamp from the message, to be used as the message timestamp. This can be useful because this Sink +Connector uses the Kafka message timestamp for the Bigtable cell timestamp. + +#### Configuration + +`timestamp.field` + +The name of the timestamp field. Non-root fields can be referenced by specifying the field path, with periods +separating each field. If the field cannot be found, or if the value is null, the message fails. The field may be +a numeric, string or date type. + +`timestamp.field.precision` + +The precision of the timestamp field. Defaults to MILLIS. This only affects the output for numeric fields. Ignore this +config if your field is a date type. Supported values are NANOS, MICROS, MILLIS and SECONDS. Use the value that matches +the field's precision. Example: if your field has epoch millisecond values, use the MILLIS config value. + +#### Example + +This configuration extracts the message timestamp from a createdAt (int64) epoch millisecond field inside the "order" +struct. + +```properties +transforms=extractTimestamp +transforms.extractTimestamp.type=com.google.cloud.kafka.connect.bigtable.transformations.ExtractTimestamp$Value +transforms.extractTimestamp.timestamp.field=order.createdAt +transforms.extractTimestamp.timestamp.field.precision=MILLIS +``` + ## Configuration See [config/](./config/bigtable-kafka-sink-connector.properties) for a sample @@ -204,7 +243,7 @@ Defines the insertion mode to use. Supported modes are: the table, an error is thrown. - upsert - If the row to be written already exists, then its column values are overwritten with the ones provided. 
-- replace_if_newest - If there are no cells newer than this record within the +- replace_if_newest - If there are no cells newer than this record within the target row of the table, clear the row and then insert new record. * Type: string @@ -313,10 +352,10 @@ together directly. `expand.root.level.arrays` -Determines whether root level arrays should be expanded to a column family -or serialized to a single column. If true, root level array fields will -be mapped to a Bigtable column family where each element is stored in -an individual column with its index as column qualifier, padded with +Determines whether root level arrays should be expanded to a column family +or serialized to a single column. If true, root level array fields will +be mapped to a Bigtable column family where each element is stored in +an individual column with its index as column qualifier, padded with zeros to a length of 6. If false, root level array fields will be serialized as a JSON string to a single column. @@ -389,7 +428,8 @@ The following steps are to run in standalone mode. They can be easily adapted for running Kafka Connect in distributed mode using the [REST API](http://kafka.apache.org/documentation.html#connect_running). -1. Download the latest `kafka-connect-bigtable-sink.jar` file from the [Releases page](https://github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/releases). +1. Download the latest `kafka-connect-bigtable-sink.jar` file from + the [Releases page](https://github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/releases). 2. Copy the `kafka-connect-bigtable-sink.jar` JAR file into your **plugin directory**. 
The plugin directory can be any of the directories specified by diff --git a/kafka-connect-bigtable-sink/doc/tests.md b/kafka-connect-bigtable-sink/doc/tests.md index 9820665c..97685121 100644 --- a/kafka-connect-bigtable-sink/doc/tests.md +++ b/kafka-connect-bigtable-sink/doc/tests.md @@ -108,5 +108,5 @@ mvn -Pintegration-tests clean verify #### To run a specific integration test ```bash -mvn -Pintegration-tests clean verify -Dit.test=InsertUpsertIT#testUpsert +mvn -Pintegration-tests clean verify -Dit.test=InsertModeIT#testUpsert ``` diff --git a/kafka-connect-bigtable-sink/integration-tests/src/test/java/com/google/cloud/kafka/connect/bigtable/integration/ExtractTimestampIT.java b/kafka-connect-bigtable-sink/integration-tests/src/test/java/com/google/cloud/kafka/connect/bigtable/integration/ExtractTimestampIT.java new file mode 100644 index 00000000..2d7519cf --- /dev/null +++ b/kafka-connect-bigtable-sink/integration-tests/src/test/java/com/google/cloud/kafka/connect/bigtable/integration/ExtractTimestampIT.java @@ -0,0 +1,211 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.kafka.connect.bigtable.integration; + +import static com.google.cloud.kafka.connect.bigtable.config.BigtableSinkConfig.DEFAULT_COLUMN_FAMILY_CONFIG; +import static com.google.cloud.kafka.connect.bigtable.config.BigtableSinkConfig.INSERT_MODE_CONFIG; +import static org.apache.kafka.connect.runtime.WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.cloud.bigtable.data.v2.models.RowCell; +import com.google.cloud.kafka.connect.bigtable.config.InsertMode; +import com.google.cloud.kafka.connect.bigtable.transformations.ExtractTimestamp; +import com.google.cloud.kafka.connect.bigtable.transformations.TimestampPrecision; +import com.google.cloud.kafka.connect.bigtable.util.TestDataUtil; +import com.google.protobuf.ByteString; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.storage.StringConverter; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ExtractTimestampIT extends BaseKafkaConnectBigtableIT { + + @Test + public void testExtractTimestampFromValue() throws InterruptedException, ExecutionException { + Map props = baseConnectorProps(); + props.put(INSERT_MODE_CONFIG, InsertMode.UPSERT.name()); + props.put("transforms", "extractTimestamp"); + props.put("transforms.extractTimestamp.type", ExtractTimestamp.Value.class.getName()); + props.put("transforms.extractTimestamp." 
+ ExtractTimestamp.TIMESTAMP_FIELD_CONFIG, "ts"); + props.put( + "transforms.extractTimestamp." + ExtractTimestamp.TIMESTAMP_FIELD_PRECISION_CONFIG, + TimestampPrecision.MILLIS.name()); + props.put(DEFAULT_COLUMN_FAMILY_CONFIG, "cf"); + props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); + + String testId = startSingleTopicConnector(props); + createTablesAndColumnFamilies(Map.of(testId, Set.of("cf"))); + + Schema schema = + SchemaBuilder.struct() + .field("id", Schema.STRING_SCHEMA) + .field("ts", Schema.INT64_SCHEMA) + .build(); + + long timestamp = 1234567890L; + Struct value = new Struct(schema).put("id", "val1").put("ts", timestamp); + + JsonConverter converter = new JsonConverter(); + converter.configure(Collections.singletonMap("schemas.enable", "true"), false); + byte[] valueJson = converter.fromConnectData(testId, schema, value); + + String key = "key1"; + connect.kafka().produce(testId, key, new String(valueJson)); + + waitUntilBigtableContainsNumberOfRows(testId, 1); + Map rows = readAllRows(bigtableData, testId); + Row row = rows.get(ByteString.copyFrom(key.getBytes(StandardCharsets.UTF_8))); + assertNotNull(row); + + List cells = row.getCells("cf", "id"); + assertEquals(1, cells.size()); + // Bigtable timestamps are in microseconds + assertEquals(timestamp * 1000, cells.get(0).getTimestamp()); + } + + @Test + public void testExtractTimestampFromNestedValue() + throws InterruptedException, ExecutionException { + Map props = baseConnectorProps(); + props.put(INSERT_MODE_CONFIG, InsertMode.UPSERT.name()); + props.put("transforms", "extractTimestamp"); + props.put("transforms.extractTimestamp.type", ExtractTimestamp.Value.class.getName()); + props.put( + "transforms.extractTimestamp." + ExtractTimestamp.TIMESTAMP_FIELD_CONFIG, "nested.ts"); + props.put( + "transforms.extractTimestamp." 
+ ExtractTimestamp.TIMESTAMP_FIELD_PRECISION_CONFIG, + TimestampPrecision.SECONDS.name()); + props.put(DEFAULT_COLUMN_FAMILY_CONFIG, "cf"); + props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); + + String testId = startSingleTopicConnector(props); + createTablesAndColumnFamilies(Map.of(testId, Set.of("cf", "nested"))); + + Schema nestedSchema = SchemaBuilder.struct().field("ts", Schema.INT64_SCHEMA).build(); + Schema schema = + SchemaBuilder.struct() + .field("id", Schema.STRING_SCHEMA) + .field("nested", nestedSchema) + .build(); + + long timestampSeconds = 1600000000L; + Struct nestedValue = new Struct(nestedSchema).put("ts", timestampSeconds); + Struct value = new Struct(schema).put("id", "val2").put("nested", nestedValue); + + JsonConverter converter = new JsonConverter(); + converter.configure(Collections.singletonMap("schemas.enable", "true"), false); + byte[] valueJson = converter.fromConnectData(testId, schema, value); + + String key = "key2"; + connect.kafka().produce(testId, key, new String(valueJson)); + + waitUntilBigtableContainsNumberOfRows(testId, 1); + Map rows = readAllRows(bigtableData, testId); + Row row = rows.get(ByteString.copyFrom(key.getBytes(StandardCharsets.UTF_8))); + assertNotNull(row); + + List cells = row.getCells("cf", "id"); + assertEquals(1, cells.size()); + // Bigtable timestamps are in microseconds. + // timestampSeconds * 1000 (millis) * 1000 (micros) + assertEquals(timestampSeconds * 1000 * 1000, cells.get(0).getTimestamp()); + } + + @Test + public void testExtractTimestampFromKey() throws InterruptedException, ExecutionException { + Map props = baseConnectorProps(); + props.put(INSERT_MODE_CONFIG, InsertMode.UPSERT.name()); + props.put("transforms", "extractTimestamp"); + props.put("transforms.extractTimestamp.type", ExtractTimestamp.Key.class.getName()); + props.put("transforms.extractTimestamp." 
+ ExtractTimestamp.TIMESTAMP_FIELD_CONFIG, "ts"); + props.put(DEFAULT_COLUMN_FAMILY_CONFIG, "cf"); + props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put("key.converter", JsonConverter.class.getName()); + props.put("key.converter.schemas.enable", "true"); + + String testId = startSingleTopicConnector(props); + createTablesAndColumnFamilies(Map.of(testId, Set.of("cf"))); + + Schema keySchema = SchemaBuilder.struct().field("ts", Schema.INT64_SCHEMA).build(); + long timestamp = 9876543210L; + Struct keyStruct = new Struct(keySchema).put("ts", timestamp); + + JsonConverter converter = new JsonConverter(); + converter.configure(Collections.singletonMap("schemas.enable", "true"), true); + byte[] keyJson = converter.fromConnectData(testId, keySchema, keyStruct); + + String value = "some-value"; + connect.kafka().produce(testId, new String(keyJson), value); + + waitUntilBigtableContainsNumberOfRows(testId, 1); + Map rows = readAllRows(bigtableData, testId); + // The row key will be the JSON string of the key + Row row = rows.values().iterator().next(); + assertNotNull(row); + + List cells = row.getCells("cf", "KAFKA_VALUE"); + assertEquals(1, cells.size()); + assertEquals(timestamp * 1000, cells.get(0).getTimestamp()); + } + + @Test + public void testExtractTimestampFromValueNoSchema() throws Exception { + Map props = baseConnectorProps(); + props.put(INSERT_MODE_CONFIG, InsertMode.UPSERT.name()); + props.put("transforms", "extractTimestamp"); + props.put("transforms.extractTimestamp.type", ExtractTimestamp.Value.class.getName()); + props.put( + "transforms.extractTimestamp." + ExtractTimestamp.TIMESTAMP_FIELD_CONFIG, "createdAt"); + props.put( + "transforms.extractTimestamp." 
+ ExtractTimestamp.TIMESTAMP_FIELD_PRECISION_CONFIG, + TimestampPrecision.SECONDS.name()); + props.put(DEFAULT_COLUMN_FAMILY_CONFIG, "cf"); + props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); + props.put("value.converter.schemas.enable", "false"); + + String testId = startSingleTopicConnector(props); + createTablesAndColumnFamilies(Map.of(testId, Set.of("cf"))); + + String json = TestDataUtil.readResource("json/order-no-schema.json"); + String key = "order1"; + connect.kafka().produce(testId, key, json); + + waitUntilBigtableContainsNumberOfRows(testId, 1); + Map rows = readAllRows(bigtableData, testId); + Row row = rows.get(ByteString.copyFrom(key.getBytes(StandardCharsets.UTF_8))); + assertNotNull(row); + + // createdAt: 1779122855 (seconds) + long expectedTimestampMicros = 1779122855L * 1000 * 1000; + + List cells = row.getCells("cf", "KAFKA_VALUE"); + assertEquals(1, cells.size()); + assertEquals(expectedTimestampMicros, cells.get(0).getTimestamp()); + } +} diff --git a/kafka-connect-bigtable-sink/integration-tests/src/test/java/com/google/cloud/kafka/connect/bigtable/integration/InsertModeIT.java b/kafka-connect-bigtable-sink/integration-tests/src/test/java/com/google/cloud/kafka/connect/bigtable/integration/InsertModeIT.java index b69615f5..680c6714 100644 --- a/kafka-connect-bigtable-sink/integration-tests/src/test/java/com/google/cloud/kafka/connect/bigtable/integration/InsertModeIT.java +++ b/kafka-connect-bigtable-sink/integration-tests/src/test/java/com/google/cloud/kafka/connect/bigtable/integration/InsertModeIT.java @@ -37,8 +37,6 @@ import com.google.cloud.kafka.connect.bigtable.util.TestDataUtil; import com.google.cloud.kafka.connect.bigtable.utils.ByteUtils; import com.google.protobuf.ByteString; -import java.io.IOException; -import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.AbstractMap; @@ -64,17 +62,6 @@ public class InsertModeIT extends BaseKafkaConnectBigtableIT 
{ private static final ObjectMapper objectMapper = new ObjectMapper(); - private String readResource(String path) { - try (InputStream is = getClass().getClassLoader().getResourceAsStream(path)) { - if (is == null) { - throw new IllegalArgumentException("Resource not found: " + path); - } - return new String(is.readAllBytes(), StandardCharsets.UTF_8); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - private static final String KEY1 = "key1"; private static final String KEY2 = "key2"; private static final String KEY3 = "key3"; @@ -163,7 +150,7 @@ public void testUpsertWithRowKeyFromValue() throws InterruptedException, Executi String testId = startSingleTopicConnector(props); createTablesAndColumnFamilies(Map.of(testId, Set.of("cf"))); - String json = readResource("json/order-with-schema.json"); + String json = TestDataUtil.readResource("json/order-with-schema.json"); connect.kafka().produce(testId, KEY1, json); waitUntilBigtableContainsNumberOfRows(testId, 1); @@ -171,7 +158,7 @@ public void testUpsertWithRowKeyFromValue() throws InterruptedException, Executi ByteString key = ByteString.copyFrom("ORD-12345#ball".getBytes(StandardCharsets.UTF_8)); Row row1 = rows.get(key); assertNotNull(row1); - assertEquals(3, row1.getCells().size()); + assertEquals(4, row1.getCells().size()); List orderIdCells = row1.getCells("cf", "orderId"); assertEquals(1, orderIdCells.size()); @@ -184,6 +171,11 @@ public void testUpsertWithRowKeyFromValue() throws InterruptedException, Executi List quantityCells = row1.getCells("cf", "quantity"); assertEquals(1, quantityCells.size()); assertArrayEquals(ByteUtils.toBytes(2), quantityCells.get(0).getValue().toByteArray()); + + List createdAtCells = row1.getCells("cf", "createdAt"); + assertEquals(1, createdAtCells.size()); + assertArrayEquals( + ByteUtils.toBytes(1779122855L), createdAtCells.get(0).getValue().toByteArray()); } @Test @@ -204,7 +196,7 @@ public void testUpsertWithRowKeyFromValueNoSchema() String testId = 
startSingleTopicConnector(props); createTablesAndColumnFamilies(Map.of(testId, Set.of("cf"))); - String json = readResource("json/order-no-schema.json"); + String json = TestDataUtil.readResource("json/order-no-schema.json"); connect.kafka().produce(testId, KEY1, json); waitUntilBigtableContainsNumberOfRows(testId, 1); @@ -238,7 +230,7 @@ public void testUpsertWithRowKeyFromValueMissingField() String testId = startSingleTopicConnector(props); createTablesAndColumnFamilies(Map.of(testId, Set.of("cf"))); - String json = readResource("json/order-with-schema.json"); + String json = TestDataUtil.readResource("json/order-with-schema.json"); connect.kafka().produce(testId, KEY1, json); assertSingleDlqEntry(dlqTopic, KEY1, json, DataException.class); @@ -262,7 +254,7 @@ public void testUpsertWithRowKeyFromValueNullValue() String testId = startSingleTopicConnector(props); createTablesAndColumnFamilies(Map.of(testId, Set.of("cf"))); - String json = readResource("json/order-with-null-product-schema.json"); + String json = TestDataUtil.readResource("json/order-with-null-product-schema.json"); connect.kafka().produce(testId, KEY1, json); // null key values aren't allowed @@ -278,7 +270,9 @@ public void testUpsertAppliedSchema() props.put("value.converter.schemas.enable", "false"); props.put("transforms", "applySchema,createKey,flattenElements"); props.put("transforms.applySchema.type", ApplyJsonSchema.class.getName() + "$Value"); - props.put("transforms.applySchema.schema.json", readResource("json/applied-schema.json")); + props.put( + "transforms.applySchema.schema.json", + TestDataUtil.readResource("json/applied-schema.json")); props.put("transforms.createKey.type", "org.apache.kafka.connect.transforms.ValueToKey"); props.put("transforms.createKey.fields", "userId,orderId"); props.put("transforms.flattenElements.type", FlattenArrayElement.class.getName()); @@ -298,7 +292,7 @@ public void testUpsertAppliedSchema() String testId = startSingleTopicConnector(props); 
createTablesAndColumnFamilies(Map.of(testId, Set.of("cf", "products"))); - String json = readResource("json/expanded-order.json"); + String json = TestDataUtil.readResource("json/expanded-order.json"); connect.kafka().produce(testId, KEY1, json); waitUntilBigtableContainsNumberOfRows(testId, 1); diff --git a/kafka-connect-bigtable-sink/integration-tests/src/test/java/com/google/cloud/kafka/connect/bigtable/util/TestDataUtil.java b/kafka-connect-bigtable-sink/integration-tests/src/test/java/com/google/cloud/kafka/connect/bigtable/util/TestDataUtil.java index 93d5074c..843110df 100644 --- a/kafka-connect-bigtable-sink/integration-tests/src/test/java/com/google/cloud/kafka/connect/bigtable/util/TestDataUtil.java +++ b/kafka-connect-bigtable-sink/integration-tests/src/test/java/com/google/cloud/kafka/connect/bigtable/util/TestDataUtil.java @@ -19,6 +19,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.cloud.bigtable.data.v2.models.Row; import com.google.cloud.bigtable.data.v2.models.RowCell; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -243,4 +246,16 @@ public String toString() { + '}'; } } + + public static String readResource(String path) { + try (InputStream is = + Thread.currentThread().getContextClassLoader().getResourceAsStream(path)) { + if (is == null) { + throw new IllegalArgumentException("Resource not found: " + path); + } + return new String(is.readAllBytes(), StandardCharsets.UTF_8); + } catch (IOException e) { + throw new RuntimeException(e); + } + } } diff --git a/kafka-connect-bigtable-sink/integration-tests/src/test/resources/json/applied-schema.json b/kafka-connect-bigtable-sink/integration-tests/src/test/resources/json/applied-schema.json index 025f006e..a114cfff 100644 --- a/kafka-connect-bigtable-sink/integration-tests/src/test/resources/json/applied-schema.json +++ 
b/kafka-connect-bigtable-sink/integration-tests/src/test/resources/json/applied-schema.json @@ -11,6 +11,11 @@ "optional": false, "field": "userId" }, + { + "type": "int64", + "optional": false, + "field": "createdAt" + }, { "type": "struct", "fields": [ diff --git a/kafka-connect-bigtable-sink/integration-tests/src/test/resources/json/expanded-order.json b/kafka-connect-bigtable-sink/integration-tests/src/test/resources/json/expanded-order.json index afa7dd43..3d7a6eb1 100644 --- a/kafka-connect-bigtable-sink/integration-tests/src/test/resources/json/expanded-order.json +++ b/kafka-connect-bigtable-sink/integration-tests/src/test/resources/json/expanded-order.json @@ -1,6 +1,7 @@ { "orderId": "ORD-999", "userId": "USER-42", + "createdAt": 1779122855, "products": { "list": [ { diff --git a/kafka-connect-bigtable-sink/integration-tests/src/test/resources/json/order-no-schema.json b/kafka-connect-bigtable-sink/integration-tests/src/test/resources/json/order-no-schema.json index e4716e43..457bb472 100644 --- a/kafka-connect-bigtable-sink/integration-tests/src/test/resources/json/order-no-schema.json +++ b/kafka-connect-bigtable-sink/integration-tests/src/test/resources/json/order-no-schema.json @@ -1,5 +1,6 @@ { "orderId": "ORD-12345", "product": "ball", - "quantity": 2 + "quantity": 2, + "createdAt": 1779122855 } \ No newline at end of file diff --git a/kafka-connect-bigtable-sink/integration-tests/src/test/resources/json/order-with-null-product-schema.json b/kafka-connect-bigtable-sink/integration-tests/src/test/resources/json/order-with-null-product-schema.json index c95fa8d3..7daa3d91 100644 --- a/kafka-connect-bigtable-sink/integration-tests/src/test/resources/json/order-with-null-product-schema.json +++ b/kafka-connect-bigtable-sink/integration-tests/src/test/resources/json/order-with-null-product-schema.json @@ -5,12 +5,14 @@ "fields": [ { "field": "orderId", "type": "string", "optional": false }, { "field": "product", "type": "string", "optional": true }, - { 
"field": "quantity", "type": "int32", "optional": false } + { "field": "quantity", "type": "int32", "optional": false }, + { "field": "createdAt", "type": "int64", "optional": false } ] }, "payload": { "orderId": "ORD-12345", "product": null, - "quantity": 2 + "quantity": 2, + "createdAt": 1779122855 } } \ No newline at end of file diff --git a/kafka-connect-bigtable-sink/integration-tests/src/test/resources/json/order-with-schema.json b/kafka-connect-bigtable-sink/integration-tests/src/test/resources/json/order-with-schema.json index 0329be66..5f5e7077 100644 --- a/kafka-connect-bigtable-sink/integration-tests/src/test/resources/json/order-with-schema.json +++ b/kafka-connect-bigtable-sink/integration-tests/src/test/resources/json/order-with-schema.json @@ -5,12 +5,14 @@ "fields": [ { "field": "orderId", "type": "string", "optional": false }, { "field": "product", "type": "string", "optional": false }, - { "field": "quantity", "type": "int32", "optional": false } + { "field": "quantity", "type": "int32", "optional": false }, + { "field": "createdAt", "type": "int64", "optional": false } ] }, "payload": { "orderId": "ORD-12345", "product": "ball", - "quantity": 2 + "quantity": 2, + "createdAt": 1779122855 } } \ No newline at end of file diff --git a/kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/config/BigtableSinkConfig.java b/kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/config/BigtableSinkConfig.java index 44874a44..6e60ca0d 100644 --- a/kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/config/BigtableSinkConfig.java +++ b/kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/config/BigtableSinkConfig.java @@ -28,6 +28,7 @@ import com.google.cloud.bigtable.data.v2.BigtableDataClient; import com.google.cloud.bigtable.data.v2.BigtableDataSettings; import 
com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings; +import com.google.cloud.kafka.connect.bigtable.util.ConfigUtils; import com.google.cloud.kafka.connect.bigtable.version.PackageMetadata; import com.google.cloud.kafka.connect.bigtable.wrappers.BigtableTableAdminClientInterface; import com.google.cloud.kafka.connect.bigtable.wrappers.BigtableTableAdminClientWrapper; @@ -264,7 +265,7 @@ public static ConfigDef getDefinition() { INSERT_MODE_CONFIG, ConfigDef.Type.STRING, InsertMode.INSERT.name(), - enumValidator(InsertMode.values()), + ConfigUtils.enumValidator(InsertMode.values()), ConfigDef.Importance.HIGH, "Defines the insertion mode to use. Supported modes are:\n" + "- insert - Insert new record only. If the row to be written already exists in" @@ -289,7 +290,7 @@ public static ConfigDef getDefinition() { VALUE_NULL_MODE_CONFIG, ConfigDef.Type.STRING, NullValueMode.WRITE.name(), - enumValidator(NullValueMode.values()), + ConfigUtils.enumValidator(NullValueMode.values()), ConfigDef.Importance.MEDIUM, "Defines what to do with `null`s within Kafka values. Supported modes are:" + "\n- write - Serialize `null`s to empty byte arrays." @@ -304,7 +305,7 @@ public static ConfigDef getDefinition() { ERROR_MODE_CONFIG, ConfigDef.Type.STRING, BigtableErrorMode.FAIL.name(), - enumValidator(BigtableErrorMode.values()), + ConfigUtils.enumValidator(BigtableErrorMode.values()), ConfigDef.Importance.MEDIUM, "Specifies how to handle errors that result from writes, after retries. It is ignored" + " if DLQ is configured. 
Supported modes are:" @@ -662,11 +663,6 @@ private T getEnum(String configName, Function converter) { } } - private static ConfigDef.Validator enumValidator(Enum[] enumValues) { - return ConfigDef.CaseInsensitiveValidString.in( - Arrays.stream(enumValues).map(Enum::name).toArray(String[]::new)); - } - /** * @return {@link Optional#empty()} if the user didn't configure the Cloud Bigtable credentials, * {@link Optional} containing {@link CredentialsProvider} configured as described in {@link diff --git a/kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/mapping/KeyMapper.java b/kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/mapping/KeyMapper.java index ed8feecb..c8d57454 100644 --- a/kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/mapping/KeyMapper.java +++ b/kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/mapping/KeyMapper.java @@ -16,6 +16,7 @@ package com.google.cloud.kafka.connect.bigtable.mapping; import com.google.cloud.kafka.connect.bigtable.config.BigtableSinkConfig; +import com.google.cloud.kafka.connect.bigtable.util.SchemaParsingUtils; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.math.BigDecimal; @@ -39,7 +40,7 @@ * SinkRecord(s)} into Cloud Bigtable row keys. 
*/ public class KeyMapper { - final List> definition; + final String[][] definition; final byte[] delimiter; /** @@ -52,11 +53,7 @@ public class KeyMapper { */ public KeyMapper(String delimiter, List definition) { this.delimiter = delimiter.getBytes(StandardCharsets.UTF_8); - this.definition = - definition.stream() - .map(s -> s.split("\\.")) - .map(Arrays::asList) - .collect(Collectors.toList()); + this.definition = definition.stream().map(s -> s.split("\\.")).toArray(String[][]::new); } /** @@ -72,35 +69,37 @@ public KeyMapper(String delimiter, List definition) { public byte[] getKey(SchemaAndValue kafkaKeyAndSchema) { Object kafkaKey = kafkaKeyAndSchema.value(); Optional kafkaKeySchema = Optional.ofNullable(kafkaKeyAndSchema.schema()); - ensureKeyElementIsNotNull(kafkaKey); + SchemaParsingUtils.ensureKeyElementIsNotNull(kafkaKey); SchemaAndValue keySchemaAndValue = new SchemaAndValue(kafkaKeySchema.orElse(null), kafkaKey); + Stream keyParts = - this.getDefinition(kafkaKey).stream() - .map((d) -> serializeTopLevelKeyElement(extractField(keySchemaAndValue, d.iterator()))); + Arrays.stream(this.getDefinition(kafkaKey)) + .map( + (d) -> + serializeTopLevelKeyElement( + SchemaParsingUtils.extractField(keySchemaAndValue, d))); + return concatenateByteArrays(new byte[0], keyParts, delimiter, new byte[0]); } /** * Returns key definition as configured during object creation or extracted from the object being - * mapped if it's been configured to an empty {@link List}. + * mapped if it's been configured to an empty definition. * * @param kafkaKey {@link org.apache.kafka.connect.sink.SinkRecord SinkRecord's} key. - * @return {@link List} containing {@link List Lists} of key fields that need to be retrieved and - * concatenated to construct the Cloud Bigtable row key. - *

See {@link KeyMapper#extractField(SchemaAndValue, Iterator)} for details on semantics of - * the inner list. + * @return A 2D array of strings containing key fields that need to be retrieved and concatenated + * to construct the Cloud Bigtable row key. + *

See {@link SchemaParsingUtils#extractField(SchemaAndValue, String[])} for details on + * semantics of the inner elements. */ - private List> getDefinition(Object kafkaKey) { - if (this.definition.isEmpty()) { + private String[][] getDefinition(Object kafkaKey) { + if (this.definition.length == 0) { Optional> maybeRootFields = getFieldsOfRootValue(kafkaKey); - if (maybeRootFields.isEmpty()) { - List rootElementDefinition = List.of(); - return List.of(rootElementDefinition); - } else { - return maybeRootFields.get().stream() - .map(Collections::singletonList) - .collect(Collectors.toList()); - } + return maybeRootFields + .map( + strings -> + strings.stream().map(field -> new String[] {field}).toArray(String[][]::new)) + .orElseGet(() -> new String[][] {new String[0]}); } return this.definition; } @@ -122,45 +121,9 @@ private static Optional> getFieldsOfRootValue(Object kafkaKey) { } } - /** - * Extract possibly nested fields from the input value. - * - * @param keySchemaAndValue {@link org.apache.kafka.connect.sink.SinkRecord SinkRecord's} key or - * some its child with corresponding {@link Schema}. - * @param fields Fields that need to be accessed before the target value is reached. - * @return Extracted nested field. - */ - private SchemaAndValue extractField(SchemaAndValue keySchemaAndValue, Iterator fields) { - Object value = keySchemaAndValue.value(); - Optional schema = Optional.ofNullable(keySchemaAndValue.schema()); - ensureKeyElementIsNotNull(value); - LogicalTypeUtils.logIfLogicalTypeUnsupported(schema); - if (!fields.hasNext()) { - return keySchemaAndValue; - } - String field = fields.next(); - if (value instanceof Struct) { - // Note that getWithoutDefault() throws if such a field does not exist. 
- Object fieldValue = ((Struct) value).getWithoutDefault(field); - Schema fieldSchema = SchemaUtils.maybeExtractFieldSchema(schema, field).orElse(null); - return extractField(new SchemaAndValue(fieldSchema, fieldValue), fields); - } else if (value instanceof Map) { - Object fieldValue = ((Map) value).get(field); - Schema fieldSchema = SchemaUtils.maybeExtractFieldSchema(schema, field).orElse(null); - return extractField(new SchemaAndValue(fieldSchema, fieldValue), fields); - } else { - throw new DataException( - "Unexpected class `" - + value.getClass().getName() - + "` doesn't support extracting field `" - + field - + "` using a dot."); - } - } - private static byte[] serializeTopLevelKeyElement(SchemaAndValue keyElementAndSchema) { Object keyElement = keyElementAndSchema.value(); - ensureKeyElementIsNotNull(keyElement); + SchemaParsingUtils.ensureKeyElementIsNotNull(keyElement); return serializeKeyElement(keyElement, Optional.ofNullable(keyElementAndSchema.schema())); } @@ -277,13 +240,6 @@ private static byte[] serializeKeyElement(Object keyElement, Optional ke } } - private static void ensureKeyElementIsNotNull(Object value) { - if (value == null) { - // Matching Confluent's sink behavior. 
- throw new DataException("Error with row key definition: row key fields cannot be null."); - } - } - private static byte[] concatenateByteArrays( byte[] start, Stream byteArrays, byte[] delimiter, byte[] end) { try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { diff --git a/kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/transformations/ExtractTimestamp.java b/kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/transformations/ExtractTimestamp.java new file mode 100644 index 00000000..aeac0fab --- /dev/null +++ b/kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/transformations/ExtractTimestamp.java @@ -0,0 +1,151 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.kafka.connect.bigtable.transformations; + +import com.google.cloud.kafka.connect.bigtable.util.ConfigUtils; +import com.google.cloud.kafka.connect.bigtable.util.SchemaParsingUtils; +import com.google.common.annotations.VisibleForTesting; +import java.util.Map; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.transforms.Transformation; +import org.apache.kafka.connect.transforms.util.SimpleConfig; + +public abstract class ExtractTimestamp> implements Transformation { + + public static final String TIMESTAMP_FIELD_CONFIG = "timestamp.field"; + public static final String TIMESTAMP_FIELD_PRECISION_CONFIG = "timestamp.field.precision"; + + public static final ConfigDef CONFIG_DEF = + new ConfigDef() + .define( + TIMESTAMP_FIELD_CONFIG, + ConfigDef.Type.STRING, + ConfigDef.Importance.HIGH, + "The name of the timestamp field. Non-root fields can be referenced by specifying the" + + " field path, with periods separating each field. If the field cannot be found," + + " or if the value is null, the message is failed. The field may be a numeric," + + " string or date type.") + .define( + TIMESTAMP_FIELD_PRECISION_CONFIG, + ConfigDef.Type.STRING, + TimestampPrecision.MILLIS.name(), + ConfigUtils.enumValidator(TimestampPrecision.values()), + ConfigDef.Importance.HIGH, + "The precision of the timestamp field. Defaults to MILLIS. This only affects the" + + " output for numeric fields. Ignore this config if your field is a date type." + + " Supported values are NANOS, MICROS, MILLIS and SECONDS. Use the value that" + + " matches the field's precision. 
Example: if your field has epoch millisecond" + + " values, use the MILLIS config value."); + + private String[] fieldPath; + private TimestampPrecision timestampPrecision; + + @Override + public void configure(Map configs) { + SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs); + this.fieldPath = config.getString(TIMESTAMP_FIELD_CONFIG).split("\\."); + this.timestampPrecision = + TimestampPrecision.valueOf( + config.getString(TIMESTAMP_FIELD_PRECISION_CONFIG).toUpperCase()); + } + + @Override + public R apply(R record) { + SchemaAndValue timestampField = + SchemaParsingUtils.extractField(getOperatingValue(record), fieldPath); + long parsedTimestampMillis = + ExtractTimestamp.parseTimestampToMillis(timestampField, timestampPrecision); + return record.newRecord( + record.topic(), + record.kafkaPartition(), + record.keySchema(), + record.key(), + record.valueSchema(), + record.value(), + parsedTimestampMillis); + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + @Override + public void close() {} + + protected abstract SchemaAndValue getOperatingValue(R record); + + @VisibleForTesting + static long parseTimestampToMillis(SchemaAndValue value, TimestampPrecision timestampPrecision) { + if (value == null || value.value() == null) { + throw new IllegalArgumentException("Cannot parse timestamp value of null"); + } + + Object rawValue = value.value(); + + // Handle native Connect logical Timestamp/Date objects directly + if (rawValue instanceof java.util.Date) { + return ((java.util.Date) rawValue).getTime(); + } + if (rawValue instanceof java.time.Instant) { + return ((java.time.Instant) rawValue).toEpochMilli(); + } + + // Extract the epoch number safely (supports both Schema-based and Schemaless records) + long epochValue; + if (rawValue instanceof Number) { + epochValue = ((Number) rawValue).longValue(); + } else if (rawValue instanceof String) { + String strVal = ((String) rawValue).trim(); + epochValue = Long.parseLong(strVal); + } 
else { + throw new IllegalArgumentException( + "Unsupported timestamp payload type: " + rawValue.getClass().getName()); + } + + // Resolve to target milliseconds resolution + switch (timestampPrecision) { + case NANOS: + return epochValue / 1_000_000L; + case MICROS: + return epochValue / 1000L; + case MILLIS: + return epochValue; + case SECONDS: + return epochValue * 1000L; + default: + throw new IllegalStateException("Unexpected timestamp precision: " + timestampPrecision); + } + } + + // Boilerplate for Key/Value distinct implementations + public static class Key> extends ExtractTimestamp { + + @Override + protected SchemaAndValue getOperatingValue(R record) { + return new SchemaAndValue(record.keySchema(), record.key()); + } + } + + public static class Value> extends ExtractTimestamp { + @Override + protected SchemaAndValue getOperatingValue(R record) { + return new SchemaAndValue(record.valueSchema(), record.value()); + } + } +} diff --git a/kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/transformations/TimestampPrecision.java b/kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/transformations/TimestampPrecision.java new file mode 100644 index 00000000..8196de2b --- /dev/null +++ b/kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/transformations/TimestampPrecision.java @@ -0,0 +1,23 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.kafka.connect.bigtable.transformations; + +public enum TimestampPrecision { + NANOS, + MICROS, + MILLIS, + SECONDS, +} diff --git a/kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/util/ConfigUtils.java b/kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/util/ConfigUtils.java new file mode 100644 index 00000000..10c7dfc4 --- /dev/null +++ b/kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/util/ConfigUtils.java @@ -0,0 +1,26 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.kafka.connect.bigtable.util; + +import java.util.Arrays; +import org.apache.kafka.common.config.ConfigDef; + +public class ConfigUtils { + public static ConfigDef.Validator enumValidator(Enum[] enumValues) { + return ConfigDef.CaseInsensitiveValidString.in( + Arrays.stream(enumValues).map(Enum::name).toArray(String[]::new)); + } +} diff --git a/kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/util/SchemaParsingUtils.java b/kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/util/SchemaParsingUtils.java new file mode 100644 index 00000000..914e633e --- /dev/null +++ b/kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/util/SchemaParsingUtils.java @@ -0,0 +1,81 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.kafka.connect.bigtable.util; + +import com.google.cloud.kafka.connect.bigtable.mapping.LogicalTypeUtils; +import com.google.cloud.kafka.connect.bigtable.mapping.SchemaUtils; +import java.util.Map; +import java.util.Optional; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; + +public class SchemaParsingUtils { + + public static SchemaAndValue extractField(SchemaAndValue keySchemaAndValue, String[] fields) { + return extractField(keySchemaAndValue, fields, 0); + } + + /** + * Extract possibly nested fields from the input value. If more than one field is provided, this + * method will recursively iterate over the entire field array until it reaches the last field, + * and then return that field's schema and value. + * + * @param keySchemaAndValue {@link org.apache.kafka.connect.sink.SinkRecord SinkRecord's} key or + * one of its children with the corresponding {@link Schema}. + * @param fields Fields that need to be accessed before the target value is reached. This array + * represents a path to a field, where each element is a child of the previous element's + * field. + * @param index Index of the field that is being extracted. + * @return Extracted nested field. + */ + private static SchemaAndValue extractField( + SchemaAndValue keySchemaAndValue, String[] fields, int index) { + Object value = keySchemaAndValue.value(); + Optional schema = Optional.ofNullable(keySchemaAndValue.schema()); + ensureKeyElementIsNotNull(value); + LogicalTypeUtils.logIfLogicalTypeUnsupported(schema); + if (index >= fields.length) { + return keySchemaAndValue; + } + String field = fields[index]; + if (value instanceof Struct) { + // Note that getWithoutDefault() throws if such a field does not exist. 
+ Object fieldValue = ((Struct) value).getWithoutDefault(field); + Schema fieldSchema = SchemaUtils.maybeExtractFieldSchema(schema, field).orElse(null); + return extractField(new SchemaAndValue(fieldSchema, fieldValue), fields, index + 1); + } else if (value instanceof Map) { + Object fieldValue = ((Map) value).get(field); + Schema fieldSchema = SchemaUtils.maybeExtractFieldSchema(schema, field).orElse(null); + return extractField(new SchemaAndValue(fieldSchema, fieldValue), fields, index + 1); + } else { + throw new DataException( + "Unexpected class `" + + value.getClass().getName() + + "` doesn't support extracting field `" + + field + + "` using a dot."); + } + } + + public static void ensureKeyElementIsNotNull(Object value) { + if (value == null) { + // Matching Confluent's sink behavior. + throw new DataException("The extracted field value cannot be null."); + } + } +} diff --git a/kafka-connect-bigtable-sink/sink/src/test/java/com/google/cloud/kafka/connect/bigtable/transformations/ExtractTimestampTest.java b/kafka-connect-bigtable-sink/sink/src/test/java/com/google/cloud/kafka/connect/bigtable/transformations/ExtractTimestampTest.java new file mode 100644 index 00000000..067f66fa --- /dev/null +++ b/kafka-connect-bigtable-sink/sink/src/test/java/com/google/cloud/kafka/connect/bigtable/transformations/ExtractTimestampTest.java @@ -0,0 +1,292 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.kafka.connect.bigtable.transformations; + +import static org.junit.Assert.*; + +import java.time.Instant; +import java.util.*; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class ExtractTimestampTest { + + private ExtractTimestamp.Value valueSmt; + private ExtractTimestamp.Key keySmt; + + @Before + public void setUp() { + valueSmt = new ExtractTimestamp.Value<>(); + keySmt = new ExtractTimestamp.Key<>(); + } + + @After + public void tearDown() { + valueSmt.close(); + keySmt.close(); + } + + @Test + public void testParseTimestampToMillis() { + long baseMillis = 1715698738123L; + + // Test different precisions with numeric input + assertEquals( + baseMillis, + ExtractTimestamp.parseTimestampToMillis( + new SchemaAndValue(Schema.INT64_SCHEMA, baseMillis * 1000000L), + TimestampPrecision.NANOS)); + assertEquals( + baseMillis, + ExtractTimestamp.parseTimestampToMillis( + new SchemaAndValue(Schema.INT64_SCHEMA, baseMillis * 1000L), + TimestampPrecision.MICROS)); + assertEquals( + baseMillis, + ExtractTimestamp.parseTimestampToMillis( + new SchemaAndValue(Schema.INT64_SCHEMA, baseMillis), TimestampPrecision.MILLIS)); + assertEquals( + baseMillis - (baseMillis % 1000), // SECONDS loses precision + ExtractTimestamp.parseTimestampToMillis( + new SchemaAndValue(Schema.INT64_SCHEMA, baseMillis / 1000), + TimestampPrecision.SECONDS)); + + // Test different input types + assertEquals( + baseMillis, + ExtractTimestamp.parseTimestampToMillis( + new SchemaAndValue(Schema.STRING_SCHEMA, String.valueOf(baseMillis)), + TimestampPrecision.MILLIS)); + assertEquals( + baseMillis, + 
ExtractTimestamp.parseTimestampToMillis( + new SchemaAndValue(null, new Date(baseMillis)), TimestampPrecision.MILLIS)); + assertEquals( + baseMillis, + ExtractTimestamp.parseTimestampToMillis( + new SchemaAndValue(null, Instant.ofEpochMilli(baseMillis)), TimestampPrecision.MILLIS)); + } + + @Test + public void testAllTimestampPrecisionsImplemented() { + SchemaAndValue value = new SchemaAndValue(Schema.INT64_SCHEMA, 123456789L); + for (TimestampPrecision precision : TimestampPrecision.values()) { + try { + // We don't care about the output value, just that it doesn't throw IllegalStateException + // (default case) + ExtractTimestamp.parseTimestampToMillis(value, precision); + } catch (IllegalStateException e) { + fail("TimestampPrecision " + precision + " is not implemented in parseTimestampToMillis"); + } + } + } + + @Test(expected = IllegalArgumentException.class) + public void testParseTimestampToMillisNullValue() { + ExtractTimestamp.parseTimestampToMillis(null, TimestampPrecision.MILLIS); + } + + @Test(expected = IllegalArgumentException.class) + public void testParseTimestampToMillisNullInnerValue() { + ExtractTimestamp.parseTimestampToMillis( + new SchemaAndValue(Schema.OPTIONAL_INT64_SCHEMA, null), TimestampPrecision.MILLIS); + } + + @Test(expected = IllegalArgumentException.class) + public void testParseTimestampToMillisUnsupportedType() { + ExtractTimestamp.parseTimestampToMillis( + new SchemaAndValue(null, new Object()), TimestampPrecision.MILLIS); + } + + @Test + public void testExtractTimestampFromStructValue() { + Map configs = new HashMap<>(); + configs.put(ExtractTimestamp.TIMESTAMP_FIELD_CONFIG, "ts"); + configs.put( + ExtractTimestamp.TIMESTAMP_FIELD_PRECISION_CONFIG, TimestampPrecision.MILLIS.name()); + valueSmt.configure(configs); + + Schema valueSchema = SchemaBuilder.struct().field("ts", Schema.INT64_SCHEMA).build(); + Struct value = new Struct(valueSchema).put("ts", 123456789L); + + SourceRecord record = new SourceRecord(null, null, "test", 0, 
null, null, valueSchema, value); + SourceRecord transformed = valueSmt.apply(record); + + assertEquals(Long.valueOf(123456789L), transformed.timestamp()); + } + + @Test + public void testExtractTimestampFromStructKey() { + Map configs = new HashMap<>(); + configs.put(ExtractTimestamp.TIMESTAMP_FIELD_CONFIG, "ts"); + configs.put( + ExtractTimestamp.TIMESTAMP_FIELD_PRECISION_CONFIG, TimestampPrecision.MILLIS.name()); + keySmt.configure(configs); + + Schema keySchema = SchemaBuilder.struct().field("ts", Schema.INT64_SCHEMA).build(); + Struct key = new Struct(keySchema).put("ts", 123456789L); + + SourceRecord record = new SourceRecord(null, null, "test", 0, keySchema, key, null, null); + SourceRecord transformed = keySmt.apply(record); + + assertEquals(Long.valueOf(123456789L), transformed.timestamp()); + } + + @Test + public void testNestedFieldExtraction() { + Map configs = new HashMap<>(); + configs.put(ExtractTimestamp.TIMESTAMP_FIELD_CONFIG, "outer.inner"); + configs.put( + ExtractTimestamp.TIMESTAMP_FIELD_PRECISION_CONFIG, TimestampPrecision.MILLIS.name()); + valueSmt.configure(configs); + + Schema innerSchema = SchemaBuilder.struct().field("inner", Schema.INT64_SCHEMA).build(); + Schema outerSchema = SchemaBuilder.struct().field("outer", innerSchema).build(); + + Struct inner = new Struct(innerSchema).put("inner", 987654321L); + Struct outer = new Struct(outerSchema).put("outer", inner); + + SourceRecord record = new SourceRecord(null, null, "test", 0, null, null, outerSchema, outer); + SourceRecord transformed = valueSmt.apply(record); + + assertEquals(Long.valueOf(987654321L), transformed.timestamp()); + } + + @Test + public void testNestedMapExtraction() { + Map configs = new HashMap<>(); + configs.put(ExtractTimestamp.TIMESTAMP_FIELD_CONFIG, "outer.inner"); + configs.put( + ExtractTimestamp.TIMESTAMP_FIELD_PRECISION_CONFIG, TimestampPrecision.MILLIS.name()); + valueSmt.configure(configs); + + Map inner = Collections.singletonMap("inner", 987654321L); + Map 
outer = Collections.singletonMap("outer", inner); + + SourceRecord record = new SourceRecord(null, null, "test", 0, null, null, null, outer); + SourceRecord transformed = valueSmt.apply(record); + + assertEquals(Long.valueOf(987654321L), transformed.timestamp()); + } + + @Test + public void testFormats() { + long baseMillis = 1715698738000L; + + // SECONDS + verifyFormat(baseMillis / 1000, TimestampPrecision.SECONDS, baseMillis); + + // MILLIS + verifyFormat(baseMillis, TimestampPrecision.MILLIS, baseMillis); + + // MICROS + verifyFormat(baseMillis * 1000, TimestampPrecision.MICROS, baseMillis); + + // NANOS + verifyFormat(baseMillis * 1000000, TimestampPrecision.NANOS, baseMillis); + } + + private void verifyFormat(long inputValue, TimestampPrecision format, long expectedMillis) { + Map configs = new HashMap<>(); + configs.put(ExtractTimestamp.TIMESTAMP_FIELD_CONFIG, "ts"); + configs.put(ExtractTimestamp.TIMESTAMP_FIELD_PRECISION_CONFIG, format.name()); + ExtractTimestamp.Value smt = new ExtractTimestamp.Value<>(); + smt.configure(configs); + + Schema schema = SchemaBuilder.struct().field("ts", Schema.INT64_SCHEMA).build(); + Struct value = new Struct(schema).put("ts", inputValue); + SourceRecord record = new SourceRecord(null, null, "test", 0, null, null, schema, value); + + SourceRecord transformed = smt.apply(record); + assertEquals( + "Failed for format " + format, Long.valueOf(expectedMillis), transformed.timestamp()); + } + + @Test + public void testInputTypes() { + long expectedMillis = 1715698738123L; + + // Long + verifyInputType(expectedMillis, expectedMillis); + + // Int + verifyInputType(123, 123); + + // String (long) + verifyInputType(String.valueOf(expectedMillis), expectedMillis); + + // Date + verifyInputType(new Date(expectedMillis), expectedMillis); + + // Instant + verifyInputType(Instant.ofEpochMilli(expectedMillis), expectedMillis); + } + + private void verifyInputType(Object inputValue, long expectedMillis) { + Map configs = new HashMap<>(); 
+ configs.put(ExtractTimestamp.TIMESTAMP_FIELD_CONFIG, "ts"); + configs.put( + ExtractTimestamp.TIMESTAMP_FIELD_PRECISION_CONFIG, TimestampPrecision.MILLIS.name()); + valueSmt.configure(configs); + + Schema schema = null; // Schemaless test + Map value = Collections.singletonMap("ts", inputValue); + SourceRecord record = new SourceRecord(null, null, "test", 0, null, null, schema, value); + + SourceRecord transformed = valueSmt.apply(record); + assertEquals( + "Failed for input type " + inputValue.getClass().getName(), + Long.valueOf(expectedMillis), + transformed.timestamp()); + } + + @Test(expected = DataException.class) + public void testMissingField() { + Map configs = new HashMap<>(); + configs.put(ExtractTimestamp.TIMESTAMP_FIELD_CONFIG, "missing"); + configs.put( + ExtractTimestamp.TIMESTAMP_FIELD_PRECISION_CONFIG, TimestampPrecision.MILLIS.name()); + valueSmt.configure(configs); + + Schema schema = SchemaBuilder.struct().field("other", Schema.INT64_SCHEMA).build(); + Struct value = new Struct(schema).put("other", 123L); + SourceRecord record = new SourceRecord(null, null, "test", 0, null, null, schema, value); + + valueSmt.apply(record); + } + + @Test(expected = DataException.class) + public void testNullField() { + Map configs = new HashMap<>(); + configs.put(ExtractTimestamp.TIMESTAMP_FIELD_CONFIG, "ts"); + configs.put( + ExtractTimestamp.TIMESTAMP_FIELD_PRECISION_CONFIG, TimestampPrecision.MILLIS.name()); + valueSmt.configure(configs); + + Schema schema = SchemaBuilder.struct().field("ts", Schema.OPTIONAL_INT64_SCHEMA).build(); + Struct value = new Struct(schema).put("ts", null); + SourceRecord record = new SourceRecord(null, null, "test", 0, null, null, schema, value); + + valueSmt.apply(record); + } +} diff --git a/kafka-connect-bigtable-sink/sink/src/test/java/com/google/cloud/kafka/connect/bigtable/util/SchemaParsingUtilsTest.java b/kafka-connect-bigtable-sink/sink/src/test/java/com/google/cloud/kafka/connect/bigtable/util/SchemaParsingUtilsTest.java 
new file mode 100644 index 00000000..4758dc41 --- /dev/null +++ b/kafka-connect-bigtable-sink/sink/src/test/java/com/google/cloud/kafka/connect/bigtable/util/SchemaParsingUtilsTest.java @@ -0,0 +1,150 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.kafka.connect.bigtable.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; +import org.junit.Test; + +public class SchemaParsingUtilsTest { + + @Test + public void testExtractFieldFromStruct() { + Schema schema = SchemaBuilder.struct().field("f1", Schema.STRING_SCHEMA).build(); + Struct struct = new Struct(schema).put("f1", "v1"); + SchemaAndValue input = new SchemaAndValue(schema, struct); + + SchemaAndValue result = SchemaParsingUtils.extractField(input, new String[] {"f1"}); + assertEquals(Schema.STRING_SCHEMA, result.schema()); + assertEquals("v1", result.value()); + } + + @Test + public void testExtractFieldFromMap() { + Map map = Collections.singletonMap("f1", "v1"); + SchemaAndValue input = new SchemaAndValue(null, map); + + SchemaAndValue result = 
SchemaParsingUtils.extractField(input, new String[] {"f1"}); + assertNull(result.schema()); + assertEquals("v1", result.value()); + } + + @Test + public void testExtractFieldNestedStruct() { + Schema innerSchema = SchemaBuilder.struct().field("inner", Schema.INT32_SCHEMA).build(); + Schema outerSchema = SchemaBuilder.struct().field("outer", innerSchema).build(); + + Struct inner = new Struct(innerSchema).put("inner", 42); + Struct outer = new Struct(outerSchema).put("outer", inner); + SchemaAndValue input = new SchemaAndValue(outerSchema, outer); + + SchemaAndValue result = SchemaParsingUtils.extractField(input, new String[] {"outer", "inner"}); + assertEquals(Schema.INT32_SCHEMA, result.schema()); + assertEquals(42, result.value()); + } + + @Test + public void testExtractFieldNestedMap() { + Map inner = Collections.singletonMap("inner", 42); + Map outer = Collections.singletonMap("outer", inner); + SchemaAndValue input = new SchemaAndValue(null, outer); + + SchemaAndValue result = SchemaParsingUtils.extractField(input, new String[] {"outer", "inner"}); + assertEquals(42, result.value()); + } + + @Test + public void testExtractFieldMixed() { + Schema innerSchema = SchemaBuilder.struct().field("inner", Schema.INT32_SCHEMA).build(); + Struct inner = new Struct(innerSchema).put("inner", 42); + + Map outer = Collections.singletonMap("outer", inner); + SchemaAndValue input = new SchemaAndValue(null, outer); + + SchemaAndValue result = SchemaParsingUtils.extractField(input, new String[] {"outer", "inner"}); + assertNull(result.schema()); + assertEquals(42, result.value()); + } + + @Test(expected = DataException.class) + public void testExtractNestedFieldNotFound() { + Schema innerSchema = SchemaBuilder.struct().field("inner", Schema.INT32_SCHEMA).build(); + Struct inner = new Struct(innerSchema).put("inner", 42); + + Map outer = Collections.singletonMap("outer", inner); + SchemaAndValue input = new SchemaAndValue(null, outer); + + SchemaParsingUtils.extractField(input, 
new String[] {"outer", "not_found"}); + } + + @Test + public void testExtractFieldEmptyPath() { + SchemaAndValue input = new SchemaAndValue(Schema.STRING_SCHEMA, "v1"); + SchemaAndValue result = SchemaParsingUtils.extractField(input, new String[] {}); + assertEquals(input, result); + } + + @Test(expected = DataException.class) + public void testExtractFieldNullValue() { + SchemaAndValue input = new SchemaAndValue(Schema.STRING_SCHEMA, null); + SchemaParsingUtils.extractField(input, new String[] {"f1"}); + } + + @Test(expected = DataException.class) + public void testExtractFieldMissingInStruct() { + Schema schema = SchemaBuilder.struct().field("f1", Schema.STRING_SCHEMA).build(); + Struct struct = new Struct(schema).put("f1", "v1"); + SchemaAndValue input = new SchemaAndValue(schema, struct); + + // Struct.getWithoutDefault throws DataException (wrapped from SchemaException usually, but + // Kafka Connect Struct throws DataException) + SchemaParsingUtils.extractField(input, new String[] {"f2"}); + } + + @Test(expected = DataException.class) + public void testExtractFieldMissingInMap() { + Map map = new HashMap<>(); + SchemaAndValue input = new SchemaAndValue(null, map); + + // Map.get("f1") returns null, then next recursion calls ensureKeyElementIsNotNull(null) + SchemaParsingUtils.extractField(input, new String[] {"f1"}); + } + + @Test(expected = DataException.class) + public void testExtractFieldUnsupportedType() { + SchemaAndValue input = new SchemaAndValue(Schema.STRING_SCHEMA, "not-a-struct-or-map"); + SchemaParsingUtils.extractField(input, new String[] {"f1"}); + } + + @Test + public void testEnsureKeyElementIsNotNull() { + SchemaParsingUtils.ensureKeyElementIsNotNull("not null"); + } + + @Test(expected = DataException.class) + public void testEnsureKeyElementIsNotNullWithNull() { + SchemaParsingUtils.ensureKeyElementIsNotNull(null); + } +}