Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,17 @@ public class FlinkCatalog extends AbstractCatalog {
private final Namespace baseNamespace;
private final SupportsNamespaces asNamespaceCatalog;
private final Closeable closeable;
private final Map<String, String> catalogProps;
private final boolean cacheEnabled;

public FlinkCatalog(
String catalogName,
String defaultDatabase,
Namespace baseNamespace,
CatalogLoader catalogLoader,
Map<String, String> catalogProps,
boolean cacheEnabled,
long cacheExpirationIntervalMs) {
super(catalogName, defaultDatabase);
this.catalogLoader = catalogLoader;
this.catalogProps = catalogProps;
this.baseNamespace = baseNamespace;
this.cacheEnabled = cacheEnabled;

Expand Down Expand Up @@ -339,13 +336,12 @@ public CatalogTable getTable(ObjectPath tablePath)
Table table = loadIcebergTable(tablePath);

// Flink's CREATE TABLE LIKE clause relies on properties sent back here to create new table.
// Inorder to create such table in non iceberg catalog, we need to send across catalog
// properties also.
// As Flink API accepts only Map<String, String> for props, here we are serializing catalog
// props as json string to distinguish between catalog and table properties in createTable.
// name, database, table as json string to distinguish between catalog info
// and table properties in createTable.
String srcCatalogProps =
FlinkCreateTableOptions.toJson(
getName(), tablePath.getDatabaseName(), tablePath.getObjectName(), catalogProps);
getName(), tablePath.getDatabaseName(), tablePath.getObjectName());

Map<String, String> tableProps = table.properties();
if (tableProps.containsKey(FlinkCreateTableOptions.CONNECTOR_PROPS_KEY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ protected Catalog createCatalog(
defaultDatabase,
baseNamespace,
catalogLoader,
properties,
cacheEnabled,
cacheExpirationIntervalMs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.iceberg.flink;

import java.util.Map;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.iceberg.util.JsonUtil;
Expand All @@ -27,14 +26,11 @@ class FlinkCreateTableOptions {
private final String catalogName;
private final String catalogDb;
private final String catalogTable;
private final Map<String, String> catalogProps;

private FlinkCreateTableOptions(
String catalogName, String catalogDb, String catalogTable, Map<String, String> props) {
private FlinkCreateTableOptions(String catalogName, String catalogDb, String catalogTable) {
this.catalogName = catalogName;
this.catalogDb = catalogDb;
this.catalogTable = catalogTable;
this.catalogProps = props;
}

public static final ConfigOption<String> CATALOG_NAME =
Expand All @@ -61,12 +57,6 @@ private FlinkCreateTableOptions(
.noDefaultValue()
.withDescription("Table name managed in the underlying iceberg catalog and database.");

public static final ConfigOption<Map<String, String>> CATALOG_PROPS =
ConfigOptions.key("catalog-props")
.mapType()
.noDefaultValue()
.withDescription("Properties for the underlying catalog for iceberg table.");

public static final ConfigOption<Boolean> USE_DYNAMIC_ICEBERG_SINK =
ConfigOptions.key("use-dynamic-iceberg-sink")
.booleanType()
Expand All @@ -89,15 +79,13 @@ private FlinkCreateTableOptions(
public static final String CONNECTOR_PROPS_KEY = "connector";
public static final String LOCATION_KEY = "location";

static String toJson(
String catalogName, String catalogDb, String catalogTable, Map<String, String> catalogProps) {
static String toJson(String catalogName, String catalogDb, String catalogTable) {
return JsonUtil.generate(
gen -> {
gen.writeStartObject();
gen.writeStringField(CATALOG_NAME.key(), catalogName);
gen.writeStringField(CATALOG_DATABASE.key(), catalogDb);
gen.writeStringField(CATALOG_TABLE.key(), catalogTable);
JsonUtil.writeStringMap(CATALOG_PROPS.key(), catalogProps, gen);
gen.writeEndObject();
},
false);
Expand All @@ -110,9 +98,8 @@ static FlinkCreateTableOptions fromJson(String createTableOptions) {
String catalogName = JsonUtil.getString(CATALOG_NAME.key(), node);
String catalogDb = JsonUtil.getString(CATALOG_DATABASE.key(), node);
String catalogTable = JsonUtil.getString(CATALOG_TABLE.key(), node);
Map<String, String> catalogProps = JsonUtil.getStringMap(CATALOG_PROPS.key(), node);

return new FlinkCreateTableOptions(catalogName, catalogDb, catalogTable, catalogProps);
return new FlinkCreateTableOptions(catalogName, catalogDb, catalogTable);
});
}

Expand All @@ -127,8 +114,4 @@ String catalogDb() {
String catalogTable() {
return catalogTable;
}

Map<String, String> catalogProps() {
return catalogProps;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,13 @@ private static TableLoader createTableLoader(
}

/**
* Merges source catalog properties with connector properties. Iceberg Catalog properties are
* serialized as json in FlinkCatalog#getTable to be able to isolate catalog props from iceberg
* table props, Here, we flatten and merge them back to use to create catalog.
* Merges source catalog properties (catalog name, database, table) with connector properties.
* Source catalog name, database, table are serialized as json in FlinkCatalog#getTable to be able
* to isolate them from iceberg table props, Here, we flatten and merge them back.
*
* @param tableProps the existing table properties
* @return a map of merged properties, with source catalog properties taking precedence when keys
* conflict
* @return a map of merged properties, defaulting to source catalog name, database and table
* unless overridden.
*/
private static Map<String, String> mergeSrcCatalogProps(Map<String, String> tableProps) {
String srcCatalogProps = tableProps.get(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY);
Expand All @@ -257,7 +257,6 @@ private static Map<String, String> mergeSrcCatalogProps(Map<String, String> tabl
FlinkCreateTableOptions.CATALOG_DATABASE.key(), createTableOptions.catalogDb());
mergedProps.put(
FlinkCreateTableOptions.CATALOG_TABLE.key(), createTableOptions.catalogTable());
mergedProps.putAll(createTableOptions.catalogProps());

tableProps.forEach(
(k, v) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public void before() {
config.put(CatalogProperties.URI, getURI(hiveConf));
}
config.put(CatalogProperties.WAREHOUSE_LOCATION, String.format("file://%s", warehouseRoot()));
config.put("extra-catalog-prop", "extra-value");

this.flinkDatabase = catalogName + "." + DATABASE;
this.icebergNamespace =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,15 @@ public void testCreateTableLikeInFlinkCatalog() throws TableNotExistException {
.column("id", DataTypes.BIGINT())
.build());

String srcCatalogProps = FlinkCreateTableOptions.toJson(catalogName, DATABASE, "tl", config);
String srcCatalogProps = FlinkCreateTableOptions.toJson(catalogName, DATABASE, "tl");
Map<String, String> options = catalogTable.getOptions();
assertThat(options)
.containsEntry(
FlinkCreateTableOptions.CONNECTOR_PROPS_KEY,
FlinkDynamicTableFactory.FACTORY_IDENTIFIER)
.containsEntry(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY, srcCatalogProps);
assertThat(options.get(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY))
.doesNotContain("extra-catalog-prop", "extra-value");
}

@TestTemplate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,10 @@ public void testReadFlinkDynamicTable() throws Exception {
List<Record> expected = generateExpectedRecords(false);
SqlHelpers.sql(
getTableEnv(),
"create table `default_catalog`.`default_database`.flink_table LIKE iceberg_catalog.`default`.%s",
"create table `default_catalog`.`default_database`.flink_table "
+ "WITH ('catalog-type'='hadoop', 'warehouse'='%s') "
+ "LIKE iceberg_catalog.`default`.%s",
CATALOG_EXTENSION.warehouse(),
TestFixtures.TABLE);

// Read from table in flink catalog
Expand All @@ -199,8 +202,11 @@ public void testWatermarkInvalidConfig() {
getStreamingTableEnv(),
"CREATE TABLE %s "
+ "(eventTS AS CAST(t1 AS TIMESTAMP(3)), "
+ "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) LIKE iceberg_catalog.`default`.%s",
+ "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) "
+ "WITH ('catalog-type'='hadoop', 'warehouse'='%s') "
+ "LIKE iceberg_catalog.`default`.%s",
flinkTable,
CATALOG_EXTENSION.warehouse(),
TestFixtures.TABLE);

assertThatThrownBy(() -> SqlHelpers.sql(getStreamingTableEnv(), "SELECT * FROM %s", flinkTable))
Expand All @@ -218,8 +224,11 @@ public void testWatermarkValidConfig() throws Exception {
getStreamingTableEnv(),
"CREATE TABLE %s "
+ "(eventTS AS CAST(t1 AS TIMESTAMP(3)), "
+ "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) WITH ('watermark-column'='t1') LIKE iceberg_catalog.`default`.%s",
+ "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) "
+ "WITH ('catalog-type'='hadoop', 'warehouse'='%s', 'watermark-column'='t1') "
+ "LIKE iceberg_catalog.`default`.%s",
flinkTable,
CATALOG_EXTENSION.warehouse(),
TestFixtures.TABLE);

TestHelpers.assertRecordsWithOrder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,17 @@ public class FlinkCatalog extends AbstractCatalog {
private final Namespace baseNamespace;
private final SupportsNamespaces asNamespaceCatalog;
private final Closeable closeable;
private final Map<String, String> catalogProps;
private final boolean cacheEnabled;

public FlinkCatalog(
String catalogName,
String defaultDatabase,
Namespace baseNamespace,
CatalogLoader catalogLoader,
Map<String, String> catalogProps,
boolean cacheEnabled,
long cacheExpirationIntervalMs) {
super(catalogName, defaultDatabase);
this.catalogLoader = catalogLoader;
this.catalogProps = catalogProps;
this.baseNamespace = baseNamespace;
this.cacheEnabled = cacheEnabled;

Expand Down Expand Up @@ -339,13 +336,12 @@ public CatalogTable getTable(ObjectPath tablePath)
Table table = loadIcebergTable(tablePath);

// Flink's CREATE TABLE LIKE clause relies on properties sent back here to create new table.
// Inorder to create such table in non iceberg catalog, we need to send across catalog
// properties also.
// As Flink API accepts only Map<String, String> for props, here we are serializing catalog
// props as json string to distinguish between catalog and table properties in createTable.
// name, database, table as json string to distinguish between catalog info
// and table properties in createTable.
String srcCatalogProps =
FlinkCreateTableOptions.toJson(
getName(), tablePath.getDatabaseName(), tablePath.getObjectName(), catalogProps);
getName(), tablePath.getDatabaseName(), tablePath.getObjectName());

Map<String, String> tableProps = table.properties();
if (tableProps.containsKey(FlinkCreateTableOptions.CONNECTOR_PROPS_KEY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ protected Catalog createCatalog(
defaultDatabase,
baseNamespace,
catalogLoader,
properties,
cacheEnabled,
cacheExpirationIntervalMs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.iceberg.flink;

import java.util.Map;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.iceberg.util.JsonUtil;
Expand All @@ -27,14 +26,11 @@ class FlinkCreateTableOptions {
private final String catalogName;
private final String catalogDb;
private final String catalogTable;
private final Map<String, String> catalogProps;

private FlinkCreateTableOptions(
String catalogName, String catalogDb, String catalogTable, Map<String, String> props) {
private FlinkCreateTableOptions(String catalogName, String catalogDb, String catalogTable) {
this.catalogName = catalogName;
this.catalogDb = catalogDb;
this.catalogTable = catalogTable;
this.catalogProps = props;
}

public static final ConfigOption<String> CATALOG_NAME =
Expand All @@ -61,12 +57,6 @@ private FlinkCreateTableOptions(
.noDefaultValue()
.withDescription("Table name managed in the underlying iceberg catalog and database.");

public static final ConfigOption<Map<String, String>> CATALOG_PROPS =
ConfigOptions.key("catalog-props")
.mapType()
.noDefaultValue()
.withDescription("Properties for the underlying catalog for iceberg table.");

public static final ConfigOption<Boolean> USE_DYNAMIC_ICEBERG_SINK =
ConfigOptions.key("use-dynamic-iceberg-sink")
.booleanType()
Expand All @@ -89,15 +79,13 @@ private FlinkCreateTableOptions(
public static final String CONNECTOR_PROPS_KEY = "connector";
public static final String LOCATION_KEY = "location";

static String toJson(
String catalogName, String catalogDb, String catalogTable, Map<String, String> catalogProps) {
static String toJson(String catalogName, String catalogDb, String catalogTable) {
return JsonUtil.generate(
gen -> {
gen.writeStartObject();
gen.writeStringField(CATALOG_NAME.key(), catalogName);
gen.writeStringField(CATALOG_DATABASE.key(), catalogDb);
gen.writeStringField(CATALOG_TABLE.key(), catalogTable);
JsonUtil.writeStringMap(CATALOG_PROPS.key(), catalogProps, gen);
gen.writeEndObject();
},
false);
Expand All @@ -110,9 +98,8 @@ static FlinkCreateTableOptions fromJson(String createTableOptions) {
String catalogName = JsonUtil.getString(CATALOG_NAME.key(), node);
String catalogDb = JsonUtil.getString(CATALOG_DATABASE.key(), node);
String catalogTable = JsonUtil.getString(CATALOG_TABLE.key(), node);
Map<String, String> catalogProps = JsonUtil.getStringMap(CATALOG_PROPS.key(), node);

return new FlinkCreateTableOptions(catalogName, catalogDb, catalogTable, catalogProps);
return new FlinkCreateTableOptions(catalogName, catalogDb, catalogTable);
});
}

Expand All @@ -127,8 +114,4 @@ String catalogDb() {
String catalogTable() {
return catalogTable;
}

Map<String, String> catalogProps() {
return catalogProps;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,13 @@ private static TableLoader createTableLoader(
}

/**
* Merges source catalog properties with connector properties. Iceberg Catalog properties are
* serialized as json in FlinkCatalog#getTable to be able to isolate catalog props from iceberg
* table props, Here, we flatten and merge them back to use to create catalog.
* Merges source catalog properties (catalog name, database, table) with connector properties.
* Source catalog name, database, table are serialized as json in FlinkCatalog#getTable to be able
* to isolate them from iceberg table props, Here, we flatten and merge them back.
*
* @param tableProps the existing table properties
* @return a map of merged properties, with source catalog properties taking precedence when keys
* conflict
* @return a map of merged properties, defaulting to source catalog name, database and table
* unless overridden.
*/
private static Map<String, String> mergeSrcCatalogProps(Map<String, String> tableProps) {
String srcCatalogProps = tableProps.get(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY);
Expand All @@ -257,7 +257,6 @@ private static Map<String, String> mergeSrcCatalogProps(Map<String, String> tabl
FlinkCreateTableOptions.CATALOG_DATABASE.key(), createTableOptions.catalogDb());
mergedProps.put(
FlinkCreateTableOptions.CATALOG_TABLE.key(), createTableOptions.catalogTable());
mergedProps.putAll(createTableOptions.catalogProps());

tableProps.forEach(
(k, v) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public void before() {
config.put(CatalogProperties.URI, getURI(hiveConf));
}
config.put(CatalogProperties.WAREHOUSE_LOCATION, String.format("file://%s", warehouseRoot()));
config.put("extra-catalog-prop", "extra-value");

this.flinkDatabase = catalogName + "." + DATABASE;
this.icebergNamespace =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.catalog.IntervalFreshness;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
Expand Down Expand Up @@ -231,19 +230,15 @@ public void testCreateTableLikeInFlinkCatalog() throws TableNotExistException {
.column("id", DataTypes.BIGINT())
.build());

// `type` option is filtered out by Flink
// https://github.com/apache/flink/blob/edc3d68736de73665440f4313ddcfd9142d8d42b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java#L378
Map<String, String> filteredOptions = Maps.newHashMap(config);
filteredOptions.remove(CommonCatalogOptions.CATALOG_TYPE.key());

String srcCatalogProps =
FlinkCreateTableOptions.toJson(catalogName, DATABASE, "tl", filteredOptions);
String srcCatalogProps = FlinkCreateTableOptions.toJson(catalogName, DATABASE, "tl");
Map<String, String> options = catalogTable.getOptions();
assertThat(options)
.containsEntry(
FlinkCreateTableOptions.CONNECTOR_PROPS_KEY,
FlinkDynamicTableFactory.FACTORY_IDENTIFIER)
.containsEntry(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY, srcCatalogProps);
assertThat(options.get(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY))
.doesNotContain("extra-catalog-prop", "extra-value");
}

@TestTemplate
Expand Down
Loading