diff --git a/delegation-service/README.md b/delegation-service/README.md new file mode 100644 index 0000000000..9a104b106b --- /dev/null +++ b/delegation-service/README.md @@ -0,0 +1,57 @@ + + +**Under Development** - This module is currently being developed as part of the Polaris Delegation Service. + +# Polaris Delegation Service + +An optional, independent service designed to handle long-running background operations offloaded from the main Polaris catalog service. This service enables the Polaris to maintain low-latency performance for metadata operations while allowing heavy background tasks to be managed and scaled independently. + +## Overview + +The Delegation Service is responsible for executing resource-intensive tasks that would otherwise impact the core performance of the Polaris Catalog. The initial implementation focuses on handling data file deletion processes for `DROP TABLE ... PURGE` commands. + +## Key Features + +- **Independent Scaling**: Can be scaled separately from the main Polaris Catalog workload +- **Resilient Task Execution**: Provides recovery capabilities and retry mechanisms +- **Secure Communication**: Establishes secure communication via mutual TLS with Polaris +- **One-to-One Model**: Each Polaris realm pairs with a dedicated Delegation Service instance + +## Architecture + +The Delegation Service operates as a standalone microservice that: + +1. Receives task delegation requests from Polaris Catalog via REST API +2. Executes long-running operations +3. Maintains its own persistence layer for task tracking +4. Provides synchronous execution for the MVP with plans for async operations + +## Initial Implementation + +The MVP focuses on: +- **DROP TABLE WITH PURGE**: Synchronous delegation of data file deletion +- **Task Persistence**: Independent database schema for task state management + +## Future Roadmap + +- Asynchronous task submission and status tracking +- Integration with Polaris Asynchronous & Reliable Tasks Framework +- Additional delegated operations (compaction, snapshot garbage collection) +- Support for scheduled background tasks diff --git a/delegation-service/build.gradle.kts b/delegation-service/build.gradle.kts new file mode 100644 index 0000000000..0cc0093574 --- /dev/null +++ b/delegation-service/build.gradle.kts @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.gradle.api.tasks.compile.JavaCompile + +plugins { id("polaris-java") } + +tasks.withType(JavaCompile::class.java).configureEach { options.release = 21 } + +dependencies { + // Core dependencies + implementation(project(":polaris-core")) + + // JAX-RS and REST APIs + implementation(libs.jakarta.ws.rs.api) + implementation(libs.jakarta.inject.api) + implementation(libs.jakarta.validation.api) + implementation(libs.jakarta.enterprise.cdi.api) + + // JSON processing + implementation(platform(libs.jackson.bom)) + implementation("com.fasterxml.jackson.core:jackson-core") + implementation("com.fasterxml.jackson.core:jackson-databind") + implementation("com.fasterxml.jackson.core:jackson-annotations") + + // Logging + implementation(libs.slf4j.api) + + // Testing + testImplementation(platform(libs.junit.bom)) + testImplementation("org.junit.jupiter:junit-jupiter") + testImplementation(libs.mockito.core) + testImplementation(libs.assertj.core) + testRuntimeOnly("org.junit.platform:junit-platform-launcher") + + testImplementation(libs.logback.classic) +} diff --git a/delegation-service/src/main/java/org/apache/polaris/delegation/api/DelegationApi.java b/delegation-service/src/main/java/org/apache/polaris/delegation/api/DelegationApi.java new file mode 100644 index 0000000000..e32418cc6b --- /dev/null +++ b/delegation-service/src/main/java/org/apache/polaris/delegation/api/DelegationApi.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.delegation.api; + +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; +import org.apache.polaris.delegation.api.model.TaskExecutionRequest; +import org.apache.polaris.delegation.api.model.TaskExecutionResponse; +import org.apache.polaris.delegation.api.model.TaskType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * REST API for the Polaris Delegation Service. + * + *
Provides endpoints for submitting long-running tasks for synchronous execution. This API + * allows the main Polaris catalog to offload resource-intensive operations to maintain low-latency + * performance. + * + *
Note: This is the initial API framework implementation. The actual task + * execution logic will be implemented in future development phases. + */ +@Path("/api/v1/tasks/execute") +@Consumes(MediaType.APPLICATION_JSON) +@Produces(MediaType.APPLICATION_JSON) +public class DelegationApi { + + private static final Logger LOGGER = LoggerFactory.getLogger(DelegationApi.class); + + /** + * Submit a task for delegated execution. + * + *
The task will be executed synchronously and the response will contain the execution result. + * + *
Current Implementation: Returns a placeholder response to demonstrate the + * API contract. Actual task execution logic will be implemented in future development phases. + * + * @param request the task execution request + * @return the task execution response with completion information + */ + @POST + @Path("/synchronous") + public Response submitTask(@Valid @NotNull TaskExecutionRequest request) { + TaskType taskType = request.getCommonPayload().getTaskType(); + String realmId = request.getCommonPayload().getRealmIdentifier(); + LOGGER.info("Delegation API called - task_type={}, realm_id={}", taskType, realmId); + + try { + // Placeholder response for API framework demonstration + // TODO: Implement actual task execution logic in future phases + TaskExecutionResponse response = + new TaskExecutionResponse("COMPLETED", "API framework operational - task acknowledged"); + + LOGGER.info("Returning placeholder response for task_type={}", taskType); + return Response.ok(response).build(); + + } catch (Exception e) { + LOGGER.error("Failed to process task request", e); + return Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(new ErrorResponse("Task processing failed: " + e.getMessage())) + .build(); + } + } + + /** Simple error response model. */ + public static class ErrorResponse { + private final String message; + + public ErrorResponse(String message) { + this.message = message; + } + + public String getMessage() { + return message; + } + } +} diff --git a/delegation-service/src/main/java/org/apache/polaris/delegation/api/model/CommonPayload.java b/delegation-service/src/main/java/org/apache/polaris/delegation/api/model/CommonPayload.java new file mode 100644 index 0000000000..2172437a5a --- /dev/null +++ b/delegation-service/src/main/java/org/apache/polaris/delegation/api/model/CommonPayload.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.delegation.api.model; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotNull; +import java.time.OffsetDateTime; +import java.util.Objects; + +/** + * Common payload data included in all delegation tasks. + * + *
Contains global task information that applies to all task types within the delegation service. + * Uses the {@link TaskType} enum to ensure type safety for operation types. + */ +public class CommonPayload { + + @NotNull private final TaskType taskType; + + @NotNull private final OffsetDateTime requestTimestampUtc; + + @NotNull private final String realmIdentifier; + + @JsonCreator + public CommonPayload( + @JsonProperty("task_type") @NotNull TaskType taskType, + @JsonProperty("request_timestamp_utc") @NotNull OffsetDateTime requestTimestampUtc, + @JsonProperty("realm_identifier") @NotNull String realmIdentifier) { + this.taskType = taskType; + this.requestTimestampUtc = requestTimestampUtc; + this.realmIdentifier = realmIdentifier; + } + + @JsonProperty("task_type") + public TaskType getTaskType() { + return taskType; + } + + @JsonProperty("request_timestamp_utc") + public OffsetDateTime getRequestTimestampUtc() { + return requestTimestampUtc; + } + + @JsonProperty("realm_identifier") + public String getRealmIdentifier() { + return realmIdentifier; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CommonPayload that = (CommonPayload) o; + return Objects.equals(taskType, that.taskType) + && Objects.equals(requestTimestampUtc, that.requestTimestampUtc) + && Objects.equals(realmIdentifier, that.realmIdentifier); + } + + @Override + public int hashCode() { + return Objects.hash(taskType, requestTimestampUtc, realmIdentifier); + } + + @Override + public String toString() { + return "CommonPayload{" + + "taskType=" + + taskType + + ", requestTimestampUtc=" + + requestTimestampUtc + + ", realmIdentifier='" + + realmIdentifier + + '\'' + + '}'; + } +} diff --git a/delegation-service/src/main/java/org/apache/polaris/delegation/api/model/OperationParameters.java b/delegation-service/src/main/java/org/apache/polaris/delegation/api/model/OperationParameters.java new file mode 100644 index 0000000000..08900b385c --- /dev/null +++ b/delegation-service/src/main/java/org/apache/polaris/delegation/api/model/OperationParameters.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.delegation.api.model; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +/** + * Base class for operation-specific parameters in delegation tasks. + * + *
This abstract class serves as the base for all operation-specific parameter types. Each + * concrete subclass represents parameters for a specific type of delegation operation, providing + * type safety and clear separation of concerns. + * + *
Uses Jackson polymorphism to serialize/deserialize different parameter types based on the + * operation type. This allows the API to handle different parameter structures while maintaining + * type safety. + * + *
Supported Operation Types: + * + *
Adding New Operation Types: + * + *
Provides the complete hierarchical path to identify a table within the Polaris catalog system.
+ */
+public class TableIdentity {
+
+ @NotNull private final String catalogName;
+
+ @NotNull private final List Contains all the information needed to perform a table purge operation, including the table
+ * identity and optional configuration properties.
+ *
+ * This class represents parameters for {@code PURGE_TABLE} operations, which handle the deletion
+ * of data files associated with dropped tables.
+ *
+ * Example Usage:
+ *
+ * Contains all the information needed to submit a task to the delegation service, structured
+ * according to the delegation service task payload schema.
+ */
+public class TaskExecutionRequest {
+
+ @NotNull private final CommonPayload commonPayload;
+
+ @NotNull private final OperationParameters operationParameters;
+
+ @JsonCreator
+ public TaskExecutionRequest(
+ @JsonProperty("common_payload") @NotNull CommonPayload commonPayload,
+ @JsonProperty("operation_parameters") @NotNull OperationParameters operationParameters) {
+ this.commonPayload = commonPayload;
+ this.operationParameters = operationParameters;
+ }
+
+ @JsonProperty("common_payload")
+ public CommonPayload getCommonPayload() {
+ return commonPayload;
+ }
+
+ @JsonProperty("operation_parameters")
+ public OperationParameters getOperationParameters() {
+ return operationParameters;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ TaskExecutionRequest that = (TaskExecutionRequest) o;
+ return Objects.equals(commonPayload, that.commonPayload)
+ && Objects.equals(operationParameters, that.operationParameters);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(commonPayload, operationParameters);
+ }
+
+ @Override
+ public String toString() {
+ return "TaskExecutionRequest{"
+ + "commonPayload="
+ + commonPayload
+ + ", operationParameters="
+ + operationParameters
+ + '}';
+ }
+}
diff --git a/delegation-service/src/main/java/org/apache/polaris/delegation/api/model/TaskExecutionResponse.java b/delegation-service/src/main/java/org/apache/polaris/delegation/api/model/TaskExecutionResponse.java
new file mode 100644
index 0000000000..0726200a97
--- /dev/null
+++ b/delegation-service/src/main/java/org/apache/polaris/delegation/api/model/TaskExecutionResponse.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.delegation.api.model;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import jakarta.validation.constraints.NotNull;
+import java.util.Objects;
+
+/**
+ * Response from executing a delegated task.
+ *
+ * Contains the execution result and timing information for the completed task. Since the
+ * delegation service executes tasks synchronously in the MVP, this response indicates the task has
+ * already been completed.
+ */
+public class TaskExecutionResponse {
+
+ @NotNull private final String status;
+
+ private final String resultSummary;
+
+ @JsonCreator
+ public TaskExecutionResponse(
+ @JsonProperty("status") @NotNull String status,
+ @JsonProperty("result_summary") String resultSummary) {
+ this.status = status;
+ this.resultSummary = resultSummary;
+ }
+
+ @JsonProperty("status")
+ public String getStatus() {
+ return status;
+ }
+
+ @JsonProperty("result_summary")
+ public String getResultSummary() {
+ return resultSummary;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ TaskExecutionResponse that = (TaskExecutionResponse) o;
+ return Objects.equals(status, that.status) && Objects.equals(resultSummary, that.resultSummary);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(status, resultSummary);
+ }
+
+ @Override
+ public String toString() {
+ return "TaskExecutionResponse{"
+ + "status='"
+ + status
+ + '\''
+ + ", resultSummary='"
+ + resultSummary
+ + '\''
+ + '}';
+ }
+}
diff --git a/delegation-service/src/main/java/org/apache/polaris/delegation/api/model/TaskType.java b/delegation-service/src/main/java/org/apache/polaris/delegation/api/model/TaskType.java
new file mode 100644
index 0000000000..7b220be938
--- /dev/null
+++ b/delegation-service/src/main/java/org/apache/polaris/delegation/api/model/TaskType.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.delegation.api.model;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonValue;
+
+/**
+ * Types of tasks that can be delegated to the Delegation Service.
+ *
+ * This enum defines the various long-running, resource-intensive operations that can be
+ * offloaded from the main Polaris catalog service.
+ */
+public enum TaskType {
+
+ /**
+ * Data file deletion task for DROP TABLE WITH PURGE operations. This is the initial task type
+ * supported by the delegation service.
+ */
+ PURGE_TABLE("PURGE_TABLE");
+
+ private final String value;
+
+ TaskType(String value) {
+ this.value = value;
+ }
+
+ @JsonValue
+ public String getValue() {
+ return value;
+ }
+
+ @JsonCreator
+ public static TaskType fromValue(String value) {
+ for (TaskType taskType : TaskType.values()) {
+ if (taskType.value.equals(value)) {
+ return taskType;
+ }
+ }
+ throw new IllegalArgumentException("Unknown TaskType: " + value);
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+}
diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties
index fecea71f8c..2457eef3e7 100644
--- a/gradle/projects.main.properties
+++ b/gradle/projects.main.properties
@@ -20,6 +20,7 @@
polaris-bom=bom
polaris-core=polaris-core
+polaris-delegation-service=delegation-service
polaris-api-iceberg-service=api/iceberg-service
polaris-api-management-model=api/management-model
polaris-api-management-service=api/management-service
+ * TablePurgeParameters params = new TablePurgeParameters(
+ * new TableIdentity("catalog", List.of("namespace"), "table"),
+ * Map.of("skipTrash", "true", "batchSize", "1000")
+ * );
+ *
+ */
+public class TablePurgeParameters extends OperationParameters {
+
+ @NotNull private final TableIdentity tableIdentity;
+
+ private final Map