diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index d07c8b90e494..1d1505a28c05 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -99,7 +99,6 @@ public class FlinkCatalog extends AbstractCatalog { private final Namespace baseNamespace; private final SupportsNamespaces asNamespaceCatalog; private final Closeable closeable; - private final Map catalogProps; private final boolean cacheEnabled; public FlinkCatalog( @@ -107,12 +106,10 @@ public FlinkCatalog( String defaultDatabase, Namespace baseNamespace, CatalogLoader catalogLoader, - Map catalogProps, boolean cacheEnabled, long cacheExpirationIntervalMs) { super(catalogName, defaultDatabase); this.catalogLoader = catalogLoader; - this.catalogProps = catalogProps; this.baseNamespace = baseNamespace; this.cacheEnabled = cacheEnabled; @@ -339,13 +336,12 @@ public CatalogTable getTable(ObjectPath tablePath) Table table = loadIcebergTable(tablePath); // Flink's CREATE TABLE LIKE clause relies on properties sent back here to create new table. - // Inorder to create such table in non iceberg catalog, we need to send across catalog - // properties also. // As Flink API accepts only Map for props, here we are serializing catalog - // props as json string to distinguish between catalog and table properties in createTable. + // name, database, table as json string to distinguish between catalog info + // and table properties in createTable. String srcCatalogProps = FlinkCreateTableOptions.toJson( - getName(), tablePath.getDatabaseName(), tablePath.getObjectName(), catalogProps); + getName(), tablePath.getDatabaseName(), tablePath.getObjectName()); Map tableProps = table.properties(); if (tableProps.containsKey(FlinkCreateTableOptions.CONNECTOR_PROPS_KEY) diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java index dd065617bd88..fe4008a13ce5 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java @@ -168,7 +168,6 @@ protected Catalog createCatalog( defaultDatabase, baseNamespace, catalogLoader, - properties, cacheEnabled, cacheExpirationIntervalMs); } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java index 0612260bfe7d..067b42bba954 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.flink; -import java.util.Map; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.iceberg.util.JsonUtil; @@ -27,14 +26,11 @@ class FlinkCreateTableOptions { private final String catalogName; private final String catalogDb; private final String catalogTable; - private final Map catalogProps; - private FlinkCreateTableOptions( - String catalogName, String catalogDb, String catalogTable, Map props) { + private FlinkCreateTableOptions(String catalogName, String catalogDb, String catalogTable) { this.catalogName = catalogName; this.catalogDb = catalogDb; this.catalogTable = catalogTable; - this.catalogProps = props; } public static final ConfigOption CATALOG_NAME = @@ -61,12 +57,6 @@ private FlinkCreateTableOptions( .noDefaultValue() .withDescription("Table name managed in the underlying iceberg catalog and database."); - public static final ConfigOption> CATALOG_PROPS = - ConfigOptions.key("catalog-props") - .mapType() - .noDefaultValue() - .withDescription("Properties for the underlying catalog for iceberg table."); - public static final ConfigOption USE_DYNAMIC_ICEBERG_SINK = ConfigOptions.key("use-dynamic-iceberg-sink") .booleanType() @@ -89,15 +79,13 @@ private FlinkCreateTableOptions( public static final String CONNECTOR_PROPS_KEY = "connector"; public static final String LOCATION_KEY = "location"; - static String toJson( - String catalogName, String catalogDb, String catalogTable, Map catalogProps) { + static String toJson(String catalogName, String catalogDb, String catalogTable) { return JsonUtil.generate( gen -> { gen.writeStartObject(); gen.writeStringField(CATALOG_NAME.key(), catalogName); gen.writeStringField(CATALOG_DATABASE.key(), catalogDb); gen.writeStringField(CATALOG_TABLE.key(), catalogTable); - JsonUtil.writeStringMap(CATALOG_PROPS.key(), catalogProps, gen); gen.writeEndObject(); }, false); @@ -110,9 +98,8 @@ static FlinkCreateTableOptions fromJson(String createTableOptions) { String catalogName = JsonUtil.getString(CATALOG_NAME.key(), node); String catalogDb = JsonUtil.getString(CATALOG_DATABASE.key(), node); String catalogTable = JsonUtil.getString(CATALOG_TABLE.key(), node); - Map catalogProps = JsonUtil.getStringMap(CATALOG_PROPS.key(), node); - return new FlinkCreateTableOptions(catalogName, catalogDb, catalogTable, catalogProps); + return new FlinkCreateTableOptions(catalogName, catalogDb, catalogTable); }); } @@ -127,8 +114,4 @@ String catalogDb() { String catalogTable() { return catalogTable; } - - Map catalogProps() { - return catalogProps; - } } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java index b2e2e33b9291..6306ee7a0a1c 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java @@ -237,13 +237,13 @@ private static TableLoader createTableLoader( } /** - * Merges source catalog properties with connector properties. Iceberg Catalog properties are - * serialized as json in FlinkCatalog#getTable to be able to isolate catalog props from iceberg - * table props, Here, we flatten and merge them back to use to create catalog. + * Merges source catalog properties (catalog name, database, table) with connector properties. + * Source catalog name, database, table are serialized as json in FlinkCatalog#getTable to be able + * to isolate them from iceberg table props, Here, we flatten and merge them back. * * @param tableProps the existing table properties - * @return a map of merged properties, with source catalog properties taking precedence when keys - * conflict + * @return a map of merged properties, defaulting to source catalog name, database and table + * unless overridden. */ private static Map mergeSrcCatalogProps(Map tableProps) { String srcCatalogProps = tableProps.get(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY); @@ -257,7 +257,6 @@ private static Map mergeSrcCatalogProps(Map tabl FlinkCreateTableOptions.CATALOG_DATABASE.key(), createTableOptions.catalogDb()); mergedProps.put( FlinkCreateTableOptions.CATALOG_TABLE.key(), createTableOptions.catalogTable()); - mergedProps.putAll(createTableOptions.catalogProps()); tableProps.forEach( (k, v) -> { diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java index 062ff68d5d85..e45abe25f919 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java @@ -89,6 +89,7 @@ public void before() { config.put(CatalogProperties.URI, getURI(hiveConf)); } config.put(CatalogProperties.WAREHOUSE_LOCATION, String.format("file://%s", warehouseRoot())); + config.put("extra-catalog-prop", "extra-value"); this.flinkDatabase = catalogName + "." + DATABASE; this.icebergNamespace = diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index c3bf9818ccdd..b4a8965e9bd4 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -230,13 +230,15 @@ public void testCreateTableLikeInFlinkCatalog() throws TableNotExistException { .column("id", DataTypes.BIGINT()) .build()); - String srcCatalogProps = FlinkCreateTableOptions.toJson(catalogName, DATABASE, "tl", config); + String srcCatalogProps = FlinkCreateTableOptions.toJson(catalogName, DATABASE, "tl"); Map options = catalogTable.getOptions(); assertThat(options) .containsEntry( FlinkCreateTableOptions.CONNECTOR_PROPS_KEY, FlinkDynamicTableFactory.FACTORY_IDENTIFIER) .containsEntry(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY, srcCatalogProps); + assertThat(options.get(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY)) + .doesNotContain("extra-catalog-prop", "extra-value"); } @TestTemplate diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java index 0cdaf8371cbd..6cfc18868af0 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java @@ -179,7 +179,10 @@ public void testReadFlinkDynamicTable() throws Exception { List expected = generateExpectedRecords(false); SqlHelpers.sql( getTableEnv(), - "create table `default_catalog`.`default_database`.flink_table LIKE iceberg_catalog.`default`.%s", + "create table `default_catalog`.`default_database`.flink_table " + + "WITH ('catalog-type'='hadoop', 'warehouse'='%s') " + + "LIKE iceberg_catalog.`default`.%s", + CATALOG_EXTENSION.warehouse(), TestFixtures.TABLE); // Read from table in flink catalog @@ -199,8 +202,11 @@ public void testWatermarkInvalidConfig() { getStreamingTableEnv(), "CREATE TABLE %s " + "(eventTS AS CAST(t1 AS TIMESTAMP(3)), " - + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) LIKE iceberg_catalog.`default`.%s", + + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) " + + "WITH ('catalog-type'='hadoop', 'warehouse'='%s') " + + "LIKE iceberg_catalog.`default`.%s", flinkTable, + CATALOG_EXTENSION.warehouse(), TestFixtures.TABLE); assertThatThrownBy(() -> SqlHelpers.sql(getStreamingTableEnv(), "SELECT * FROM %s", flinkTable)) @@ -218,8 +224,11 @@ public void testWatermarkValidConfig() throws Exception { getStreamingTableEnv(), "CREATE TABLE %s " + "(eventTS AS CAST(t1 AS TIMESTAMP(3)), " - + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) WITH ('watermark-column'='t1') LIKE iceberg_catalog.`default`.%s", + + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) " + + "WITH ('catalog-type'='hadoop', 'warehouse'='%s', 'watermark-column'='t1') " + + "LIKE iceberg_catalog.`default`.%s", flinkTable, + CATALOG_EXTENSION.warehouse(), TestFixtures.TABLE); TestHelpers.assertRecordsWithOrder( diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index d07c8b90e494..1d1505a28c05 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -99,7 +99,6 @@ public class FlinkCatalog extends AbstractCatalog { private final Namespace baseNamespace; private final SupportsNamespaces asNamespaceCatalog; private final Closeable closeable; - private final Map catalogProps; private final boolean cacheEnabled; public FlinkCatalog( @@ -107,12 +106,10 @@ public FlinkCatalog( String defaultDatabase, Namespace baseNamespace, CatalogLoader catalogLoader, - Map catalogProps, boolean cacheEnabled, long cacheExpirationIntervalMs) { super(catalogName, defaultDatabase); this.catalogLoader = catalogLoader; - this.catalogProps = catalogProps; this.baseNamespace = baseNamespace; this.cacheEnabled = cacheEnabled; @@ -339,13 +336,12 @@ public CatalogTable getTable(ObjectPath tablePath) Table table = loadIcebergTable(tablePath); // Flink's CREATE TABLE LIKE clause relies on properties sent back here to create new table. - // Inorder to create such table in non iceberg catalog, we need to send across catalog - // properties also. // As Flink API accepts only Map for props, here we are serializing catalog - // props as json string to distinguish between catalog and table properties in createTable. + // name, database, table as json string to distinguish between catalog info + // and table properties in createTable. String srcCatalogProps = FlinkCreateTableOptions.toJson( - getName(), tablePath.getDatabaseName(), tablePath.getObjectName(), catalogProps); + getName(), tablePath.getDatabaseName(), tablePath.getObjectName()); Map tableProps = table.properties(); if (tableProps.containsKey(FlinkCreateTableOptions.CONNECTOR_PROPS_KEY) diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java index 33cbc92ddeec..c1889dc6bd03 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java @@ -170,7 +170,6 @@ protected Catalog createCatalog( defaultDatabase, baseNamespace, catalogLoader, - properties, cacheEnabled, cacheExpirationIntervalMs); } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java index 0612260bfe7d..067b42bba954 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCreateTableOptions.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.flink; -import java.util.Map; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.iceberg.util.JsonUtil; @@ -27,14 +26,11 @@ class FlinkCreateTableOptions { private final String catalogName; private final String catalogDb; private final String catalogTable; - private final Map catalogProps; - private FlinkCreateTableOptions( - String catalogName, String catalogDb, String catalogTable, Map props) { + private FlinkCreateTableOptions(String catalogName, String catalogDb, String catalogTable) { this.catalogName = catalogName; this.catalogDb = catalogDb; this.catalogTable = catalogTable; - this.catalogProps = props; } public static final ConfigOption CATALOG_NAME = @@ -61,12 +57,6 @@ private FlinkCreateTableOptions( .noDefaultValue() .withDescription("Table name managed in the underlying iceberg catalog and database."); - public static final ConfigOption> CATALOG_PROPS = - ConfigOptions.key("catalog-props") - .mapType() - .noDefaultValue() - .withDescription("Properties for the underlying catalog for iceberg table."); - public static final ConfigOption USE_DYNAMIC_ICEBERG_SINK = ConfigOptions.key("use-dynamic-iceberg-sink") .booleanType() @@ -89,15 +79,13 @@ private FlinkCreateTableOptions( public static final String CONNECTOR_PROPS_KEY = "connector"; public static final String LOCATION_KEY = "location"; - static String toJson( - String catalogName, String catalogDb, String catalogTable, Map catalogProps) { + static String toJson(String catalogName, String catalogDb, String catalogTable) { return JsonUtil.generate( gen -> { gen.writeStartObject(); gen.writeStringField(CATALOG_NAME.key(), catalogName); gen.writeStringField(CATALOG_DATABASE.key(), catalogDb); gen.writeStringField(CATALOG_TABLE.key(), catalogTable); - JsonUtil.writeStringMap(CATALOG_PROPS.key(), catalogProps, gen); gen.writeEndObject(); }, false); @@ -110,9 +98,8 @@ static FlinkCreateTableOptions fromJson(String createTableOptions) { String catalogName = JsonUtil.getString(CATALOG_NAME.key(), node); String catalogDb = JsonUtil.getString(CATALOG_DATABASE.key(), node); String catalogTable = JsonUtil.getString(CATALOG_TABLE.key(), node); - Map catalogProps = JsonUtil.getStringMap(CATALOG_PROPS.key(), node); - return new FlinkCreateTableOptions(catalogName, catalogDb, catalogTable, catalogProps); + return new FlinkCreateTableOptions(catalogName, catalogDb, catalogTable); }); } @@ -127,8 +114,4 @@ String catalogDb() { String catalogTable() { return catalogTable; } - - Map catalogProps() { - return catalogProps; - } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java index b2e2e33b9291..6306ee7a0a1c 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java @@ -237,13 +237,13 @@ private static TableLoader createTableLoader( } /** - * Merges source catalog properties with connector properties. Iceberg Catalog properties are - * serialized as json in FlinkCatalog#getTable to be able to isolate catalog props from iceberg - * table props, Here, we flatten and merge them back to use to create catalog. + * Merges source catalog properties (catalog name, database, table) with connector properties. + * Source catalog name, database, table are serialized as json in FlinkCatalog#getTable to be able + * to isolate them from iceberg table props, Here, we flatten and merge them back. * * @param tableProps the existing table properties - * @return a map of merged properties, with source catalog properties taking precedence when keys - * conflict + * @return a map of merged properties, defaulting to source catalog name, database and table + * unless overridden. */ private static Map mergeSrcCatalogProps(Map tableProps) { String srcCatalogProps = tableProps.get(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY); @@ -257,7 +257,6 @@ private static Map mergeSrcCatalogProps(Map tabl FlinkCreateTableOptions.CATALOG_DATABASE.key(), createTableOptions.catalogDb()); mergedProps.put( FlinkCreateTableOptions.CATALOG_TABLE.key(), createTableOptions.catalogTable()); - mergedProps.putAll(createTableOptions.catalogProps()); tableProps.forEach( (k, v) -> { diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java index 062ff68d5d85..e45abe25f919 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java @@ -89,6 +89,7 @@ public void before() { config.put(CatalogProperties.URI, getURI(hiveConf)); } config.put(CatalogProperties.WAREHOUSE_LOCATION, String.format("file://%s", warehouseRoot())); + config.put("extra-catalog-prop", "extra-value"); this.flinkDatabase = catalogName + "." + DATABASE; this.icebergNamespace = diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index 4dd1d6e40be3..30daf6d55ef2 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -35,7 +35,6 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.flink.table.catalog.IntervalFreshness; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.exceptions.TableNotExistException; @@ -231,19 +230,15 @@ public void testCreateTableLikeInFlinkCatalog() throws TableNotExistException { .column("id", DataTypes.BIGINT()) .build()); - // `type` option is filtered out by Flink - // https://github.com/apache/flink/blob/edc3d68736de73665440f4313ddcfd9142d8d42b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java#L378 - Map filteredOptions = Maps.newHashMap(config); - filteredOptions.remove(CommonCatalogOptions.CATALOG_TYPE.key()); - - String srcCatalogProps = - FlinkCreateTableOptions.toJson(catalogName, DATABASE, "tl", filteredOptions); + String srcCatalogProps = FlinkCreateTableOptions.toJson(catalogName, DATABASE, "tl"); Map options = catalogTable.getOptions(); assertThat(options) .containsEntry( FlinkCreateTableOptions.CONNECTOR_PROPS_KEY, FlinkDynamicTableFactory.FACTORY_IDENTIFIER) .containsEntry(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY, srcCatalogProps); + assertThat(options.get(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY)) + .doesNotContain("extra-catalog-prop", "extra-value"); } @TestTemplate diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java index 0cdaf8371cbd..6cfc18868af0 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java @@ -179,7 +179,10 @@ public void testReadFlinkDynamicTable() throws Exception { List expected = generateExpectedRecords(false); SqlHelpers.sql( getTableEnv(), - "create table `default_catalog`.`default_database`.flink_table LIKE iceberg_catalog.`default`.%s", + "create table `default_catalog`.`default_database`.flink_table " + + "WITH ('catalog-type'='hadoop', 'warehouse'='%s') " + + "LIKE iceberg_catalog.`default`.%s", + CATALOG_EXTENSION.warehouse(), TestFixtures.TABLE); // Read from table in flink catalog @@ -199,8 +202,11 @@ public void testWatermarkInvalidConfig() { getStreamingTableEnv(), "CREATE TABLE %s " + "(eventTS AS CAST(t1 AS TIMESTAMP(3)), " - + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) LIKE iceberg_catalog.`default`.%s", + + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) " + + "WITH ('catalog-type'='hadoop', 'warehouse'='%s') " + + "LIKE iceberg_catalog.`default`.%s", flinkTable, + CATALOG_EXTENSION.warehouse(), TestFixtures.TABLE); assertThatThrownBy(() -> SqlHelpers.sql(getStreamingTableEnv(), "SELECT * FROM %s", flinkTable)) @@ -218,8 +224,11 @@ public void testWatermarkValidConfig() throws Exception { getStreamingTableEnv(), "CREATE TABLE %s " + "(eventTS AS CAST(t1 AS TIMESTAMP(3)), " - + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) WITH ('watermark-column'='t1') LIKE iceberg_catalog.`default`.%s", + + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) " + + "WITH ('catalog-type'='hadoop', 'warehouse'='%s', 'watermark-column'='t1') " + + "LIKE iceberg_catalog.`default`.%s", flinkTable, + CATALOG_EXTENSION.warehouse(), TestFixtures.TABLE); TestHelpers.assertRecordsWithOrder(