[Bigtable Sink] Add Extract Timestamp SMT #201

Draft
brandtnewton wants to merge 22 commits into GoogleCloudPlatform:main from brandtnewton:kafka-sink/extract-timestamp-smt

@brandtnewton commented May 14, 2026

Adds a new SMT that extracts a timestamp from a message key or value and uses it as the message timestamp.

@gemini-code-assist bot left a comment

Code Review

This pull request introduces a new ExtractTimestamp Single Message Transformation (SMT) for extracting record timestamps from nested fields and refactors the KeyMapper to utilize a new SchemaParsingUtils utility. Review feedback highlights a syntax error in FlattenArrayElementTest, suggests renaming misleading variables for clarity, and recommends using DataException instead of IllegalArgumentException to align with Kafka Connect standards. Additionally, improvements were suggested for configuration descriptions and handling potential precision loss when parsing string-based timestamps.

I am having trouble creating individual review comments, so my feedback is listed here.

kafka-connect-bigtable-sink/sink/src/test/java/com/google/cloud/kafka/connect/bigtable/transformations/FlattenArrayElementTest.java (381)

high

A syntax error was introduced here (gi.toList()). This will cause a compilation failure.

        Arrays.stream(new Struct[] {null, productElement2, productElement3}).toList();

kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/transformations/ExtractTimestamp.java (49)

medium

The configuration description contains a 'TODO'. Please provide a meaningful description for the timestamp format configuration.

              "The format of the timestamp field (e.g., SECONDS, MILLIS, MICROS, NANOS).");

kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/transformations/ExtractTimestamp.java (52)

medium

The variable name elementWrapperFieldName is misleading as it stores an enum representing the timestamp format, not a field name. Consider renaming it to timestampFormat.

  private ExtractTimestampFormat timestampFormat;

kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/transformations/ExtractTimestamp.java (58)

medium

Parsing the enum directly from the config string using valueOf is case-sensitive and will throw an IllegalArgumentException if the value is invalid. It's better to use toUpperCase() for robustness and consider adding a validator to the ConfigDef.

    this.timestampFormat = ExtractTimestampFormat.valueOf(config.getString(TIMESTAMP_FORMAT_TYPE).toUpperCase(java.util.Locale.ROOT));
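A minimal, stdlib-only sketch of the case-insensitive parsing suggested above. The enum `TimestampFormatSketch` and the class `TimestampFormatParser` are hypothetical stand-ins, not the PR's actual types; the four constants are assumed from the configuration description quoted earlier in this review. `Locale.ROOT` matters because locale-sensitive upper-casing (for example under a Turkish default locale) can turn `millis` into a string that no longer matches the enum constant. A real SMT would probably report the failure through a Kafka config or data exception rather than `IllegalArgumentException`.

```java
import java.util.Arrays;
import java.util.Locale;

/** Hypothetical stand-in for the PR's ExtractTimestampFormat enum. */
enum TimestampFormatSketch { SECONDS, MILLIS, MICROS, NANOS }

final class TimestampFormatParser {
  private TimestampFormatParser() {}

  /** Parses a config value case-insensitively and fails with a descriptive message. */
  static TimestampFormatSketch parse(String raw) {
    if (raw == null) {
      throw new IllegalArgumentException("Timestamp format must not be null");
    }
    // Normalize with Locale.ROOT so the result does not depend on the JVM's default locale.
    String normalized = raw.trim().toUpperCase(Locale.ROOT);
    try {
      return TimestampFormatSketch.valueOf(normalized);
    } catch (IllegalArgumentException e) {
      // Re-throw with the offending value and the accepted values spelled out.
      throw new IllegalArgumentException(
          "Invalid timestamp format '" + raw + "'; expected one of "
              + Arrays.toString(TimestampFormatSketch.values()),
          e);
    }
  }
}
```

With this shape, `TimestampFormatParser.parse("millis")` and `TimestampFormatParser.parse("MILLIS")` resolve to the same constant, while an unknown value such as `"hours"` is rejected with a message that lists the accepted values.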

kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/transformations/ExtractTimestamp.java (64)

medium

Update usage of renamed variable timestampFormat.

    long parsedTimestampMillis = ExtractTimestamp.parseTimestampToMillis(timestampField, timestampFormat);
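For readers unfamiliar with the conversion this call implies, here is a rough sketch of mapping a numeric epoch value to milliseconds by format. It is not the PR's `parseTimestampToMillis` implementation, which is not shown in this conversation; `EpochUnitSketch` is a hypothetical enum that simply delegates to `java.util.concurrent.TimeUnit`.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical format enum: each constant knows how to convert its unit to milliseconds. */
enum EpochUnitSketch {
  SECONDS(TimeUnit.SECONDS),
  MILLIS(TimeUnit.MILLISECONDS),
  MICROS(TimeUnit.MICROSECONDS),
  NANOS(TimeUnit.NANOSECONDS);

  private final TimeUnit unit;

  EpochUnitSketch(TimeUnit unit) {
    this.unit = unit;
  }

  /**
   * Converts an epoch value expressed in this unit to epoch milliseconds.
   * Finer units are truncated toward zero; coarser units saturate at
   * Long.MAX_VALUE or Long.MIN_VALUE on overflow, per TimeUnit's contract.
   */
  long toMillis(long epochValue) {
    return unit.toMillis(epochValue);
  }
}
```

For example, `EpochUnitSketch.MICROS.toMillis(1715698738123456L)` yields `1715698738123L`, dropping the sub-millisecond digits.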

kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/transformations/ExtractTimestamp.java (90)

medium

In Kafka Connect transformations, DataException is typically preferred over IllegalArgumentException for errors encountered during record processing.

      throw new DataException("Cannot parse timestamp value of null");

kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/transformations/ExtractTimestamp.java (113)

medium

Casting the result of Double.parseDouble directly to long truncates the fractional part. If the input string represents seconds (e.g., "1715698738.123"), this results in a loss of precision (milliseconds) before the resolution logic is applied. Consider parsing as a double and multiplying by the appropriate factor (e.g., 1000 for seconds) before converting to long.
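The precision loss described above can be illustrated with a small sketch. The class and method names are hypothetical, and the second method uses `BigDecimal` instead of the double multiplication the review suggests, on the assumption that exact decimal arithmetic is preferable because multiplying a `double` by 1000 can land slightly below the intended value before the cast.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

final class EpochSecondsParser {
  private EpochSecondsParser() {}

  /** Mirrors the problem: the cast drops the fraction before scaling to milliseconds. */
  static long truncatingSecondsToMillis(String raw) {
    long wholeSeconds = (long) Double.parseDouble(raw); // "…738.123" becomes …738
    return wholeSeconds * 1000L;
  }

  /** Scales in decimal first, then truncates anything finer than a millisecond. */
  static long preciseSecondsToMillis(String raw) {
    return new BigDecimal(raw.trim())
        .movePointRight(3)                 // seconds -> milliseconds, exactly
        .setScale(0, RoundingMode.DOWN)    // drop sub-millisecond digits
        .longValueExact();                 // throws ArithmeticException if out of long range
  }
}
```

For the input `"1715698738.123"`, the truncating version returns `1715698738000` while the decimal version returns `1715698738123`.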

kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/transformations/ExtractTimestamp.java (116-117)

medium

Use DataException instead of IllegalArgumentException for consistency with Kafka Connect standards.

      throw new DataException(
          "Unsupported timestamp payload type: " + rawValue.getClass().getName());

kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/util/SchemaParsingUtils.java (59)

medium

This utility is now used by both KeyMapper and ExtractTimestamp. The error message 'Error with row key definition' is too specific and will be misleading when used in the context of timestamp extraction. Please use a more generic message.

      throw new DataException("Extracted field value cannot be null.");

@brandtnewton

/gemini review

@gemini-code-assist bot left a comment

Code Review

This pull request introduces the ExtractTimestamp SMT to enable extracting cell timestamps from Kafka message keys or values, supported by new utility classes ConfigUtils and SchemaParsingUtils. Existing components, including KeyMapper and BigtableSinkConfig, were refactored to use these utilities, and the README.md was updated with relevant documentation. Review feedback identifies a case-sensitivity bug in configuration parsing, precision loss when handling floating-point timestamp strings, and the need for improved variable naming and import practices. Minor documentation typos and grammar issues were also noted.

Two comment threads on kafka-connect-bigtable-sink/README.md (both outdated)
brandtnewton and others added 12 commits May 15, 2026 10:40
…d/kafka/connect/bigtable/transformations/ExtractTimestamp.java

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
…d/kafka/connect/bigtable/transformations/ExtractTimestamp.java

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
…d/kafka/connect/bigtable/transformations/ExtractTimestamp.java

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
…d/kafka/connect/bigtable/transformations/ExtractTimestamp.java

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
…d/kafka/connect/bigtable/util/SchemaParsingUtils.java

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
…ewton/cloud-bigtable-ecosystem into kafka-sink/extract-timestamp-smt