Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
WriterClient writerClient) {
super(tablePath, tableInfo, writerClient);
RowType rowType = tableInfo.getRowType();
sanityCheck(rowType, tableInfo.getPrimaryKeys(), partialUpdateColumns);
sanityCheck(
rowType,
tableInfo.getPrimaryKeys(),
tableInfo.getSchema().getAutoIncrementColumnNames(),
partialUpdateColumns);

this.targetColumns = partialUpdateColumns;
DataLakeFormat lakeFormat = tableInfo.getTableConfig().getDataLakeFormat().orElse(null);
Expand All @@ -80,9 +84,20 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
}

private static void sanityCheck(
RowType rowType, List<String> primaryKeys, @Nullable int[] targetColumns) {
RowType rowType,
List<String> primaryKeys,
List<String> autoIncrementColumnNames,
@Nullable int[] targetColumns) {
// skip check when target columns is null
if (targetColumns == null) {
if (!autoIncrementColumnNames.isEmpty()) {
throw new IllegalArgumentException(
String.format(
"This table has auto increment column %s. "
+ "Explicitly specifying values for an auto increment column is not allowed. "
+ "Please specify non-auto-increment columns as target columns using partialUpdate first.",
autoIncrementColumnNames));
}
return;
}
BitSet targetColumnsSet = new BitSet();
Expand All @@ -103,10 +118,23 @@ private static void sanityCheck(
pkColumnSet.set(pkIndex);
}

BitSet autoIncrementColumnSet = new BitSet();
// explicitly specifying values for an auto increment column is not allowed
for (String autoIncrementColumnName : autoIncrementColumnNames) {
int autoIncrementColumnIndex = rowType.getFieldIndex(autoIncrementColumnName);
if (targetColumnsSet.get(autoIncrementColumnIndex)) {
throw new IllegalArgumentException(
String.format(
"Explicitly specifying values for the auto increment column %s is not allowed.",
autoIncrementColumnName));
}
autoIncrementColumnSet.set(autoIncrementColumnIndex);
}

// check the columns not in targetColumns should be nullable
for (int i = 0; i < rowType.getFieldCount(); i++) {
// column not in primary key
if (!pkColumnSet.get(i)) {
// column not in primary key and not in auto increment column
if (!pkColumnSet.get(i) && !autoIncrementColumnSet.get(i)) {
// the column should be nullable
if (!rowType.getTypeAt(i).isNullable()) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,29 @@ void testInvalidPartialUpdate() throws Exception {
.hasMessage(
"Invalid target column index: 3 for table test_db_1.test_pk_table_1. The table only has 3 columns.");
}

// test invalid auto increment column upsert
schema =
Schema.newBuilder()
.column("a", DataTypes.INT())
.column("b", DataTypes.INT())
.column("c", DataTypes.INT())
.primaryKey("a")
.enableAutoIncrement("c")
.build();
tableDescriptor = TableDescriptor.builder().schema(schema).distributedBy(3, "a").build();
TablePath tablePath =
TablePath.of("test_db_1", "test_invalid_auto_increment_column_upsert");
createTable(tablePath, tableDescriptor, true);
try (Table table = conn.getTable(tablePath)) {
assertThatThrownBy(() -> table.newUpsert().createWriter())
.hasMessage(
"This table has auto increment column [c]. Explicitly specifying values for an auto increment column is not allowed. Please specify non-auto-increment columns as target columns using partialUpdate first.");

assertThatThrownBy(() -> table.newUpsert().partialUpdate("a", "c").createWriter())
.hasMessage(
"Explicitly specifying values for the auto increment column c is not allowed.");
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,8 +607,10 @@ private static List<Column> normalizeColumns(
"The data type of auto increment column must be INT or BIGINT.");
}

// primary key should not nullable
if (pkSet.contains(column.getName()) && column.getDataType().isNullable()) {
// primary key and auto increment column should not nullable
if ((pkSet.contains(column.getName())
|| autoIncrementColumnNames.contains(column.getName()))
&& column.getDataType().isNullable()) {
newColumns.add(
new Column(
column.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public class SchemaJsonSerdeTest extends JsonSerdeTestBase<Schema> {
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"BIGINT\"},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"STRING\"},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"TIMESTAMP_WITHOUT_TIME_ZONE\",\"precision\":6},\"comment\":\"c is third column\",\"id\":2}],\"highest_field_id\":2}";

static final String SCHEMA_JSON_4 =
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\"},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c is third column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}";
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c is third column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}";

SchemaJsonSerdeTest() {
super(SchemaJsonSerde.INSTANCE);
Expand Down