Description
Search before asking
- I searched in the issues and found nothing similar.
Motivation
Currently, when updating data in primary key tables, the changelog produces both UPDATE_BEFORE (-U) and UPDATE_AFTER (+U) records for each update operation. In many scenarios, downstream consumers do not require the previous value information, yet they still incur the cost of storing and transmitting both records.
This feature introduces a new table-level configuration option table.changelog.ignore-update-before that allows users to selectively ignore UPDATE_BEFORE records, providing the following benefits:
- Reduced storage costs: Up to roughly 50% less changelog storage for update-heavy workloads, since each update writes one changelog record instead of two
- Lower transmission overhead: Decreased CDC data transfer costs between systems
- Improved write performance: For full-row update scenarios (using DefaultRowMerger), the system can skip the RocksDB lookup for the old value entirely
Solution
1. New Configuration Option
Add a new configuration option in ConfigOptions:
```java
public static final ConfigOption<Boolean> TABLE_CHANGELOG_IGNORE_UPDATE_BEFORE =
        key("table.changelog.ignore-update-before")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "Whether to ignore UPDATE_BEFORE records in changelog for the primary key table. "
                                + "When disabled (default), update operations produce both UPDATE_BEFORE and UPDATE_AFTER records. "
                                + "When enabled, update operations only produce UPDATE_AFTER records, "
                                + "which reduces storage and transmission costs but loses the ability to track previous values. "
                                + "This option only affects primary key tables.");
```

Provide an accessor method in TableConfig:
```java
public boolean isChangelogIgnoreUpdateBefore() {
    return config.get(ConfigOptions.TABLE_CHANGELOG_IGNORE_UPDATE_BEFORE);
}
```

2. KvTablet Core Logic
Modify the KvTablet.putAsLeader() method to handle the new configuration:
| Scenario | `ignoreUpdateBefore=false` (default) | `ignoreUpdateBefore=true` |
|---|---|---|
| Insert | Produce INSERT (+I) | Produce UPDATE_AFTER (+U)* |
| Full Update (DefaultRowMerger) | Query old value, produce -U, +U | Skip old value query, produce +U only |
| Partial Update | Query old value, merge, produce -U, +U | Query old value, merge, produce +U only |
| Delete | Produce DELETE (-D) | Produce DELETE (-D) (unchanged) |
*Note: With DefaultRowMerger and `ignoreUpdateBefore=true`, the old-value lookup is skipped, so the tablet cannot tell an insert from an update; for simplicity, inserts are therefore emitted as UPDATE_AFTER (+U).
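The decision table can be modeled in a few lines. This is a minimal, self-contained sketch, not the actual `KvTablet.putAsLeader()` code: a `HashMap` stands in for RocksDB, rows are plain strings, and the partial-update "merge" is a toy string concatenation. `ChangeType`, `LogRecord` and `KvTabletSketch` are hypothetical names. It also makes assumptions the issue does not state: a partial-update insert still emits +I (the footnote only covers DefaultRowMerger), -D carries the old row, and deleting a missing key emits nothing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Changelog kinds from the issue: +I, -U, +U, -D.
enum ChangeType { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

// One changelog entry: the change kind and the row value it carries.
record LogRecord(ChangeType type, String value) {}

// Hypothetical model of the write path; NOT the real KvTablet.
// A HashMap stands in for RocksDB and rows are plain strings.
class KvTabletSketch {
    private final Map<String, String> store = new HashMap<>();
    private final boolean ignoreUpdateBefore;
    private final boolean defaultRowMerger; // true: full-row updates; false: partial updates
    private int lookups = 0; // number of simulated RocksDB point reads

    KvTabletSketch(boolean ignoreUpdateBefore, boolean defaultRowMerger) {
        this.ignoreUpdateBefore = ignoreUpdateBefore;
        this.defaultRowMerger = defaultRowMerger;
    }

    int lookups() {
        return lookups;
    }

    private Optional<String> lookup(String key) {
        lookups++;
        return Optional.ofNullable(store.get(key));
    }

    List<LogRecord> put(String key, String newValue) {
        List<LogRecord> out = new ArrayList<>();
        if (ignoreUpdateBefore && defaultRowMerger) {
            // Full-row update without -U: the old value is never needed, so the
            // lookup is skipped and insert vs. update is unknown; always emit +U.
            store.put(key, newValue);
            out.add(new LogRecord(ChangeType.UPDATE_AFTER, newValue));
            return out;
        }
        Optional<String> old = lookup(key);
        if (old.isEmpty()) {
            // Key absent: a genuine insert (assumed +I for partial updates too).
            store.put(key, newValue);
            out.add(new LogRecord(ChangeType.INSERT, newValue));
            return out;
        }
        // Toy merge for partial updates; a real row merger combines columns.
        String merged = defaultRowMerger ? newValue : old.get() + "|" + newValue;
        store.put(key, merged);
        if (!ignoreUpdateBefore) {
            out.add(new LogRecord(ChangeType.UPDATE_BEFORE, old.get()));
        }
        out.add(new LogRecord(ChangeType.UPDATE_AFTER, merged));
        return out;
    }

    List<LogRecord> delete(String key) {
        // Same behavior for both flag values (assumed: -D carries the old row,
        // and a missing key produces no record).
        List<LogRecord> out = new ArrayList<>();
        Optional<String> old = lookup(key);
        if (old.isPresent()) {
            store.remove(key);
            out.add(new LogRecord(ChangeType.DELETE, old.get()));
        }
        return out;
    }
}
```

With `new KvTabletSketch(true, true)`, repeated puts to the same key emit only +U and `lookups()` stays at 0. Switching to partial updates (`defaultRowMerger=false`) brings the lookup back, because the merge needs the old row, but -U is still dropped.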
3. Flink Source Adaptation
Update FlinkTableSource.getChangelogMode() to return the appropriate ChangelogMode based on the configuration:
- When `ignoreUpdateBefore=true` and `deleteBehavior=ALLOW`: returns `INSERT`, `UPDATE_AFTER`, `DELETE`
- When `ignoreUpdateBefore=true` and `deleteBehavior!=ALLOW`: returns `INSERT`, `UPDATE_AFTER`
- When `ignoreUpdateBefore=false` (default): original behavior preserved
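The three rules amount to a small pure function. The sketch below models it without a Flink dependency: `RowKind` here is a local stand-in for Flink's `org.apache.flink.types.RowKind`, and `ChangelogModeSketch`, `changelogKinds`, `deleteAllowed` (true when `deleteBehavior=ALLOW`) and `originalMode` are hypothetical names. The default branch takes the current mode as input rather than guessing it, since the issue only says that behavior is preserved. In real connector code the returned kinds would presumably be assembled with Flink's `ChangelogMode.newBuilder().addContainedKind(...)`.

```java
import java.util.EnumSet;
import java.util.Set;

// Local stand-in for Flink's org.apache.flink.types.RowKind (same four kinds).
enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

final class ChangelogModeSketch {
    private ChangelogModeSketch() {}

    /**
     * Hypothetical helper mirroring the getChangelogMode() rules listed above.
     *
     * @param ignoreUpdateBefore value of table.changelog.ignore-update-before
     * @param deleteAllowed      true when deleteBehavior == ALLOW
     * @param originalMode       kinds the source reports today (returned unchanged by default)
     */
    static Set<RowKind> changelogKinds(
            boolean ignoreUpdateBefore, boolean deleteAllowed, Set<RowKind> originalMode) {
        if (!ignoreUpdateBefore) {
            return originalMode; // default: original behavior preserved
        }
        // UPDATE_BEFORE is never produced in this mode, so it is never advertised.
        Set<RowKind> kinds = EnumSet.of(RowKind.INSERT, RowKind.UPDATE_AFTER);
        if (deleteAllowed) {
            kinds.add(RowKind.DELETE);
        }
        return kinds;
    }
}
```

The `deleteBehavior=ALLOW` case yields the same kinds as Flink's `ChangelogMode.upsert()` (INSERT, UPDATE_AFTER, DELETE), which is the mode Flink uses for key-based changelogs without UPDATE_BEFORE.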
Anything else?
No response
Willingness to contribute
- I'm willing to submit a PR!