Commit b90fd8a

[iceberg] support altering table properties
1 parent babf91b commit b90fd8a

File tree: 2 files changed, +117 −2 lines

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java

Lines changed: 31 additions & 2 deletions
@@ -32,6 +32,8 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.UpdateProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -119,8 +121,27 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Co
     @Override
     public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
             throws TableNotExistException {
-        throw new UnsupportedOperationException(
-                "Alter table is not supported for Iceberg at the moment");
+        try {
+            Table table = icebergCatalog.loadTable(toIcebergTableIdentifier(tablePath));
+            UpdateProperties updateProperties = table.updateProperties();
+            for (TableChange tableChange : tableChanges) {
+                if (tableChange instanceof TableChange.SetOption) {
+                    TableChange.SetOption option = (TableChange.SetOption) tableChange;
+                    updateProperties.set(
+                            convertFlussPropertyKeyToIceberg(option.getKey()), option.getValue());
+                } else if (tableChange instanceof TableChange.ResetOption) {
+                    TableChange.ResetOption option = (TableChange.ResetOption) tableChange;
+                    updateProperties.remove(convertFlussPropertyKeyToIceberg(option.getKey()));
+                } else {
+                    throw new UnsupportedOperationException(
+                            "Unsupported table change: " + tableChange.getClass());
+                }
+            }
+
+            updateProperties.commit();
+        } catch (TableNotExistException e) {
+            throw new TableNotExistException("Table " + tablePath + " not exists.");
+        }
     }
 
     private TableIdentifier toIcebergTableIdentifier(TablePath tablePath) {
@@ -249,6 +270,14 @@ private void setFlussPropertyToIceberg(
         }
     }
 
+    private static String convertFlussPropertyKeyToIceberg(String key) {
+        if (key.startsWith(ICEBERG_CONF_PREFIX)) {
+            return key.substring(ICEBERG_CONF_PREFIX.length());
+        } else {
+            return FLUSS_CONF_PREFIX + key;
+        }
+    }
+
     private void createDatabase(String databaseName) {
         Namespace namespace = Namespace.of(databaseName);
        if (icebergCatalog instanceof SupportsNamespaces) {
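
For reference, the key-mapping convention added above can be illustrated in isolation. The sketch below is not part of the commit; it assumes the prefix constants resolve to "iceberg." and "fluss." (values consistent with the expectations in the test file below) and simply replays the logic of convertFlussPropertyKeyToIceberg on the two kinds of keys.

// Standalone sketch (not part of this commit) of the property key-mapping convention,
// assuming ICEBERG_CONF_PREFIX = "iceberg." and FLUSS_CONF_PREFIX = "fluss.".
public class PropertyKeyMappingSketch {

    private static final String ICEBERG_CONF_PREFIX = "iceberg."; // assumed value
    private static final String FLUSS_CONF_PREFIX = "fluss.";     // assumed value

    // Mirrors convertFlussPropertyKeyToIceberg from the change above.
    static String toIcebergKey(String flussKey) {
        if (flussKey.startsWith(ICEBERG_CONF_PREFIX)) {
            // "iceberg."-prefixed keys are forwarded to Iceberg with the prefix stripped.
            return flussKey.substring(ICEBERG_CONF_PREFIX.length());
        }
        // Any other Fluss key is namespaced under "fluss." in the Iceberg table properties.
        return FLUSS_CONF_PREFIX + flussKey;
    }

    public static void main(String[] args) {
        // Prints "commit.retry.num-retries"
        System.out.println(toIcebergKey("iceberg.commit.retry.num-retries"));
        // Prints "fluss.table.datalake.freshness"
        System.out.println(toIcebergKey("table.datalake.freshness"));
    }
}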

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java

Lines changed: 86 additions & 0 deletions
@@ -22,6 +22,7 @@
 import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.lake.lakestorage.TestingLakeCatalogContext;
 import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.types.DataTypes;
@@ -30,6 +31,7 @@
 import org.apache.iceberg.SortDirection;
 import org.apache.iceberg.SortField;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.types.Types;
 import org.assertj.core.api.Assertions;
@@ -455,4 +457,88 @@ void testIllegalPartitionKeyType(boolean isPrimaryKeyTable) throws Exception {
                 .hasMessage(
                         "Partition key only support string type for iceberg currently. Column `c1` is not string type.");
     }
+
+    @Test
+    void alterTableProperties() {
+        String database = "test_alter_table_db";
+        String tableName = "test_alter_table";
+
+        Schema flussSchema = Schema.newBuilder().column("id", DataTypes.BIGINT()).build();
+
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(flussSchema)
+                        .distributedBy(3)
+                        .property("iceberg.commit.retry.num-retries", "5")
+                        .property("table.datalake.freshness", "30s")
+                        .build();
+
+        TablePath tablePath = TablePath.of(database, tableName);
+        TestingLakeCatalogContext context = new TestingLakeCatalogContext();
+        flussIcebergCatalog.createTable(tablePath, tableDescriptor, context);
+
+        Catalog catalog = flussIcebergCatalog.getIcebergCatalog();
+        assertThat(catalog.loadTable(TableIdentifier.of(database, tableName)).properties())
+                .containsEntry("commit.retry.num-retries", "5")
+                .containsEntry("fluss.table.datalake.freshness", "30s")
+                .doesNotContainKeys("iceberg.commit.retry.num-retries", "table.datalake.freshness");
+
+        // set new iceberg property
+        flussIcebergCatalog.alterTable(
+                tablePath,
+                List.of(TableChange.set("iceberg.commit.retry.min-wait-ms", "1000")),
+                context);
+        assertThat(catalog.loadTable(TableIdentifier.of(database, tableName)).properties())
+                .containsEntry("commit.retry.min-wait-ms", "1000")
+                .containsEntry("commit.retry.num-retries", "5")
+                .containsEntry("fluss.table.datalake.freshness", "30s")
+                .doesNotContainKeys(
+                        "iceberg.commit.retry.min-wait-ms",
+                        "iceberg.commit.retry.num-retries",
+                        "table.datalake.freshness");
+
+        // update existing properties
+        flussIcebergCatalog.alterTable(
+                tablePath,
+                List.of(
+                        TableChange.set("iceberg.commit.retry.num-retries", "10"),
+                        TableChange.set("table.datalake.freshness", "23s")),
+                context);
+        assertThat(catalog.loadTable(TableIdentifier.of(database, tableName)).properties())
+                .containsEntry("commit.retry.min-wait-ms", "1000")
+                .containsEntry("commit.retry.num-retries", "10")
+                .containsEntry("fluss.table.datalake.freshness", "23s")
+                .doesNotContainKeys(
+                        "iceberg.commit.retry.min-wait-ms",
+                        "iceberg.commit.retry.num-retries",
+                        "table.datalake.freshness");
+
+        // remove existing properties
+        flussIcebergCatalog.alterTable(
+                tablePath,
+                List.of(
+                        TableChange.reset("iceberg.commit.retry.min-wait-ms"),
+                        TableChange.reset("table.datalake.freshness")),
+                context);
+        assertThat(catalog.loadTable(TableIdentifier.of(database, tableName)).properties())
+                .containsEntry("commit.retry.num-retries", "10")
+                .doesNotContainKeys(
+                        "commit.retry.min-wait-ms",
+                        "iceberg.commit.retry.min-wait-ms",
+                        "table.datalake.freshness",
+                        "fluss.table.datalake.freshness");
+
+        // remove non-existing property
+        flussIcebergCatalog.alterTable(
+                tablePath, List.of(TableChange.reset("iceberg.non-existing.property")), context);
+        assertThat(catalog.loadTable(TableIdentifier.of(database, tableName)).properties())
+                .containsEntry("commit.retry.num-retries", "10")
+                .doesNotContainKeys(
+                        "non-existing.property",
+                        "iceberg.non-existing.property",
+                        "commit.retry.min-wait-ms",
+                        "iceberg.commit.retry.min-wait-ms",
+                        "table.datalake.freshness",
+                        "fluss.table.datalake.freshness");
+    }
 }
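
The test drives the catalog through Fluss's alterTable, but the underlying behaviour is plain Iceberg: UpdateProperties stages set/remove calls and applies them in a single metadata commit, which is why the implementation loops over every TableChange before calling commit() once. The sketch below is not part of the commit; it shows that pattern directly against an already-loaded Iceberg table, assuming a configured Catalog, and the identifiers and property keys in it are illustrative only.

// Standalone sketch (not part of this commit): the Iceberg UpdateProperties pattern
// that IcebergLakeCatalog#alterTable relies on. Assumes an already-configured catalog
// and an existing table; names and keys below are illustrative.
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

public class UpdatePropertiesSketch {

    static void alterProperties(Catalog catalog) {
        Table table = catalog.loadTable(TableIdentifier.of("test_db", "test_table"));

        // Stage all property changes first; nothing is written to table metadata yet.
        UpdateProperties update = table.updateProperties();
        update.set("commit.retry.num-retries", "10");
        update.remove("commit.retry.min-wait-ms");

        // A single commit() applies the staged changes as one atomic metadata update.
        update.commit();
    }
}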
