From 69fe1d87b2a4de828f6fdd934defebf2b9f73031 Mon Sep 17 00:00:00 2001 From: Wang Cheng <348448708@qq.com> Date: Sun, 7 Dec 2025 21:49:34 +0800 Subject: [PATCH 1/4] constrain upsert behavior for auto increment column --- .../client/table/writer/UpsertWriterImpl.java | 29 +++++++++++++++++-- .../fluss/client/table/FlussTableITCase.java | 22 ++++++++++++++ 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java index 904b85c2a4..431c091467 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java @@ -61,7 +61,11 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter { WriterClient writerClient) { super(tablePath, tableInfo, writerClient); RowType rowType = tableInfo.getRowType(); - sanityCheck(rowType, tableInfo.getPrimaryKeys(), partialUpdateColumns); + sanityCheck( + rowType, + tableInfo.getPrimaryKeys(), + tableInfo.getSchema().getAutoIncrementColumnNames(), + partialUpdateColumns); this.targetColumns = partialUpdateColumns; DataLakeFormat lakeFormat = tableInfo.getTableConfig().getDataLakeFormat().orElse(null); @@ -80,9 +84,20 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter { } private static void sanityCheck( - RowType rowType, List primaryKeys, @Nullable int[] targetColumns) { + RowType rowType, + List primaryKeys, + List autoIncrementColumnNames, + @Nullable int[] targetColumns) { // skip check when target columns is null if (targetColumns == null) { + if (!autoIncrementColumnNames.isEmpty()) { + throw new IllegalArgumentException( + String.format( + "This table has auto increment column %s." + + "Explicitly specifying values for an auto increment column is not allowed." + + "Please specify non-auto-increment columns as target columns using partialUpdate first.", + autoIncrementColumnNames)); + } return; } BitSet targetColumnsSet = new BitSet(); @@ -116,6 +131,16 @@ private static void sanityCheck( } } } + + // explicitly specifying values for an auto increment column is not allowed + for (String autoIncrementColumnName : autoIncrementColumnNames) { + if (targetColumnsSet.get(rowType.getFieldIndex(autoIncrementColumnName))) { + throw new IllegalArgumentException( + String.format( + "Explicitly specifying values for the auto increment column %s is not allowed.", + autoIncrementColumnName)); + } + } } /** diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java index cf20846756..8738134add 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java @@ -662,6 +662,28 @@ void testInvalidPartialUpdate() throws Exception { .hasMessage( "Invalid target column index: 3 for table test_db_1.test_pk_table_1. The table only has 3 columns."); } + + // test invalid auto increment column upsert + schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.INT()) + .primaryKey("a") + .enableAutoIncrement("b") + .build(); + tableDescriptor = TableDescriptor.builder().schema(schema).distributedBy(3, "a").build(); + TablePath tablePath = + TablePath.of("test_db_1", "test_invalid_auto_increment_column_upsert"); + createTable(tablePath, tableDescriptor, true); + try (Table table = conn.getTable(tablePath)) { + assertThatThrownBy(() -> table.newUpsert().createWriter()) + .hasMessage( + "This table has auto increment column [b]. Explicitly specifying values for an auto increment column is not allowed. Please specify non-auto-increment columns as target columns using partialUpdate first."); + + assertThatThrownBy(() -> table.newUpsert().partialUpdate("a", "b").createWriter()) + .hasMessage( + "Explicitly specifying values for the auto increment column b is not allowed."); + } } @Test From 5eba86f211418697f359658043c46c514f1a8397 Mon Sep 17 00:00:00 2001 From: Wang Cheng <348448708@qq.com> Date: Sun, 7 Dec 2025 22:02:45 +0800 Subject: [PATCH 2/4] nit --- .../apache/fluss/client/table/writer/UpsertWriterImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java index 431c091467..5c7254b15d 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java @@ -93,8 +93,8 @@ private static void sanityCheck( if (!autoIncrementColumnNames.isEmpty()) { throw new IllegalArgumentException( String.format( - "This table has auto increment column %s." - + "Explicitly specifying values for an auto increment column is not allowed." + "This table has auto increment column %s. " + + "Explicitly specifying values for an auto increment column is not allowed. " + "Please specify non-auto-increment columns as target columns using partialUpdate first.", autoIncrementColumnNames)); } From 8a871da417a71019e95bff21be9271859b35c4de Mon Sep 17 00:00:00 2001 From: maxcwang Date: Mon, 8 Dec 2025 11:49:54 +0800 Subject: [PATCH 3/4] auto increment column should not be nullable --- .../client/table/writer/UpsertWriterImpl.java | 27 ++++++++++--------- .../fluss/client/table/FlussTableITCase.java | 9 ++++--- .../org/apache/fluss/metadata/Schema.java | 6 +++-- 3 files changed, 24 insertions(+), 18 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java index 5c7254b15d..39f65592c1 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java @@ -118,10 +118,23 @@ private static void sanityCheck( pkColumnSet.set(pkIndex); } + BitSet autoIncrementColumnSet = new BitSet(); + // explicitly specifying values for an auto increment column is not allowed + for (String autoIncrementColumnName : autoIncrementColumnNames) { + int autoIncrementColumnIndex = rowType.getFieldIndex(autoIncrementColumnName); + if (targetColumnsSet.get(autoIncrementColumnIndex)) { + throw new IllegalArgumentException( + String.format( + "Explicitly specifying values for the auto increment column %s is not allowed.", + autoIncrementColumnName)); + } + autoIncrementColumnSet.set(autoIncrementColumnIndex); + } + // check the columns not in targetColumns should be nullable for (int i = 0; i < rowType.getFieldCount(); i++) { - // column not in primary key - if (!pkColumnSet.get(i)) { + // column not in primary key and not in auto increment column + if (!pkColumnSet.get(i) && !autoIncrementColumnSet.get(i)) { // the column should be nullable if (!rowType.getTypeAt(i).isNullable()) { throw new IllegalArgumentException( @@ -131,16 +144,6 @@ private static void sanityCheck( } } } - - // explicitly specifying values for an auto increment column is not allowed - for (String autoIncrementColumnName : autoIncrementColumnNames) { - if (targetColumnsSet.get(rowType.getFieldIndex(autoIncrementColumnName))) { - throw new IllegalArgumentException( - String.format( - "Explicitly specifying values for the auto increment column %s is not allowed.", - autoIncrementColumnName)); - } - } } /** diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java index 8738134add..bfd9926df7 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java @@ -668,8 +668,9 @@ void testInvalidPartialUpdate() throws Exception { Schema.newBuilder() .column("a", DataTypes.INT()) .column("b", DataTypes.INT()) + .column("c", DataTypes.INT()) .primaryKey("a") - .enableAutoIncrement("b") + .enableAutoIncrement("c") .build(); tableDescriptor = TableDescriptor.builder().schema(schema).distributedBy(3, "a").build(); TablePath tablePath = @@ -678,11 +679,11 @@ void testInvalidPartialUpdate() throws Exception { try (Table table = conn.getTable(tablePath)) { assertThatThrownBy(() -> table.newUpsert().createWriter()) .hasMessage( - "This table has auto increment column [b]. Explicitly specifying values for an auto increment column is not allowed. Please specify non-auto-increment columns as target columns using partialUpdate first."); + "This table has auto increment column [c]. Explicitly specifying values for an auto increment column is not allowed. Please specify non-auto-increment columns as target columns using partialUpdate first."); - assertThatThrownBy(() -> table.newUpsert().partialUpdate("a", "b").createWriter()) + assertThatThrownBy(() -> table.newUpsert().partialUpdate("a", "c").createWriter()) .hasMessage( - "Explicitly specifying values for the auto increment column b is not allowed."); + "Explicitly specifying values for the auto increment column c is not allowed."); } } diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java index cf7734ace2..715fabf7c7 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java @@ -607,8 +607,10 @@ private static List normalizeColumns( "The data type of auto increment column must be INT or BIGINT."); } - // primary key should not nullable - if (pkSet.contains(column.getName()) && column.getDataType().isNullable()) { + // primary key and auto increment column should not nullable + if ((pkSet.contains(column.getName()) + || autoIncrementColumnNames.contains(column.getName())) + && column.getDataType().isNullable()) { newColumns.add( new Column( column.getName(), From a0eb9a796958dcb72f9f53e99d28fc4a25585eae Mon Sep 17 00:00:00 2001 From: maxcwang Date: Mon, 8 Dec 2025 12:23:28 +0800 Subject: [PATCH 4/4] nit --- .../java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java b/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java index ee7008dbd2..33acb30838 100644 --- a/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java @@ -87,7 +87,7 @@ public class SchemaJsonSerdeTest extends JsonSerdeTestBase { "{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"BIGINT\"},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"STRING\"},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"TIMESTAMP_WITHOUT_TIME_ZONE\",\"precision\":6},\"comment\":\"c is third column\",\"id\":2}],\"highest_field_id\":2}"; static final String SCHEMA_JSON_4 = - "{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\"},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c is third column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}"; + "{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c is third column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}"; SchemaJsonSerdeTest() { super(SchemaJsonSerde.INSTANCE);