Skip to content

Commit 4c51ebc

Browse files
committed
Added name config option for ice-rest-catalog that enables multiple catalogs share the same etcd/postgresql/etc. instance
1 parent 61c6c30 commit 4c51ebc

File tree

3 files changed

+35
-19
lines changed

3 files changed

+35
-19
lines changed

ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -310,12 +310,13 @@ public Integer call() throws Exception {
310310
// TODO: ensure all http handlers are hooked in
311311
JvmMetrics.builder().register();
312312

313+
String catalogName = config.name();
313314
String catalogImpl = icebergConfig.get(CatalogProperties.CATALOG_IMPL);
314315
Catalog catalog;
315316
if (EtcdCatalog.class.getName().equals(catalogImpl)) {
316-
catalog = newEctdCatalog(icebergConfig);
317+
catalog = newEctdCatalog(catalogName, icebergConfig);
317318
} else {
318-
catalog = CatalogUtil.buildIcebergCatalog("default", icebergConfig, null);
319+
catalog = CatalogUtil.buildIcebergCatalog(catalogName, icebergConfig, null);
319320
}
320321

321322
// Initialize and start the maintenance scheduler
@@ -369,7 +370,7 @@ private void initializeMaintenanceScheduler(Catalog catalog, Config config) {
369370
}
370371
}
371372

372-
private static Catalog newEctdCatalog(Map<String, String> config) {
373+
private static Catalog newEctdCatalog(String name, Map<String, String> config) {
373374
// TODO: remove; params all verified by config
374375
String uri = config.getOrDefault(CatalogProperties.URI, "etcd:http://localhost:2379");
375376
Preconditions.checkArgument(
@@ -388,8 +389,7 @@ private static Catalog newEctdCatalog(Map<String, String> config) {
388389
CatalogProperties.FILE_IO_IMPL);
389390

390391
var io = CatalogUtil.loadFileIO(ioImpl, config, null);
391-
return new EtcdCatalog(
392-
"default", Strings.removePrefix(uri, "etcd:"), inputWarehouseLocation, io);
392+
return new EtcdCatalog(name, Strings.removePrefix(uri, "etcd:"), inputWarehouseLocation, io);
393393
}
394394

395395
public static void main(String[] args) throws Exception {

ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public record Config(
4141
@JsonPropertyDescription("host:port (0.0.0.0:5001 by default)") String debugAddr,
4242
@JsonPropertyDescription("host:port, e.g. localhost:5002 (disabled by default)")
4343
String adminAddr,
44+
@JsonPropertyDescription("Catalog name (defaults to \"default\")") String name,
4445
@JsonPropertyDescription("Catalog storage URI: jdbc:..., etcd:...") String uri,
4546
@JsonPropertyDescription("Path to warehouse, e.g. s3://..., file://...") String warehouse,
4647
@JsonPropertyDescription(
@@ -69,6 +70,7 @@ public Config(
6970
String addr,
7071
String debugAddr,
7172
String adminAddr,
73+
String name,
7274
@JsonProperty(required = true) String uri,
7375
@JsonProperty(required = true) String warehouse,
7476
String localFileIOBaseDir,
@@ -82,6 +84,7 @@ public Config(
8284
this.addr = Strings.orDefault(addr, DEFAULT_ADDR);
8385
this.debugAddr = Strings.orDefault(debugAddr, DEFAULT_DEBUG_ADDR);
8486
this.adminAddr = Strings.orDefault(adminAddr, System.getenv("ICE_REST_CATALOG_ADMIN_ADDR"));
87+
this.name = Strings.orDefault(name, "default");
8588
this.uri = uri;
8689
this.warehouse = warehouse;
8790
this.localFileIOBaseDir = localFileIOBaseDir;

ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/etcd/EtcdCatalog.java

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.apache.iceberg.exceptions.RuntimeIOException;
5353
import org.apache.iceberg.exceptions.ValidationException;
5454
import org.apache.iceberg.io.FileIO;
55+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
5556
import org.apache.iceberg.util.LocationUtil;
5657
import org.slf4j.Logger;
5758
import org.slf4j.LoggerFactory;
@@ -71,6 +72,8 @@ public class EtcdCatalog extends BaseMetastoreCatalog implements SupportsNamespa
7172

7273
public EtcdCatalog(String name, String uri, String warehouseLocation, FileIO io) {
7374
this.catalogName = name;
75+
Preconditions.checkArgument(
76+
name != null && !name.isBlank() && !name.contains("/"), "Invalid catalog name");
7477
this.warehouseLocation = LocationUtil.stripTrailingSlash(warehouseLocation);
7578
var etcdClient =
7679
Client.builder().endpoints(uri.split(",")).keepaliveWithoutCalls(false).build();
@@ -124,8 +127,15 @@ public boolean namespaceExists(Namespace namespace) {
124127
return unwrap(kv.get(key, GetOption.builder().withCountOnly(true).build())).getCount() > 0;
125128
}
126129

127-
private static String namespaceKey(Namespace namespace) {
128-
return NAMESPACE_PREFIX + namespaceToPath(namespace);
130+
private String namespaceKey(Namespace namespace) {
131+
return namespacePrefix() + namespaceToPath(namespace);
132+
}
133+
134+
private String namespacePrefix() {
135+
if ("default".equals(catalogName)) { // for backward-compatibility
136+
return NAMESPACE_PREFIX;
137+
}
138+
return catalogName + "/" + NAMESPACE_PREFIX;
129139
}
130140

131141
private static <T> T unwrapCommit(java.util.concurrent.CompletableFuture<T> x) {
@@ -195,9 +205,7 @@ private void validateTableIdentifier(TableIdentifier identifier) {
195205
public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
196206
validateNamespace(namespace);
197207
String prefix = namespaceKey(namespace);
198-
if (!namespace.isEmpty()) {
199-
prefix += "/";
200-
}
208+
prefix = prefix.endsWith("/") ? prefix : prefix + "/";
201209
GetResponse res = unwrap(kv.get(byteSeq(prefix), GetOption.builder().isPrefix(true).build()));
202210
if (res.getKvs().isEmpty() && !namespace.isEmpty()) {
203211
if (!namespaceExists(namespace)) {
@@ -209,7 +217,7 @@ public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespac
209217
k ->
210218
Namespace.of(
211219
Strings.removePrefix(
212-
k.getKey().toString(StandardCharsets.UTF_8), NAMESPACE_PREFIX)
220+
k.getKey().toString(StandardCharsets.UTF_8), namespacePrefix())
213221
.split("/")))
214222
.toList();
215223
}
@@ -330,9 +338,7 @@ protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
330338
public List<TableIdentifier> listTables(Namespace namespace) {
331339
validateNamespace(namespace);
332340
String prefix = tableKey(namespace);
333-
if (!namespace.isEmpty()) {
334-
prefix += "/";
335-
}
341+
prefix = prefix.endsWith("/") ? prefix : prefix + "/";
336342
GetResponse res = unwrap(kv.get(byteSeq(prefix), GetOption.builder().isPrefix(true).build()));
337343
if (res.getKvs().isEmpty() && !namespace.isEmpty()) {
338344
if (!namespaceExists(namespace)) {
@@ -343,17 +349,24 @@ public List<TableIdentifier> listTables(Namespace namespace) {
343349
.map(
344350
k ->
345351
TableIdentifier.of(
346-
Strings.removePrefix(k.getKey().toString(StandardCharsets.UTF_8), TABLE_PREFIX)
352+
Strings.removePrefix(k.getKey().toString(StandardCharsets.UTF_8), tablePrefix())
347353
.split("/")))
348354
.toList();
349355
}
350356

351-
private static String tableKey(Namespace namespace) {
352-
return TABLE_PREFIX + namespaceToPath(namespace);
357+
private String tableKey(Namespace namespace) {
358+
return tablePrefix() + namespaceToPath(namespace);
359+
}
360+
361+
private String tableKey(TableIdentifier identifier) {
362+
return tablePrefix() + tableIdentifierToPath(identifier);
353363
}
354364

355-
private static String tableKey(TableIdentifier identifier) {
356-
return TABLE_PREFIX + tableIdentifierToPath(identifier);
365+
private String tablePrefix() {
366+
if ("default".equals(catalogName)) { // for backward-compatibility
367+
return TABLE_PREFIX;
368+
}
369+
return catalogName + "/" + TABLE_PREFIX;
357370
}
358371

359372
private static String tableIdentifierToPath(TableIdentifier identifier) {

0 commit comments

Comments
 (0)