Skip to content

Add Delegation Service Module and REST Framework #1981

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions delegation-service/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

**Under Development** - This module is currently being developed as part of the Polaris Delegation Service.

# Polaris Delegation Service

An optional, independent service designed to handle long-running background operations offloaded from the main Polaris catalog service. This service enables the Polaris to maintain low-latency performance for metadata operations while allowing heavy background tasks to be managed and scaled independently.

## Overview

The Delegation Service is responsible for executing resource-intensive tasks that would otherwise impact the core performance of the Polaris Catalog. The initial implementation focuses on handling data file deletion processes for `DROP TABLE ... PURGE` commands.

## Key Features

- **Independent Scaling**: Can be scaled separately from the main Polaris Catalog workload
- **Resilient Task Execution**: Provides recovery capabilities and retry mechanisms
- **Secure Communication**: Establishes secure communication via mutual TLS with Polaris
- **One-to-One Model**: Each Polaris realm pairs with a dedicated Delegation Service instance

## Architecture

The Delegation Service operates as a standalone microservice that:

1. Receives task delegation requests from Polaris Catalog via REST API
2. Executes long-running operations
3. Maintains its own persistence layer for task tracking
4. Provides synchronous execution for the MVP with plans for async operations

## Initial Implementation

The MVP focuses on:
- **DROP TABLE WITH PURGE**: Synchronous delegation of data file deletion
- **Task Persistence**: Independent database schema for task state management

## Future Roadmap

- Asynchronous task submission and status tracking
- Integration with Polaris Asynchronous & Reliable Tasks Framework
- Additional delegated operations (compaction, snapshot garbage collection)
- Support for scheduled background tasks
53 changes: 53 additions & 0 deletions delegation-service/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import org.gradle.api.tasks.compile.JavaCompile

plugins { id("polaris-java") }

tasks.withType(JavaCompile::class.java).configureEach { options.release = 21 }

dependencies {
// Core dependencies
implementation(project(":polaris-core"))

// JAX-RS and REST APIs
implementation(libs.jakarta.ws.rs.api)
implementation(libs.jakarta.inject.api)
implementation(libs.jakarta.validation.api)
implementation(libs.jakarta.enterprise.cdi.api)

// JSON processing
implementation(platform(libs.jackson.bom))
implementation("com.fasterxml.jackson.core:jackson-core")
implementation("com.fasterxml.jackson.core:jackson-databind")
implementation("com.fasterxml.jackson.core:jackson-annotations")

// Logging
implementation(libs.slf4j.api)

// Testing
testImplementation(platform(libs.junit.bom))
testImplementation("org.junit.jupiter:junit-jupiter")
testImplementation(libs.mockito.core)
testImplementation(libs.assertj.core)
testRuntimeOnly("org.junit.platform:junit-platform-launcher")

testImplementation(libs.logback.classic)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.delegation.api;

import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.apache.polaris.delegation.api.model.TaskExecutionRequest;
import org.apache.polaris.delegation.api.model.TaskExecutionResponse;
import org.apache.polaris.delegation.api.model.TaskType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* REST API for the Polaris Delegation Service.
*
* <p>Provides endpoints for submitting long-running tasks for synchronous execution. This API
* allows the main Polaris catalog to offload resource-intensive operations to maintain low-latency
* performance.
*
* <p><strong>Note:</strong> This is the initial API framework implementation. The actual task
* execution logic will be implemented in future development phases.
*/
@Path("/api/v1/tasks/execute")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public class DelegationApi {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add @experimental here ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't a Polaris API, and within the context of the delegation service (itself experimental), this is not really an experimental API.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My proposal was more to "flag" it experimental (from code perspective). If the "whole" service is experimental, I'm fine with that.

Maybe a couple of tests to illustrate how it works would be welcome.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From reading the proposal, I think what will actually be experimental will be the RemoteTaskExecutor within Polaris, but that's not implemented in this PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @jbonofre for your concern! As @eric-maynard has mentioned, the task executor for delegating to the remote service can be flagged as experimental.

In regards to illustrating how it would all work, an end-to-end test is in preparation.


private static final Logger LOGGER = LoggerFactory.getLogger(DelegationApi.class);

/**
* Submit a task for delegated execution.
*
* <p>The task will be executed synchronously and the response will contain the execution result.
*
* <p><strong>Current Implementation:</strong> Returns a placeholder response to demonstrate the
* API contract. Actual task execution logic will be implemented in future development phases.
*
* @param request the task execution request
* @return the task execution response with completion information
*/
@POST
@Path("/synchronous")
public Response submitTask(@Valid @NotNull TaskExecutionRequest request) {
TaskType taskType = request.getCommonPayload().getTaskType();
String realmId = request.getCommonPayload().getRealmIdentifier();
LOGGER.info("Delegation API called - task_type={}, realm_id={}", taskType, realmId);

try {
// Placeholder response for API framework demonstration
// TODO: Implement actual task execution logic in future phases
TaskExecutionResponse response =
new TaskExecutionResponse("COMPLETED", "API framework operational - task acknowledged");

LOGGER.info("Returning placeholder response for task_type={}", taskType);
return Response.ok(response).build();

} catch (Exception e) {
LOGGER.error("Failed to process task request", e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
.entity(new ErrorResponse("Task processing failed: " + e.getMessage()))
.build();
}
}

/** Simple error response model. */
public static class ErrorResponse {
private final String message;

public ErrorResponse(String message) {
this.message = message;
}

public String getMessage() {
return message;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.delegation.api.model;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotNull;
import java.time.OffsetDateTime;
import java.util.Objects;

/**
* Common payload data included in all delegation tasks.
*
* <p>Contains global task information that applies to all task types within the delegation service.
* Uses the {@link TaskType} enum to ensure type safety for operation types.
*/
public class CommonPayload {

@NotNull private final TaskType taskType;

@NotNull private final OffsetDateTime requestTimestampUtc;

@NotNull private final String realmIdentifier;

@JsonCreator
public CommonPayload(
@JsonProperty("task_type") @NotNull TaskType taskType,
@JsonProperty("request_timestamp_utc") @NotNull OffsetDateTime requestTimestampUtc,
@JsonProperty("realm_identifier") @NotNull String realmIdentifier) {
this.taskType = taskType;
this.requestTimestampUtc = requestTimestampUtc;
this.realmIdentifier = realmIdentifier;
}

@JsonProperty("task_type")
public TaskType getTaskType() {
return taskType;
}

@JsonProperty("request_timestamp_utc")
public OffsetDateTime getRequestTimestampUtc() {
return requestTimestampUtc;
}

@JsonProperty("realm_identifier")
public String getRealmIdentifier() {
return realmIdentifier;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CommonPayload that = (CommonPayload) o;
return Objects.equals(taskType, that.taskType)
&& Objects.equals(requestTimestampUtc, that.requestTimestampUtc)
&& Objects.equals(realmIdentifier, that.realmIdentifier);
}

@Override
public int hashCode() {
return Objects.hash(taskType, requestTimestampUtc, realmIdentifier);
}

@Override
public String toString() {
return "CommonPayload{"
+ "taskType="
+ taskType
+ ", requestTimestampUtc="
+ requestTimestampUtc
+ ", realmIdentifier='"
+ realmIdentifier
+ '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.delegation.api.model;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

/**
* Base class for operation-specific parameters in delegation tasks.
*
* <p>This abstract class serves as the base for all operation-specific parameter types. Each
* concrete subclass represents parameters for a specific type of delegation operation, providing
* type safety and clear separation of concerns.
*
* <p>Uses Jackson polymorphism to serialize/deserialize different parameter types based on the
* operation type. This allows the API to handle different parameter structures while maintaining
* type safety.
*
* <p><strong>Supported Operation Types:</strong>
*
* <ul>
* <li>{@link TablePurgeParameters} - For TABLE_PURGE operations
* </ul>
*
* <p><strong>Adding New Operation Types:</strong>
*
* <ol>
* <li>Create a new concrete subclass extending {@code OperationParameters}
* <li>Add the subclass to the {@link JsonSubTypes} annotation
* <li>Define the operation-specific fields and validation
* </ol>
*/
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "task_type")
@JsonSubTypes({@JsonSubTypes.Type(value = TablePurgeParameters.class, name = "PURGE_TABLE")})
public abstract class OperationParameters {

/**
* Gets the task type that these parameters support.
*
* @return the task type
*/
public abstract TaskType getTaskType();
}
Loading
Loading