58 changes: 49 additions & 9 deletions kafka-connect-bigtable-sink/README.md
@@ -35,7 +35,8 @@ type:

#### Using Message Values for Row Keys

If you need to use fields from the message value, rather than the message key, use the `org.apache.kafka.connect.transforms.ValueToKey` SMT to map the value onto the key:
If you need to use fields from the message value, rather than the message key, use
the `org.apache.kafka.connect.transforms.ValueToKey` SMT to map the value onto the key:

```properties
transforms=createKey
@@ -66,11 +67,17 @@ optimizing configs for latency will reduce throughput and efficiency.
When `value.null.mode` is set to `delete`, Kafka messages with a null value will
result in the corresponding row being deleted.

### Cell Timestamps

The Bigtable cell timestamps for your data are set to the timestamp of the Kafka message. If the message timestamp is
null, the sink's system time is used instead. To take the cell timestamp from a field in the message, use the Extract
Timestamp SMT that's included in this package.
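
The mapping described above can be sketched in Java as follows. This is a minimal illustration, not the connector's actual code: the class and method names are hypothetical, and it assumes the Kafka timestamp is in epoch milliseconds while Bigtable cell timestamps are in microseconds (as the integration tests in this change assert).

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper; not part of the connector's API.
final class CellTimestampSketch {

  /**
   * Returns the cell timestamp in microseconds for a Kafka record timestamp in
   * epoch milliseconds, falling back to the supplied clock value when it is null.
   */
  static long toCellTimestampMicros(Long kafkaTimestampMillis, long nowMillis) {
    long millis = kafkaTimestampMillis != null ? kafkaTimestampMillis : nowMillis;
    return TimeUnit.MILLISECONDS.toMicros(millis);
  }

  public static void main(String[] args) {
    // A record with a timestamp, and one without (uses the system clock).
    System.out.println(toCellTimestampMicros(1234567890L, System.currentTimeMillis()));
    System.out.println(toCellTimestampMicros(null, System.currentTimeMillis()));
  }
}
```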

## SMT

This project includes SMTs that may be useful for preparing your data for Bigtable.

### Flatten Array Element
### Flatten Array Element SMT

This SMT is used to flatten nested array fields that can be generated by some serialization libraries.

@@ -105,7 +112,8 @@ The name of the field wrapping individual elements within the array.

#### Example

Given the following input message value, with `array.field="products"` `array.inner.wrapper="list"` `array.element.wrapper="element"`
Given the following input message value,
with `array.field="products"` `array.inner.wrapper="list"` `array.element.wrapper="element"`

```json
{
@@ -137,6 +145,37 @@ The resulting output would be:
}
```

### Extract Timestamp SMT

Extracts a timestamp from the message, to be used as the message timestamp. This can be useful because this Sink
Connector uses the Kafka message timestamp for the Bigtable cell timestamp.

#### Configuration

`timestamp.field`

The name of the timestamp field. Non-root fields can be referenced by specifying the field path, with periods
separating each field. If the field cannot be found, or if its value is null, processing of the message fails. The
field may be a numeric, string, or date type.
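
To illustrate the period-separated path lookup, here is a minimal sketch for schemaless (map-based) values. It is an assumption about the general technique, not the SMT's actual implementation; `FieldPathSketch` and `resolve` are hypothetical names.

```java
import java.util.Map;

// Hypothetical helper; shows dotted-path resolution over nested maps.
final class FieldPathSketch {

  /** Walks the nested maps along the path; throws if a segment is missing or null. */
  static Object resolve(Map<String, Object> root, String path) {
    Object current = root;
    for (String segment : path.split("\\.")) {
      if (!(current instanceof Map)) {
        throw new IllegalArgumentException("Field not found: " + path);
      }
      current = ((Map<?, ?>) current).get(segment);
      if (current == null) {
        throw new IllegalArgumentException("Field missing or null: " + path);
      }
    }
    return current;
  }

  public static void main(String[] args) {
    Map<String, Object> value = Map.of("order", Map.of("createdAt", 1779122855L));
    System.out.println(resolve(value, "order.createdAt")); // prints 1779122855
  }
}
```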

`timestamp.field.precision`

The precision of the timestamp field. Defaults to MILLIS. This only affects the output for numeric fields. Ignore this
config if your field is a date type. Supported values are NANOS, MICROS, MILLIS and SECONDS. Use the value that matches
the field's precision. For example, if your field holds epoch millisecond values, use MILLIS.
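
As a rough sketch of what the precision setting means for a numeric field, the following converts a raw value to epoch milliseconds (the unit of a Kafka message timestamp). The enum below is hypothetical and only mirrors the supported value names; it is not the package's `TimestampPrecision` class.

```java
import java.util.concurrent.TimeUnit;

final class PrecisionSketch {

  // Hypothetical enum mirroring the documented config values.
  enum Precision {
    NANOS(TimeUnit.NANOSECONDS),
    MICROS(TimeUnit.MICROSECONDS),
    MILLIS(TimeUnit.MILLISECONDS),
    SECONDS(TimeUnit.SECONDS);

    private final TimeUnit unit;

    Precision(TimeUnit unit) {
      this.unit = unit;
    }

    /** Converts a raw numeric field value in this precision to epoch milliseconds. */
    long toEpochMillis(long raw) {
      return unit.toMillis(raw);
    }
  }

  public static void main(String[] args) {
    // An epoch-seconds field becomes an epoch-millisecond message timestamp.
    System.out.println(Precision.SECONDS.toEpochMillis(1600000000L)); // prints 1600000000000
  }
}
```

Note that converting from NANOS or MICROS to milliseconds truncates the sub-millisecond part in this sketch.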

#### Example

This configuration extracts the message timestamp from a `createdAt` (int64) epoch millisecond field inside the `order`
struct.

```properties
transforms=extractTimestamp
transforms.extractTimestamp.type=com.google.cloud.kafka.connect.bigtable.transformations.ExtractTimestamp$Value
transforms.extractTimestamp.timestamp.field=order.createdAt
transforms.extractTimestamp.timestamp.field.precision=MILLIS
```

## Configuration

See [config/](./config/bigtable-kafka-sink-connector.properties) for a sample
@@ -204,7 +243,7 @@ Defines the insertion mode to use. Supported modes are:
the table, an error is thrown.
- upsert - If the row to be written already exists, then its column values are
overwritten with the ones provided.
- replace_if_newest - If there are no cells newer than this record within the
- replace_if_newest - If there are no cells newer than this record within the
target row of the table, clear the row and then insert new record.

* Type: string
@@ -313,10 +352,10 @@ together directly.

`expand.root.level.arrays`

Determines whether root level arrays should be expanded to a column family
or serialized to a single column. If true, root level array fields will
be mapped to a Bigtable column family where each element is stored in
an individual column with its index as column qualifier, padded with
Determines whether root level arrays should be expanded to a column family
or serialized to a single column. If true, root level array fields will
be mapped to a Bigtable column family where each element is stored in
an individual column with its index as column qualifier, padded with
zeros to a length of 6. If false, root level array fields will be
serialized as a JSON string to a single column.
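
The zero-padded qualifier described above can be illustrated with a short sketch. The class and method names are hypothetical, and the note on ordering is an assumption about why padding is used: fixed-width qualifiers sort lexicographically in the same order as their numeric indexes.

```java
import java.util.Locale;

// Hypothetical helper; shows the index-to-qualifier formatting only.
final class ArrayQualifierSketch {

  /** Formats an array index as a column qualifier, zero-padded to a length of 6. */
  static String qualifier(int index) {
    return String.format(Locale.ROOT, "%06d", index);
  }

  public static void main(String[] args) {
    System.out.println(qualifier(0)); // prints 000000
    System.out.println(qualifier(42)); // prints 000042
  }
}
```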

@@ -389,7 +428,8 @@ The following steps are to run in standalone mode. They can be easily adapted
for running Kafka Connect in distributed mode using
the [REST API](http://kafka.apache.org/documentation.html#connect_running).

1. Download the latest `kafka-connect-bigtable-sink.jar` file from the [Releases page](https://github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/releases).
1. Download the latest `kafka-connect-bigtable-sink.jar` file from
the [Releases page](https://github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/releases).
2. Copy the `kafka-connect-bigtable-sink.jar` JAR file into your
**plugin directory**. The plugin directory can be any of the directories
specified by
2 changes: 1 addition & 1 deletion kafka-connect-bigtable-sink/doc/tests.md
@@ -108,5 +108,5 @@ mvn -Pintegration-tests clean verify
#### To run a specific integration test

```bash
mvn -Pintegration-tests clean verify -Dit.test=InsertUpsertIT#testUpsert
mvn -Pintegration-tests clean verify -Dit.test=InsertModeIT#testUpsert
```
@@ -0,0 +1,211 @@
/*
 * Copyright 2026 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.google.cloud.kafka.connect.bigtable.integration;

import static com.google.cloud.kafka.connect.bigtable.config.BigtableSinkConfig.DEFAULT_COLUMN_FAMILY_CONFIG;
import static com.google.cloud.kafka.connect.bigtable.config.BigtableSinkConfig.INSERT_MODE_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowCell;
import com.google.cloud.kafka.connect.bigtable.config.InsertMode;
import com.google.cloud.kafka.connect.bigtable.transformations.ExtractTimestamp;
import com.google.cloud.kafka.connect.bigtable.transformations.TimestampPrecision;
import com.google.cloud.kafka.connect.bigtable.util.TestDataUtil;
import com.google.protobuf.ByteString;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.StringConverter;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class ExtractTimestampIT extends BaseKafkaConnectBigtableIT {

  @Test
  public void testExtractTimestampFromValue() throws InterruptedException, ExecutionException {
    Map<String, String> props = baseConnectorProps();
    props.put(INSERT_MODE_CONFIG, InsertMode.UPSERT.name());
    props.put("transforms", "extractTimestamp");
    props.put("transforms.extractTimestamp.type", ExtractTimestamp.Value.class.getName());
    props.put("transforms.extractTimestamp." + ExtractTimestamp.TIMESTAMP_FIELD_CONFIG, "ts");
    props.put(
        "transforms.extractTimestamp." + ExtractTimestamp.TIMESTAMP_FIELD_PRECISION_CONFIG,
        TimestampPrecision.MILLIS.name());
    props.put(DEFAULT_COLUMN_FAMILY_CONFIG, "cf");
    props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());

    String testId = startSingleTopicConnector(props);
    createTablesAndColumnFamilies(Map.of(testId, Set.of("cf")));

    Schema schema =
        SchemaBuilder.struct()
            .field("id", Schema.STRING_SCHEMA)
            .field("ts", Schema.INT64_SCHEMA)
            .build();

    long timestamp = 1234567890L;
    Struct value = new Struct(schema).put("id", "val1").put("ts", timestamp);

    JsonConverter converter = new JsonConverter();
    converter.configure(Collections.singletonMap("schemas.enable", "true"), false);
    byte[] valueJson = converter.fromConnectData(testId, schema, value);

    String key = "key1";
    // Decode explicitly as UTF-8 rather than relying on the platform default charset.
    connect.kafka().produce(testId, key, new String(valueJson, StandardCharsets.UTF_8));

    waitUntilBigtableContainsNumberOfRows(testId, 1);
    Map<ByteString, Row> rows = readAllRows(bigtableData, testId);
    Row row = rows.get(ByteString.copyFrom(key.getBytes(StandardCharsets.UTF_8)));
    assertNotNull(row);

    List<RowCell> cells = row.getCells("cf", "id");
    assertEquals(1, cells.size());
    // Bigtable timestamps are in microseconds
    assertEquals(timestamp * 1000, cells.get(0).getTimestamp());
  }

  @Test
  public void testExtractTimestampFromNestedValue()
      throws InterruptedException, ExecutionException {
    Map<String, String> props = baseConnectorProps();
    props.put(INSERT_MODE_CONFIG, InsertMode.UPSERT.name());
    props.put("transforms", "extractTimestamp");
    props.put("transforms.extractTimestamp.type", ExtractTimestamp.Value.class.getName());
    props.put(
        "transforms.extractTimestamp." + ExtractTimestamp.TIMESTAMP_FIELD_CONFIG, "nested.ts");
    props.put(
        "transforms.extractTimestamp." + ExtractTimestamp.TIMESTAMP_FIELD_PRECISION_CONFIG,
        TimestampPrecision.SECONDS.name());
    props.put(DEFAULT_COLUMN_FAMILY_CONFIG, "cf");
    props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());

    String testId = startSingleTopicConnector(props);
    createTablesAndColumnFamilies(Map.of(testId, Set.of("cf", "nested")));

    Schema nestedSchema = SchemaBuilder.struct().field("ts", Schema.INT64_SCHEMA).build();
    Schema schema =
        SchemaBuilder.struct()
            .field("id", Schema.STRING_SCHEMA)
            .field("nested", nestedSchema)
            .build();

    long timestampSeconds = 1600000000L;
    Struct nestedValue = new Struct(nestedSchema).put("ts", timestampSeconds);
    Struct value = new Struct(schema).put("id", "val2").put("nested", nestedValue);

    JsonConverter converter = new JsonConverter();
    converter.configure(Collections.singletonMap("schemas.enable", "true"), false);
    byte[] valueJson = converter.fromConnectData(testId, schema, value);

    String key = "key2";
    // Decode explicitly as UTF-8 rather than relying on the platform default charset.
    connect.kafka().produce(testId, key, new String(valueJson, StandardCharsets.UTF_8));

    waitUntilBigtableContainsNumberOfRows(testId, 1);
    Map<ByteString, Row> rows = readAllRows(bigtableData, testId);
    Row row = rows.get(ByteString.copyFrom(key.getBytes(StandardCharsets.UTF_8)));
    assertNotNull(row);

    List<RowCell> cells = row.getCells("cf", "id");
    assertEquals(1, cells.size());
    // Bigtable timestamps are in microseconds.
    // timestampSeconds * 1000 (millis) * 1000 (micros)
    assertEquals(timestampSeconds * 1000 * 1000, cells.get(0).getTimestamp());
  }

  @Test
  public void testExtractTimestampFromKey() throws InterruptedException, ExecutionException {
    Map<String, String> props = baseConnectorProps();
    props.put(INSERT_MODE_CONFIG, InsertMode.UPSERT.name());
    props.put("transforms", "extractTimestamp");
    props.put("transforms.extractTimestamp.type", ExtractTimestamp.Key.class.getName());
    props.put("transforms.extractTimestamp." + ExtractTimestamp.TIMESTAMP_FIELD_CONFIG, "ts");
    props.put(DEFAULT_COLUMN_FAMILY_CONFIG, "cf");
    props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
    props.put("key.converter", JsonConverter.class.getName());
    props.put("key.converter.schemas.enable", "true");

    String testId = startSingleTopicConnector(props);
    createTablesAndColumnFamilies(Map.of(testId, Set.of("cf")));

    Schema keySchema = SchemaBuilder.struct().field("ts", Schema.INT64_SCHEMA).build();
    long timestamp = 9876543210L;
    Struct keyStruct = new Struct(keySchema).put("ts", timestamp);

    JsonConverter converter = new JsonConverter();
    converter.configure(Collections.singletonMap("schemas.enable", "true"), true);
    byte[] keyJson = converter.fromConnectData(testId, keySchema, keyStruct);

    String value = "some-value";
    // Decode explicitly as UTF-8 rather than relying on the platform default charset.
    connect.kafka().produce(testId, new String(keyJson, StandardCharsets.UTF_8), value);

    waitUntilBigtableContainsNumberOfRows(testId, 1);
    Map<ByteString, Row> rows = readAllRows(bigtableData, testId);
    // The row key will be the JSON string of the key
    Row row = rows.values().iterator().next();
    assertNotNull(row);

    List<RowCell> cells = row.getCells("cf", "KAFKA_VALUE");
    assertEquals(1, cells.size());
    // No precision is configured, so the documented default (MILLIS) applies.
    // Bigtable timestamps are in microseconds.
    assertEquals(timestamp * 1000, cells.get(0).getTimestamp());
  }

  @Test
  public void testExtractTimestampFromValueNoSchema() throws Exception {
    Map<String, String> props = baseConnectorProps();
    props.put(INSERT_MODE_CONFIG, InsertMode.UPSERT.name());
    props.put("transforms", "extractTimestamp");
    props.put("transforms.extractTimestamp.type", ExtractTimestamp.Value.class.getName());
    props.put(
        "transforms.extractTimestamp." + ExtractTimestamp.TIMESTAMP_FIELD_CONFIG, "createdAt");
    props.put(
        "transforms.extractTimestamp." + ExtractTimestamp.TIMESTAMP_FIELD_PRECISION_CONFIG,
        TimestampPrecision.SECONDS.name());
    props.put(DEFAULT_COLUMN_FAMILY_CONFIG, "cf");
    props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
    props.put("value.converter.schemas.enable", "false");

    String testId = startSingleTopicConnector(props);
    createTablesAndColumnFamilies(Map.of(testId, Set.of("cf")));

    String json = TestDataUtil.readResource("json/order-no-schema.json");
    String key = "order1";
    connect.kafka().produce(testId, key, json);

    waitUntilBigtableContainsNumberOfRows(testId, 1);
    Map<ByteString, Row> rows = readAllRows(bigtableData, testId);
    Row row = rows.get(ByteString.copyFrom(key.getBytes(StandardCharsets.UTF_8)));
    assertNotNull(row);

    // createdAt: 1779122855 (seconds)
    long expectedTimestampMicros = 1779122855L * 1000 * 1000;

    List<RowCell> cells = row.getCells("cf", "KAFKA_VALUE");
    assertEquals(1, cells.size());
    assertEquals(expectedTimestampMicros, cells.get(0).getTimestamp());
  }
}