diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java b/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java index 0274d0ea81..c235d61c11 100644 --- a/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java +++ b/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java @@ -101,6 +101,24 @@ public List listNamespaces(String catalog, Namespace parent) { } } + public ListNamespacesResponse listNamespaces( + String catalog, Namespace parent, String pageToken, String pageSize) { + Map queryParams = new HashMap<>(); + if (!parent.isEmpty()) { + // TODO change this for Iceberg 1.7.2: + // queryParams.put("parent", RESTUtil.encodeNamespace(parent)); + queryParams.put("parent", Joiner.on('\u001f').join(parent.levels())); + } + queryParams.put("pageToken", pageToken); + queryParams.put("pageSize", pageSize); + try (Response response = + request("v1/{cat}/namespaces", Map.of("cat", catalog), queryParams).get()) { + assertThat(response.getStatus()).isEqualTo(OK.getStatusCode()); + ListNamespacesResponse res = response.readEntity(ListNamespacesResponse.class); + return res; + } + } + public List listAllNamespacesChildFirst(String catalog) { List result = new ArrayList<>(); for (int idx = -1; idx < result.size(); idx++) { @@ -142,6 +160,20 @@ public List listTables(String catalog, Namespace namespace) { } } + public ListTablesResponse listTables( + String catalog, Namespace namespace, String pageToken, String pageSize) { + String ns = RESTUtil.encodeNamespace(namespace); + Map queryParams = new HashMap<>(); + queryParams.put("pageToken", pageToken); + queryParams.put("pageSize", pageSize); + try (Response res = + request("v1/{cat}/namespaces/" + ns + "/tables", Map.of("cat", catalog), queryParams) + .get()) { + assertThat(res.getStatus()).isEqualTo(Response.Status.OK.getStatusCode()); + return res.readEntity(ListTablesResponse.class); + } + } + public void dropTable(String catalog, TableIdentifier id) { String ns = RESTUtil.encodeNamespace(id.namespace()); try (Response res = diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationBase.java b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationBase.java index 8ebee36f0a..2bf55cd7ed 100644 --- a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationBase.java +++ b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationBase.java @@ -69,6 +69,8 @@ import org.apache.iceberg.rest.RESTUtil; import org.apache.iceberg.rest.requests.CreateTableRequest; import org.apache.iceberg.rest.responses.ErrorResponse; +import org.apache.iceberg.rest.responses.ListNamespacesResponse; +import org.apache.iceberg.rest.responses.ListTablesResponse; import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.types.Types; import org.apache.polaris.core.admin.model.Catalog; @@ -161,7 +163,8 @@ public abstract class PolarisRestCatalogIntegrationBase extends CatalogTests query = session @@ -303,6 +314,11 @@ List lookupFullEntitiesActive( .setParameter("parentId", parentId) .setParameter("typeCode", entityType.getCode()); + if (entityIdToken.isPresent()) { + long tokenId = entityIdToken.get().entityId(); + query = query.setParameter("tokenId", tokenId); + } + return query.getResultList(); } diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java index 5c3dd1dbaf..c93765c840 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -53,7 +54,7 @@ import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException; import org.apache.polaris.core.persistence.PrincipalSecretsGenerator; import org.apache.polaris.core.persistence.RetryOnConcurrencyException; -import org.apache.polaris.core.persistence.pagination.HasPageSize; +import org.apache.polaris.core.persistence.pagination.EntityIdToken; import org.apache.polaris.core.persistence.pagination.Page; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; @@ -459,7 +460,7 @@ public Page listEntities( @Nonnull Predicate entityFilter, @Nonnull Function transformer, @Nonnull PageToken pageToken) { - Map params = + Map whereEquals = Map.of( "catalog_id", catalogId, @@ -469,29 +470,41 @@ public Page listEntities( entityType.getCode(), "realm_id", realmId); + Map whereGreater; // Limit can't be pushed down, due to client side filtering // absence of transaction. + String orderByColumnName = null; + if (pageToken.paginationRequested()) { + orderByColumnName = ModelEntity.ID_COLUMN; + whereGreater = + pageToken + .valueAs(EntityIdToken.class) + .map( + entityIdToken -> + Map.of(ModelEntity.ID_COLUMN, entityIdToken.entityId())) + .orElse(Map.of()); + } else { + whereGreater = Map.of(); + } + try { PreparedQuery query = QueryGenerator.generateSelectQuery( - ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params); - List results = new ArrayList<>(); + ModelEntity.ALL_COLUMNS, + ModelEntity.TABLE_NAME, + whereEquals, + whereGreater, + orderByColumnName); + AtomicReference> results = new AtomicReference<>(); datasourceOperations.executeSelectOverStream( query, new ModelEntity(), stream -> { var data = stream.filter(entityFilter); - if (pageToken instanceof HasPageSize hasPageSize) { - data = data.limit(hasPageSize.getPageSize()); - } - data.forEach(results::add); + results.set(Page.mapped(pageToken, data, transformer, EntityIdToken::fromEntity)); }); - List resultsOrEmpty = - results == null - ? Collections.emptyList() - : results.stream().filter(entityFilter).map(transformer).collect(Collectors.toList()); - return Page.fromItems(resultsOrEmpty); + return results.get(); } catch (SQLException e) { throw new RuntimeException( String.format("Failed to retrieve polaris entities due to %s", e.getMessage()), e); diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java index c6bad0a1c5..a06bf283a0 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -59,8 +60,27 @@ public static PreparedQuery generateSelectQuery( @Nonnull List projections, @Nonnull String tableName, @Nonnull Map whereClause) { - QueryFragment where = generateWhereClause(new HashSet<>(projections), whereClause); - PreparedQuery query = generateSelectQuery(projections, tableName, where.sql()); + return generateSelectQuery(projections, tableName, whereClause, Map.of(), null); + } + + /** + * Generates a SELECT query with projection and filtering. + * + * @param projections List of columns to retrieve. + * @param tableName Target table name. + * @param whereEquals Column-value pairs used in WHERE filtering. + * @return A parameterized SELECT query. + * @throws IllegalArgumentException if any whereClause column isn't in projections. + */ + public static PreparedQuery generateSelectQuery( + @Nonnull List projections, + @Nonnull String tableName, + @Nonnull Map whereEquals, + @Nonnull Map whereGreater, + @Nullable String orderByColumn) { + QueryFragment where = + generateWhereClause(new HashSet<>(projections), whereEquals, whereGreater); + PreparedQuery query = generateSelectQuery(projections, tableName, where.sql(), orderByColumn); return new PreparedQuery(query.sql(), where.parameters()); } @@ -108,7 +128,8 @@ public static PreparedQuery generateSelectQueryWithEntityIds( params.add(realmId); String where = " WHERE (catalog_id, id) IN (" + placeholders + ") AND realm_id = ?"; return new PreparedQuery( - generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where).sql(), params); + generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where, null).sql(), + params); } /** @@ -157,7 +178,7 @@ public static PreparedQuery generateUpdateQuery( @Nonnull List values, @Nonnull Map whereClause) { List bindingParams = new ArrayList<>(values); - QueryFragment where = generateWhereClause(new HashSet<>(allColumns), whereClause); + QueryFragment where = generateWhereClause(new HashSet<>(allColumns), whereClause, Map.of()); String setClause = allColumns.stream().map(c -> c + " = ?").collect(Collectors.joining(", ")); String sql = "UPDATE " + getFullyQualifiedTableName(tableName) + " SET " + setClause + where.sql(); @@ -177,34 +198,49 @@ public static PreparedQuery generateDeleteQuery( @Nonnull List tableColumns, @Nonnull String tableName, @Nonnull Map whereClause) { - QueryFragment where = generateWhereClause(new HashSet<>(tableColumns), whereClause); + QueryFragment where = generateWhereClause(new HashSet<>(tableColumns), whereClause, Map.of()); return new PreparedQuery( "DELETE FROM " + getFullyQualifiedTableName(tableName) + where.sql(), where.parameters()); } private static PreparedQuery generateSelectQuery( - @Nonnull List columnNames, @Nonnull String tableName, @Nonnull String filter) { + @Nonnull List columnNames, + @Nonnull String tableName, + @Nonnull String filter, + @Nullable String orderByColumn) { String sql = "SELECT " + String.join(", ", columnNames) + " FROM " + getFullyQualifiedTableName(tableName) + filter; + if (orderByColumn != null) { + sql += " ORDER BY " + orderByColumn + " ASC"; + } return new PreparedQuery(sql, Collections.emptyList()); } @VisibleForTesting static QueryFragment generateWhereClause( - @Nonnull Set tableColumns, @Nonnull Map whereClause) { + @Nonnull Set tableColumns, + @Nonnull Map whereEquals, + @Nonnull Map whereGreater) { List conditions = new ArrayList<>(); List parameters = new ArrayList<>(); - for (Map.Entry entry : whereClause.entrySet()) { + for (Map.Entry entry : whereEquals.entrySet()) { if (!tableColumns.contains(entry.getKey()) && !entry.getKey().equals("realm_id")) { throw new IllegalArgumentException("Invalid query column: " + entry.getKey()); } conditions.add(entry.getKey() + " = ?"); parameters.add(entry.getValue()); } + for (Map.Entry entry : whereGreater.entrySet()) { + if (!tableColumns.contains(entry.getKey()) && !entry.getKey().equals("realm_id")) { + throw new IllegalArgumentException("Invalid query column: " + entry.getKey()); + } + conditions.add(entry.getKey() + " > ?"); + parameters.add(entry.getValue()); + } String clause = conditions.isEmpty() ? "" : " WHERE " + String.join(" AND ", conditions); return new QueryFragment(clause, parameters); } @@ -258,7 +294,7 @@ public static PreparedQuery generateOverlapQuery( QueryFragment where = new QueryFragment(clause, finalParams); PreparedQuery query = - generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where.sql()); + generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where.sql(), null); return new PreparedQuery(query.sql(), where.parameters()); } diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEntity.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEntity.java index e9a2bdb550..6eaec072d4 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEntity.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEntity.java @@ -33,6 +33,8 @@ public class ModelEntity implements Converter { public static final String TABLE_NAME = "ENTITIES"; + public static final String ID_COLUMN = "id"; + public static final List ALL_COLUMNS = List.of( "id", diff --git a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java index 798dd92e7d..6df78eff57 100644 --- a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java +++ b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java @@ -184,7 +184,8 @@ void testGenerateWhereClause_singleCondition() { Map whereClause = new HashMap<>(); whereClause.put("name", "test"); assertEquals( - " WHERE name = ?", QueryGenerator.generateWhereClause(Set.of("name"), whereClause).sql()); + " WHERE name = ?", + QueryGenerator.generateWhereClause(Set.of("name"), whereClause, Map.of()).sql()); } @Test @@ -194,13 +195,25 @@ void testGenerateWhereClause_multipleConditions() { whereClause.put("version", 1); assertEquals( " WHERE name = ? AND version = ?", - QueryGenerator.generateWhereClause(Set.of("name", "version"), whereClause).sql()); + QueryGenerator.generateWhereClause(Set.of("name", "version"), whereClause, Map.of()).sql()); + } + + @Test + void testGenerateWhereClause_multipleConditions_AndInequality() { + Map whereClause = new HashMap<>(); + whereClause.put("name", "test"); + whereClause.put("version", 1); + assertEquals( + " WHERE name = ? AND version = ? AND id > ?", + QueryGenerator.generateWhereClause( + Set.of("name", "version", "id"), whereClause, Map.of("id", 123)) + .sql()); } @Test void testGenerateWhereClause_emptyMap() { Map whereClause = Collections.emptyMap(); - assertEquals("", QueryGenerator.generateWhereClause(Set.of(), whereClause).sql()); + assertEquals("", QueryGenerator.generateWhereClause(Set.of(), whereClause, Map.of()).sql()); } @Test diff --git a/polaris-core/build.gradle.kts b/polaris-core/build.gradle.kts index 44965b7383..822872835a 100644 --- a/polaris-core/build.gradle.kts +++ b/polaris-core/build.gradle.kts @@ -36,6 +36,11 @@ dependencies { implementation("com.fasterxml.jackson.core:jackson-annotations") implementation("com.fasterxml.jackson.core:jackson-core") implementation("com.fasterxml.jackson.core:jackson-databind") + implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-smile") + runtimeOnly("com.fasterxml.jackson.datatype:jackson-datatype-guava") + runtimeOnly("com.fasterxml.jackson.datatype:jackson-datatype-jdk8") + runtimeOnly("com.fasterxml.jackson.datatype:jackson-datatype-jsr310") + implementation(libs.caffeine) implementation(libs.commons.lang3) implementation(libs.commons.codec1) @@ -96,6 +101,9 @@ dependencies { implementation(platform(libs.google.cloud.storage.bom)) implementation("com.google.cloud:google-cloud-storage") + testCompileOnly(project(":polaris-immutables")) + testAnnotationProcessor(project(":polaris-immutables", configuration = "processor")) + testFixturesApi("com.fasterxml.jackson.core:jackson-core") testFixturesApi("com.fasterxml.jackson.core:jackson-databind") testFixturesApi(libs.commons.lang3) diff --git a/polaris-core/src/main/java/org/apache/polaris/core/catalog/PolarisCatalogHelpers.java b/polaris-core/src/main/java/org/apache/polaris/core/catalog/PolarisCatalogHelpers.java index e10c24f2f1..b37efaae37 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/catalog/PolarisCatalogHelpers.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/catalog/PolarisCatalogHelpers.java @@ -19,7 +19,6 @@ package org.apache.polaris.core.catalog; import com.google.common.collect.ImmutableList; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -62,20 +61,28 @@ public static Namespace getParentNamespace(Namespace namespace) { return Namespace.of(parentLevels); } - public static List nameAndIdToNamespaces( - List catalogPath, List entities) { + public static Namespace nameAndIdToNamespace( + List catalogPath, PolarisEntity.NameAndId entity) { + // Skip element 0 which is the catalog entity + String[] fullName = new String[catalogPath.size()]; + for (int i = 0; i < fullName.length - 1; ++i) { + fullName[i] = catalogPath.get(i + 1).getName(); + } + fullName[fullName.length - 1] = entity.getName(); + return Namespace.of(fullName); + } + + /** + * Given the shortnames/ids of entities that all live under the given catalogPath, reconstructs + * TableIdentifier objects for each that all hold the catalogPath excluding the catalog entity. + */ + public static Namespace parentNamespace(List catalogPath) { // Skip element 0 which is the catalog entity String[] parentNamespaces = new String[catalogPath.size() - 1]; for (int i = 0; i < parentNamespaces.length; ++i) { parentNamespaces[i] = catalogPath.get(i + 1).getName(); } - List namespaces = new ArrayList<>(); - for (PolarisEntity.NameAndId entity : entities) { - String[] fullName = Arrays.copyOf(parentNamespaces, parentNamespaces.length + 1); - fullName[fullName.length - 1] = entity.getName(); - namespaces.add(Namespace.of(fullName)); - } - return namespaces; + return Namespace.of(parentNamespaces); } /** diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java index 2f69e898e2..05b7e0dec3 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java @@ -222,6 +222,7 @@ public static void enforceFeatureEnabledOrThrow( public static final PolarisConfiguration LIST_PAGINATION_ENABLED = PolarisConfiguration.builder() .key("LIST_PAGINATION_ENABLED") + .catalogConfig("polaris.config.list-pagination-enabled") .description("If set to true, pagination for APIs like listTables is enabled.") .defaultValue(false) .buildFeatureConfiguration(); diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java index 08bda144ce..e4e77c1553 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java @@ -31,6 +31,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.entity.AsyncTaskType; @@ -698,23 +699,20 @@ private void revokeGrantRecord( // return list of active entities // TODO: Clean up shared logic for catalogId/parentId - long catalogId = - catalogPath == null || catalogPath.size() == 0 ? 0l : catalogPath.get(0).getId(); + long catalogId = catalogPath == null || catalogPath.isEmpty() ? 0L : catalogPath.get(0).getId(); long parentId = - catalogPath == null || catalogPath.size() == 0 - ? 0l + catalogPath == null || catalogPath.isEmpty() + ? 0L : catalogPath.get(catalogPath.size() - 1).getId(); - Page resultPage = - ms.listEntities(callCtx, catalogId, parentId, entityType, pageToken); // prune the returned list with only entities matching the entity subtype - if (entitySubType != PolarisEntitySubType.ANY_SUBTYPE) { - resultPage = - pageToken.buildNextPage( - resultPage.items.stream() - .filter(rec -> rec.getSubTypeCode() == entitySubType.getCode()) - .collect(Collectors.toList())); - } + Predicate filter = + entitySubType != PolarisEntitySubType.ANY_SUBTYPE + ? e -> e.getSubTypeCode() == entitySubType.getCode() + : entity -> true; + + Page resultPage = + ms.listEntities(callCtx, catalogId, parentId, entityType, filter, pageToken); // TODO: Use post-validation to enforce consistent view against catalogPath. In the // meantime, happens-before ordering semantics aren't guaranteed during high-concurrency @@ -957,7 +955,7 @@ private void revokeGrantRecord( e.getExistingEntity().getSubTypeCode())); } - return new EntitiesResult(createdEntities); + return new EntitiesResult(Page.fromItems(createdEntities)); } /** {@inheritDoc} */ @@ -1020,7 +1018,7 @@ private void revokeGrantRecord( } // good, all success - return new EntitiesResult(updatedEntities); + return new EntitiesResult(Page.fromItems(updatedEntities)); } /** {@inheritDoc} */ @@ -1185,7 +1183,7 @@ private void revokeGrantRecord( entity -> true, Function.identity(), PageToken.fromLimit(2)) - .items; + .items(); // if we have 2, we cannot drop the catalog. If only one left, better be the admin role if (catalogRoles.size() > 1) { @@ -1519,32 +1517,38 @@ private void revokeGrantRecord( Function.identity(), pageToken); - List loadedTasks = new ArrayList<>(); final AtomicInteger failedLeaseCount = new AtomicInteger(0); - availableTasks.items.forEach( - task -> { - PolarisBaseEntity.Builder updatedTaskBuilder = new PolarisBaseEntity.Builder(task); - Map properties = - PolarisObjectMapperUtil.deserializeProperties(callCtx, task.getProperties()); - properties.put(PolarisTaskConstants.LAST_ATTEMPT_EXECUTOR_ID, executorId); - properties.put( - PolarisTaskConstants.LAST_ATTEMPT_START_TIME, - String.valueOf(callCtx.getClock().millis())); - properties.put( - PolarisTaskConstants.ATTEMPT_COUNT, - String.valueOf( - Integer.parseInt(properties.getOrDefault(PolarisTaskConstants.ATTEMPT_COUNT, "0")) - + 1)); - updatedTaskBuilder.properties( - PolarisObjectMapperUtil.serializeProperties(callCtx, properties)); - EntityResult result = - updateEntityPropertiesIfNotChanged(callCtx, null, updatedTaskBuilder.build()); - if (result.getReturnStatus() == BaseResult.ReturnStatus.SUCCESS) { - loadedTasks.add(result.getEntity()); - } else { - failedLeaseCount.getAndIncrement(); - } - }); + List loadedTasks = + availableTasks.items().stream() + .map( + task -> { + PolarisBaseEntity.Builder updatedTaskBuilder = + new PolarisBaseEntity.Builder(task); + Map properties = + PolarisObjectMapperUtil.deserializeProperties(callCtx, task.getProperties()); + properties.put(PolarisTaskConstants.LAST_ATTEMPT_EXECUTOR_ID, executorId); + properties.put( + PolarisTaskConstants.LAST_ATTEMPT_START_TIME, + String.valueOf(callCtx.getClock().millis())); + properties.put( + PolarisTaskConstants.ATTEMPT_COUNT, + String.valueOf( + Integer.parseInt( + properties.getOrDefault(PolarisTaskConstants.ATTEMPT_COUNT, "0")) + + 1)); + updatedTaskBuilder.properties( + PolarisObjectMapperUtil.serializeProperties(callCtx, properties)); + EntityResult result = + updateEntityPropertiesIfNotChanged(callCtx, null, updatedTaskBuilder.build()); + if (result.getReturnStatus() == BaseResult.ReturnStatus.SUCCESS) { + return result.getEntity(); + } else { + failedLeaseCount.getAndIncrement(); + return null; + } + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); // Since the contract of this method is to only return an empty list once no available tasks // are found anymore, if we happen to fail to lease any tasks at all due to all of them diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/EntitiesResult.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/EntitiesResult.java index e27b69680f..13c7422f02 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/EntitiesResult.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/EntitiesResult.java @@ -18,25 +18,20 @@ */ package org.apache.polaris.core.persistence.dao.entity; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; import java.util.List; -import java.util.Optional; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.persistence.pagination.Page; -import org.apache.polaris.core.persistence.pagination.PageToken; /** a set of returned entities result */ public class EntitiesResult extends BaseResult { // null if not success. Else the list of entities being returned - private final List entities; - private final Optional pageTokenOpt; + private final Page entities; public static EntitiesResult fromPage(Page page) { - return new EntitiesResult(page.items, Optional.ofNullable(page.pageToken)); + return new EntitiesResult(page); } /** @@ -48,11 +43,6 @@ public static EntitiesResult fromPage(Page page) { public EntitiesResult(@Nonnull ReturnStatus errorStatus, @Nullable String extraInformation) { super(errorStatus, extraInformation); this.entities = null; - this.pageTokenOpt = Optional.empty(); - } - - public EntitiesResult(@Nonnull List entities) { - this(entities, Optional.empty()); } /** @@ -60,29 +50,12 @@ public EntitiesResult(@Nonnull List entities) { * * @param entities list of entities being returned, implies success */ - public EntitiesResult( - @Nonnull List entities, @Nonnull Optional pageTokenOpt) { + public EntitiesResult(@Nonnull Page entities) { super(ReturnStatus.SUCCESS); this.entities = entities; - this.pageTokenOpt = pageTokenOpt; - } - - @JsonCreator - private EntitiesResult( - @JsonProperty("returnStatus") @Nonnull ReturnStatus returnStatus, - @JsonProperty("extraInformation") String extraInformation, - @JsonProperty("entities") List entities, - @JsonProperty("pageToken") Optional pageTokenOpt) { - super(returnStatus, extraInformation); - this.entities = entities; - this.pageTokenOpt = pageTokenOpt; - } - - public List getEntities() { - return entities; } - public Optional getPageToken() { - return pageTokenOpt; + public @Nullable List getEntities() { + return entities == null ? null : entities.items(); } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ListEntitiesResult.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ListEntitiesResult.java index 10669e8994..a7a51d2297 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ListEntitiesResult.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ListEntitiesResult.java @@ -18,26 +18,21 @@ */ package org.apache.polaris.core.persistence.dao.entity; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; import java.util.List; -import java.util.Optional; import org.apache.polaris.core.entity.EntityNameLookupRecord; import org.apache.polaris.core.persistence.pagination.Page; -import org.apache.polaris.core.persistence.pagination.PageToken; /** the return the result for a list entities call */ public class ListEntitiesResult extends BaseResult { // null if not success. Else the list of entities being returned - private final List entities; - private final Optional pageTokenOpt; + private final Page entities; /** Create a {@link ListEntitiesResult} from a {@link Page} */ public static ListEntitiesResult fromPage(Page page) { - return new ListEntitiesResult(page.items, Optional.ofNullable(page.pageToken)); + return new ListEntitiesResult(page); } /** @@ -46,13 +41,9 @@ public static ListEntitiesResult fromPage(Page page) { * @param errorCode error code, cannot be SUCCESS * @param extraInformation extra information */ - public ListEntitiesResult( - @Nonnull ReturnStatus errorCode, - @Nullable String extraInformation, - @Nonnull Optional pageTokenOpt) { + public ListEntitiesResult(@Nonnull ReturnStatus errorCode, @Nullable String extraInformation) { super(errorCode, extraInformation); this.entities = null; - this.pageTokenOpt = pageTokenOpt; } /** @@ -60,29 +51,16 @@ public ListEntitiesResult( * * @param entities list of entities being returned, implies success */ - public ListEntitiesResult( - @Nonnull List entities, @Nonnull Optional pageTokenOpt) { + public ListEntitiesResult(Page entities) { super(ReturnStatus.SUCCESS); this.entities = entities; - this.pageTokenOpt = pageTokenOpt; } - @JsonCreator - private ListEntitiesResult( - @JsonProperty("returnStatus") @Nonnull ReturnStatus returnStatus, - @JsonProperty("extraInformation") String extraInformation, - @JsonProperty("entities") List entities, - @JsonProperty("pageToken") Optional pageTokenOpt) { - super(returnStatus, extraInformation); - this.entities = entities; - this.pageTokenOpt = pageTokenOpt; - } - - public List getEntities() { - return entities; + public @Nullable List getEntities() { + return entities == null ? null : entities.items(); } - public Optional getPageToken() { - return pageTokenOpt; + public Page getPage() { + return entities == null ? Page.fromItems(List.of()) : entities; } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/DonePageToken.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/DonePageToken.java deleted file mode 100644 index d46ea7b026..0000000000 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/DonePageToken.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.core.persistence.pagination; - -import java.util.List; - -/** - * A {@link PageToken} string that represents the lack of a page token. Returns `null` in - * `toTokenString`, which the client will interpret as there being no more data available. - */ -public class DonePageToken extends PageToken { - - public DonePageToken() {} - - @Override - public String toTokenString() { - return null; - } - - @Override - protected PageToken updated(List newData) { - throw new IllegalStateException("DonePageToken.updated is invalid"); - } -} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/EntityIdToken.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/EntityIdToken.java new file mode 100644 index 0000000000..8a9a03b1bd --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/EntityIdToken.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.core.persistence.pagination; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import jakarta.annotation.Nullable; +import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.apache.polaris.immutables.PolarisImmutable; + +/** Pagination {@linkplain Token token} backed by {@link PolarisBaseEntity#getId() entity ID}. */ +@PolarisImmutable +@JsonSerialize(as = ImmutableEntityIdToken.class) +@JsonDeserialize(as = ImmutableEntityIdToken.class) +public interface EntityIdToken extends Token { + String ID = "e"; + + @JsonProperty("i") + long entityId(); + + @Override + default String getT() { + return ID; + } + + static @Nullable EntityIdToken fromEntity(PolarisBaseEntity entity) { + if (entity == null) { + return null; + } + return fromEntityId(entity.getId()); + } + + static EntityIdToken fromEntityId(long entityId) { + return ImmutableEntityIdToken.builder().entityId(entityId).build(); + } + + final class EntityIdTokenType implements TokenType { + @Override + public String id() { + return ID; + } + + @Override + public Class javaType() { + return EntityIdToken.class; + } + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/HasPageSize.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/HasPageSize.java deleted file mode 100644 index c6b216fcd3..0000000000 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/HasPageSize.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.core.persistence.pagination; - -/** - * A light interface for {@link PageToken} implementations to express that they have a page size - * that should be respected - */ -public interface HasPageSize { - int getPageSize(); -} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java index 18287f85c1..4a3de4d128 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java @@ -18,25 +18,99 @@ */ package org.apache.polaris.core.persistence.pagination; +import static java.util.Spliterators.iterator; + +import jakarta.annotation.Nullable; +import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** - * An immutable page of items plus their paging cursor. The {@link PageToken} here can be used to - * continue the listing operation that generated the `items`. + * An immutable page of items plus the next-page token value, if there are more items. The {@link + * #encodedResponseToken()} here can be used to continue the listing operation that generated the + * `items`. */ public class Page { - public final PageToken pageToken; - public final List items; + private final PageToken request; + private final List items; + @Nullable private final Token nextToken; - public Page(PageToken pageToken, List items) { - this.pageToken = pageToken; + private Page(PageToken request, @Nullable Token nextToken, List items) { + this.request = request; + this.nextToken = nextToken; this.items = items; } /** - * Used to wrap a {@link List} of items into a {@link Page } when there are no more pages + * Builds a complete response page for the full list of relevant items. No subsequence pages of + * related data exist. */ public static Page fromItems(List items) { - return new Page<>(new DonePageToken(), items); + return new Page<>(PageToken.readEverything(), null, items); + } + + /** + * Produces a response page by consuming the number of items from the provided stream according to + * the {@code request} parameter. Source items can be converted to a different type by providing a + * {@code mapper} function. The page token for the response will be produced from the request data + * combined with the pointer to the next page of data provided by the {@code dataPointer} + * function. + * + * @param request defines pagination parameters that were uses to produce this page of data. + * @param items stream of source data + * @param mapper converter from source data types to response data types. + * @param tokenBuilder determines the {@link Token} used to start the next page of data given the + * last item from the previous page. The output of this function will be available from {@link + * PageToken#value()} associated with the request for the next page. + */ + public static Page mapped( + PageToken request, Stream items, Function mapper, Function tokenBuilder) { + List data; + T last = null; + if (!request.paginationRequested()) { + // short-cut for "no pagination" + data = items.map(mapper).collect(Collectors.toList()); + } else { + data = new ArrayList<>(request.pageSize().orElse(10)); + + Iterator it = iterator(items.spliterator()); + int limit = request.pageSize().orElse(Integer.MAX_VALUE); + while (it.hasNext() && data.size() < limit) { + last = it.next(); + data.add(mapper.apply(last)); + } + + // Signal "no more data" if the number of items is less than the requested page size or if + // there is no more data. + if (data.size() < limit || !it.hasNext()) { + last = null; + } + } + + return new Page<>(request, tokenBuilder.apply(last), data); + } + + public List items() { + return items; + } + + /** + * Returns a page token in encoded form suitable for returning to API clients. The string returned + * from this method is expected to be parsed by {@link PageToken#build(String, Integer)} when + * servicing the request for the next page of related data. + */ + public @Nullable String encodedResponseToken() { + return PageTokenUtil.encodePageToken(request, nextToken); + } + + /** + * Converts this page of data to objects of a different type, while maintaining the underlying + * pointer to the next page of source data. + */ + public Page map(Function mapper) { + return new Page<>(request, nextToken, items.stream().map(mapper).collect(Collectors.toList())); } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java index 2e335ccd40..a1dceffdd8 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java @@ -18,82 +18,75 @@ */ package org.apache.polaris.core.persistence.pagination; -import java.util.List; -import java.util.Objects; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import jakarta.annotation.Nullable; +import java.util.Optional; +import java.util.OptionalInt; +import org.apache.polaris.immutables.PolarisImmutable; -/** - * Represents a page token that can be used by operations like `listTables`. Clients that specify a - * `pageSize` (or a `pageToken`) may receive a `next-page-token` in the response, the content of - * which is a serialized PageToken. - * - *

By providing that in the next query's `pageToken`, the client can resume listing where they - * left off. If the client provides a `pageToken` or `pageSize` but `next-page-token` is null in the - * response, that means there is no more data to read. - */ -public abstract class PageToken { - - /** Build a new PageToken that reads everything */ - public static PageToken readEverything() { - return build(null, null); - } +/** A wrapper for pagination information passed in as part of a request. */ +@PolarisImmutable +@JsonSerialize(as = ImmutablePageToken.class) +@JsonDeserialize(as = ImmutablePageToken.class) +public interface PageToken { + // Serialization property names are intentionally short to reduce the size of the serialized + // paging token. - /** Build a new PageToken from an input String, without a specified page size */ - public static PageToken fromString(String token) { - return build(token, null); - } - - /** Build a new PageToken from a limit */ - public static PageToken fromLimit(Integer pageSize) { - return build(null, pageSize); - } + /** The requested page size (optional). */ + @JsonProperty("p") + OptionalInt pageSize(); - /** Build a {@link PageToken} from the input string and page size */ - public static PageToken build(String token, Integer pageSize) { - if (token == null || token.isEmpty()) { - if (pageSize != null) { - return new LimitPageToken(pageSize); - } else { - return new ReadEverythingPageToken(); - } - } else { - // TODO implement, split out by the token's prefix - throw new IllegalArgumentException("Unrecognized page token: " + token); - } + /** Convenience for {@code pageSize().isPresent()}. */ + default boolean paginationRequested() { + return pageSize().isPresent(); } - /** Serialize a {@link PageToken} into a string */ - public abstract String toTokenString(); - /** - * Builds a new page token to reflect new data that's been read. If the amount of data read is - * less than the pageSize, this will return a {@link DonePageToken} + * Paging token value, if present. Serialized paging tokens always have a value, but "synthetic" + * paging tokens like {@link #readEverything()} or {@link #fromLimit(int)} do not have a token + * value. */ - protected abstract PageToken updated(List newData); + @JsonProperty("v") + Optional value(); + + // Note: another property can be added to contain a (cryptographic) signature, if we want to + // ensure that a paging-token hasn't been tampered. /** - * Builds a {@link Page } from a {@link List}. The {@link PageToken} attached to the new - * {@link Page } is the same as the result of calling {@link #updated(List)} on this {@link - * PageToken}. + * Paging token value, if it is present and an instance of the given {@code type}. This is a + * convenience to prevent duplication of type casts. */ - public final Page buildNextPage(List data) { - return new Page(updated(data), data); + default Optional valueAs(Class type) { + return value() + .flatMap( + t -> + type.isAssignableFrom(t.getClass()) ? Optional.of(type.cast(t)) : Optional.empty()); + } + + /** Represents a non-paginated request. */ + static PageToken readEverything() { + return PageTokenUtil.READ_EVERYTHING; } - @Override - public final boolean equals(Object o) { - if (o instanceof PageToken) { - return Objects.equals(this.toTokenString(), ((PageToken) o).toTokenString()); - } else { - return false; - } + /** Represents a request to start paginating with a particular page size. */ + static PageToken fromLimit(int limit) { + return PageTokenUtil.fromLimit(limit); } - @Override - public final int hashCode() { - if (toTokenString() == null) { - return 0; - } else { - return toTokenString().hashCode(); - } + /** + * Reconstructs a page token from the API-level page token string (returned to the client in the + * response to a previous request for similar data) and an API-level new requested page size. + * + * @param serializedPageToken page token from the {@link Page#encodedResponseToken() previous + * page} + * @param requestedPageSize optional page size for the next page. If not set, the page size of the + * previous page (encoded in the page token string) will be reused. + * @see Page#encodedResponseToken() + */ + static PageToken build( + @Nullable String serializedPageToken, @Nullable Integer requestedPageSize) { + return PageTokenUtil.decodePageRequest(serializedPageToken, requestedPageSize); } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageTokenUtil.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageTokenUtil.java new file mode 100644 index 0000000000..8a811f41c7 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageTokenUtil.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.core.persistence.pagination; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.lang.String.format; +import static java.util.Collections.unmodifiableMap; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DatabindContext; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase; +import com.fasterxml.jackson.dataformat.smile.databind.SmileMapper; +import com.google.common.annotations.VisibleForTesting; +import jakarta.annotation.Nullable; +import java.io.IOException; +import java.util.Base64; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.ServiceLoader; + +final class PageTokenUtil { + + private static final ObjectMapper SMILE_MAPPER = new SmileMapper().findAndRegisterModules(); + + /** Constant for {@link PageToken#readEverything()}. */ + static final PageToken READ_EVERYTHING = + new PageToken() { + @Override + public OptionalInt pageSize() { + return OptionalInt.empty(); + } + + @Override + public Optional value() { + return Optional.empty(); + } + + @Override + public int hashCode() { + return 1; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof PageToken)) { + return false; + } + PageToken other = (PageToken) obj; + return other.pageSize().isEmpty() && other.value().isEmpty(); + } + + @Override + public String toString() { + return "PageToken(everything)"; + } + }; + + static PageToken fromLimit(int limit) { + return new PageToken() { + @Override + public OptionalInt pageSize() { + return OptionalInt.of(limit); + } + + @Override + public Optional value() { + return Optional.empty(); + } + + @Override + public int hashCode() { + return 2; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof PageToken)) { + return false; + } + PageToken other = (PageToken) obj; + return other.pageSize().equals(pageSize()) && other.value().isEmpty(); + } + + @Override + public String toString() { + return "PageToken(limit = " + limit + ")"; + } + }; + } + + private PageTokenUtil() {} + + /** + * Decodes a {@link PageToken} from API request parameters for the page-size and a serialized page + * token. + */ + static PageToken decodePageRequest( + @Nullable String requestedPageToken, @Nullable Integer requestedPageSize) { + if (requestedPageToken != null) { + var bytes = Base64.getUrlDecoder().decode(requestedPageToken); + try { + var pageToken = SMILE_MAPPER.readValue(bytes, PageToken.class); + if (requestedPageSize != null) { + int pageSizeInt = requestedPageSize; + checkArgument(pageSizeInt >= 0, "Invalid page size"); + if (pageToken.pageSize().orElse(-1) != pageSizeInt) { + pageToken = ImmutablePageToken.builder().from(pageToken).pageSize(pageSizeInt).build(); + } + } + return pageToken; + } catch (IOException e) { + throw new RuntimeException(e); + } + } else if (requestedPageSize != null) { + int pageSizeInt = requestedPageSize; + checkArgument(pageSizeInt >= 0, "Invalid page size"); + return fromLimit(pageSizeInt); + } else { + return READ_EVERYTHING; + } + } + + /** + * Returns the encoded ({@link String} serialized) {@link PageToken} built from the given {@link + * PageToken currentPageToken}, the page token of the current request, and {@link Token + * nextToken}, the token for the next page. + * + * @param currentPageToken page token of the currently handled API request, must not be {@code + * null} + * @param nextToken token for the next page, can be {@code null}, in which case the result will be + * {@code null} + * @return base-64/url-encoded serialized {@link PageToken} for the next page. + */ + static @Nullable String encodePageToken(PageToken currentPageToken, @Nullable Token nextToken) { + if (nextToken == null) { + return null; + } + + return serializePageToken( + ImmutablePageToken.builder() + .pageSize(currentPageToken.pageSize()) + .value(nextToken) + .build()); + } + + /** + * Serializes the given {@link PageToken pageToken} + * + * @return base-64/url-encoded serialized {@link PageToken} for the next page. + */ + @VisibleForTesting + static @Nullable String serializePageToken(PageToken pageToken) { + if (pageToken == null) { + return null; + } + + try { + var serialized = SMILE_MAPPER.writeValueAsBytes(pageToken); + return Base64.getUrlEncoder().encodeToString(serialized); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + /** Lazily initialized registry of all token-types. */ + private static final class Registry { + private static final Map BY_ID; + + static { + var byId = new HashMap(); + var loader = ServiceLoader.load(Token.TokenType.class); + loader.stream() + .map(ServiceLoader.Provider::get) + .forEach( + tokenType -> { + var ex = byId.put(tokenType.id(), tokenType); + if (ex != null) { + throw new IllegalStateException( + format("Duplicate token type ID: from %s and %s", tokenType, ex)); + } + }); + BY_ID = unmodifiableMap(byId); + } + } + + /** + * Jackson type-id resolver, resolves a {@link Token#getT() token type value} to a concrete Java + * type, consulting the {@link Registry}. + */ + static final class TokenTypeIdResolver extends TypeIdResolverBase { + private JavaType baseType; + + public TokenTypeIdResolver() {} + + @Override + public void init(JavaType bt) { + baseType = bt; + } + + @Override + public String idFromValue(Object value) { + return getId(value); + } + + @Override + public String idFromValueAndType(Object value, Class suggestedType) { + return getId(value); + } + + @Override + public JsonTypeInfo.Id getMechanism() { + return JsonTypeInfo.Id.CUSTOM; + } + + private String getId(Object value) { + if (value instanceof Token) { + return ((Token) value).getT(); + } + + return null; + } + + @Override + public JavaType typeFromId(DatabindContext context, String id) { + var idLower = id.toLowerCase(Locale.ROOT); + var asType = Registry.BY_ID.get(idLower); + if (asType == null) { + throw new IllegalStateException("Cannot deserialize paging token value of type " + idLower); + } + if (baseType.getRawClass().isAssignableFrom(asType.javaType())) { + return context.constructSpecializedType(baseType, asType.javaType()); + } + + // This is rather a "test-only" code path, but it might happen in real life as well, when + // calling the ObjectMapper with a "too specific" type and not just Change.class. + // So we can get here for example, if the baseType (induced by the type passed to + // ObjectMapper), is GenericChange.class, but the type is a "well known" type like + // ChangeRename.class. + @SuppressWarnings("unchecked") + var concrete = (Class) baseType.getRawClass(); + return context.constructSpecializedType(baseType, concrete); + } + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/ReadEverythingPageToken.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/ReadEverythingPageToken.java deleted file mode 100644 index c8476c3511..0000000000 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/ReadEverythingPageToken.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.core.persistence.pagination; - -import java.util.List; - -/** - * A {@link PageToken} implementation for readers who want to read everything. The behavior when - * using this token should be the same as when reading without a token. - */ -public class ReadEverythingPageToken extends PageToken { - - public static String PREFIX = "read-everything"; - - public ReadEverythingPageToken() {} - - @Override - public String toTokenString() { - return PREFIX; - } - - @Override - protected PageToken updated(List newData) { - return new DonePageToken(); - } -} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Token.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Token.java new file mode 100644 index 0000000000..34adb9560a --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Token.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.core.persistence.pagination; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver; +import org.immutables.value.Value; + +/** + * Token base interface. + * + *

Concrete token implementations extend this {@link Token} interface and provide a Java services + * registered class that implements {@link Token.TokenType}. + * + *

Serialization property names should be intentionally short to reduce the size of the + * serialized paging token. + * + *

Example: + * + * {@snippet : + * @PolarisImmutable + * @JsonSerialize(as = ImmutableExampleToken.class) + * @JsonDeserialize(as = ImmutableExampleToken.class) + * public interface ExampleToken extends Token { + * String ID = "example"; + * + * @Override + * default String getT() { + * return ID; + * } + * + * @JsonProperty("a") + * long a(); + * + * @JsonProperty("b") + * String b(); + * + * static ExampleToken newExampleToken(long a, String b) { + * return ImmutableExampleToken.builder().a(a).b(b).build(); + * } + * + * final class ExampleTokenType implements TokenType { + * @Override + * public String id() { + * return ID; + * } + * + * @Override + * public Class javaType() { + * return ExampleToken.class; + * } + * } + * } + * } + * + * plus a resource file {@code + * META-INF/services/org.apache.polaris.core.persistence.pagination.Token$TokenType} containing + * {@code org.apache.polaris.examples.pagetoken.ExampleToken$ExampleTokenType}. + */ +@JsonTypeIdResolver(PageTokenUtil.TokenTypeIdResolver.class) +@JsonTypeInfo(use = JsonTypeInfo.Id.CUSTOM, property = "t", visible = true) +public interface Token { + + @Value.Redacted + @JsonIgnore + // must use 'getT' here, otherwise the property won't be properly "wired" to be the type info and + // Jackson (deserialization) fails with + // 'com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "t"', if + // this property is just named 'String t()' + String getT(); + + /** Token type specification, referenced via Java's service loader mechanism. */ + interface TokenType { + /** + * ID of the token type, must be equal to the result of {@link Token#getT()} of the concrete + * {@link #javaType() token type}. + */ + String id(); + + /** Concrete token type. */ + Class javaType(); + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java index e79dafcf54..8c8e26eb8d 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java @@ -31,6 +31,7 @@ import java.util.Optional; import java.util.Set; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.entity.AsyncTaskType; @@ -686,8 +687,8 @@ private void bootstrapPolarisService( } /** - * See {@link #listEntities(PolarisCallContext, List, PolarisEntityType, PolarisEntitySubType, - * PageToken)} + * See {@link PolarisMetaStoreManager#listEntities(PolarisCallContext, List, PolarisEntityType, + * PolarisEntitySubType, PageToken)} */ private @Nonnull ListEntitiesResult listEntities( @Nonnull PolarisCallContext callCtx, @@ -701,24 +702,25 @@ private void bootstrapPolarisService( // return if we failed to resolve if (resolver.isFailure()) { - return new ListEntitiesResult( - BaseResult.ReturnStatus.CATALOG_PATH_CANNOT_BE_RESOLVED, null, Optional.empty()); + return new ListEntitiesResult(BaseResult.ReturnStatus.CATALOG_PATH_CANNOT_BE_RESOLVED, null); } - // return list of active entities - Page resultPage = - ms.listEntitiesInCurrentTxn( - callCtx, resolver.getCatalogIdOrNull(), resolver.getParentId(), entityType, pageToken); - + Predicate filter = entity -> true; // prune the returned list with only entities matching the entity subtype if (entitySubType != PolarisEntitySubType.ANY_SUBTYPE) { - resultPage = - pageToken.buildNextPage( - resultPage.items.stream() - .filter(rec -> rec.getSubTypeCode() == entitySubType.getCode()) - .collect(Collectors.toList())); + filter = e -> e.getSubTypeCode() == entitySubType.getCode(); } + // return list of active entities + Page resultPage = + ms.listEntitiesInCurrentTxn( + callCtx, + resolver.getCatalogIdOrNull(), + resolver.getParentId(), + entityType, + filter, + pageToken); + // done return ListEntitiesResult.fromPage(resultPage); } @@ -1076,7 +1078,7 @@ private void bootstrapPolarisService( } createdEntities.add(entityCreateResult.getEntity()); } - return new EntitiesResult(createdEntities); + return new EntitiesResult(Page.fromItems(createdEntities)); }); } @@ -1180,7 +1182,7 @@ private void bootstrapPolarisService( } // good, all success - return new EntitiesResult(updatedEntities); + return new EntitiesResult(Page.fromItems(updatedEntities)); } /** {@inheritDoc} */ @@ -1385,7 +1387,7 @@ private void bootstrapPolarisService( entity -> true, Function.identity(), PageToken.fromLimit(2)) - .items; + .items(); // if we have 2, we cannot drop the catalog. If only one left, better be the admin role if (catalogRoles.size() > 1) { @@ -1971,36 +1973,42 @@ private PolarisEntityResolver resolveSecurableToRoleGrant( Function.identity(), pageToken); - List loadedTasks = new ArrayList<>(); - availableTasks.items.forEach( - task -> { - PolarisBaseEntity.Builder updatedTask = new PolarisBaseEntity.Builder(task); - Map properties = - PolarisObjectMapperUtil.deserializeProperties(callCtx, task.getProperties()); - properties.put(PolarisTaskConstants.LAST_ATTEMPT_EXECUTOR_ID, executorId); - properties.put( - PolarisTaskConstants.LAST_ATTEMPT_START_TIME, - String.valueOf(callCtx.getClock().millis())); - properties.put( - PolarisTaskConstants.ATTEMPT_COUNT, - String.valueOf( - Integer.parseInt(properties.getOrDefault(PolarisTaskConstants.ATTEMPT_COUNT, "0")) - + 1)); - updatedTask.properties(PolarisObjectMapperUtil.serializeProperties(callCtx, properties)); - EntityResult result = - updateEntityPropertiesIfNotChanged(callCtx, ms, null, updatedTask.build()); - if (result.getReturnStatus() == BaseResult.ReturnStatus.SUCCESS) { - loadedTasks.add(result.getEntity()); - } else { - // TODO: Consider performing incremental leasing of individual tasks one at a time - // instead of requiring all-or-none semantics for all the tasks we think we listed, - // or else contention could be very bad. - ms.rollback(); - throw new RetryOnConcurrencyException( - "Failed to lease available task with status %s, info: %s", - result.getReturnStatus(), result.getExtraInformation()); - } - }); + List loadedTasks = + availableTasks.items().stream() + .map( + task -> { + PolarisBaseEntity.Builder updatedTask = new PolarisBaseEntity.Builder(task); + Map properties = + PolarisObjectMapperUtil.deserializeProperties(callCtx, task.getProperties()); + properties.put(PolarisTaskConstants.LAST_ATTEMPT_EXECUTOR_ID, executorId); + properties.put( + PolarisTaskConstants.LAST_ATTEMPT_START_TIME, + String.valueOf(callCtx.getClock().millis())); + properties.put( + PolarisTaskConstants.ATTEMPT_COUNT, + String.valueOf( + Integer.parseInt( + properties.getOrDefault(PolarisTaskConstants.ATTEMPT_COUNT, "0")) + + 1)); + updatedTask.properties( + PolarisObjectMapperUtil.serializeProperties(callCtx, properties)); + EntityResult result = + updateEntityPropertiesIfNotChanged(callCtx, ms, null, updatedTask.build()); + if (result.getReturnStatus() == BaseResult.ReturnStatus.SUCCESS) { + return result.getEntity(); + } else { + // TODO: Consider performing incremental leasing of individual tasks one at a + // time + // instead of requiring all-or-none semantics for all the tasks we think we + // listed, + // or else contention could be very bad. + ms.rollback(); + throw new RetryOnConcurrencyException( + "Failed to lease available task with status %s, info: %s", + result.getReturnStatus(), result.getExtraInformation()); + } + }) + .collect(Collectors.toList()); return EntitiesResult.fromPage(Page.fromItems(loadedTasks)); } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java index 12907b08d5..3bc7fd976f 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java @@ -21,6 +21,7 @@ import com.google.common.base.Predicates; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; +import java.util.Comparator; import java.util.List; import java.util.Optional; import java.util.function.Function; @@ -44,7 +45,7 @@ import org.apache.polaris.core.entity.PolarisPrincipalSecrets; import org.apache.polaris.core.persistence.BaseMetaStoreManager; import org.apache.polaris.core.persistence.PrincipalSecretsGenerator; -import org.apache.polaris.core.persistence.pagination.HasPageSize; +import org.apache.polaris.core.persistence.pagination.EntityIdToken; import org.apache.polaris.core.persistence.pagination.Page; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; @@ -367,13 +368,23 @@ public List lookupEntityActiveBatchInCurrentTxn( .map( nameRecord -> this.lookupEntityInCurrentTxn( - callCtx, catalogId, nameRecord.getId(), entityType.getCode())) - .filter(entityFilter); - if (pageToken instanceof HasPageSize) { - data = data.limit(((HasPageSize) pageToken).getPageSize()); - } + callCtx, catalogId, nameRecord.getId(), entityType.getCode())); + + Predicate tokenFilter = + pageToken + .valueAs(EntityIdToken.class) + .map( + entityIdToken -> { + var nextId = entityIdToken.entityId(); + return (Predicate) e -> e.getId() > nextId; + }) + .orElse(e -> true); + + data = data.sorted(Comparator.comparingLong(PolarisEntityCore::getId)).filter(tokenFilter); + + data = data.filter(entityFilter); - return Page.fromItems(data.map(transformer).collect(Collectors.toList())); + return Page.mapped(pageToken, data, transformer, EntityIdToken::fromEntity); } /** {@inheritDoc} */ diff --git a/polaris-core/src/main/resources/META-INF/services/org.apache.polaris.core.persistence.pagination.Token$TokenType b/polaris-core/src/main/resources/META-INF/services/org.apache.polaris.core.persistence.pagination.Token$TokenType new file mode 100644 index 0000000000..3579dd29b3 --- /dev/null +++ b/polaris-core/src/main/resources/META-INF/services/org.apache.polaris.core.persistence.pagination.Token$TokenType @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +org.apache.polaris.core.persistence.pagination.EntityIdToken$EntityIdTokenType diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/LimitPageToken.java b/polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/DummyTestToken.java similarity index 54% rename from polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/LimitPageToken.java rename to polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/DummyTestToken.java index 18586446ca..5053ac3cea 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/LimitPageToken.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/DummyTestToken.java @@ -18,35 +18,36 @@ */ package org.apache.polaris.core.persistence.pagination; -import java.util.List; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import java.util.Optional; +import java.util.OptionalInt; +import org.apache.polaris.immutables.PolarisImmutable; -/** - * A {@link PageToken} implementation that has a page size, but no start offset. This can be used to - * represent a `limit`. When updated, it returns {@link DonePageToken}. As such it should never be - * user-facing and doesn't truly paginate. - */ -public class LimitPageToken extends PageToken implements HasPageSize { - - public static final String PREFIX = "limit"; +@PolarisImmutable +@JsonSerialize(as = ImmutableDummyTestToken.class) +@JsonDeserialize(as = ImmutableDummyTestToken.class) +public interface DummyTestToken extends Token { + String ID = "test-dummy"; - private final int pageSize; + Optional s(); - public LimitPageToken(int pageSize) { - this.pageSize = pageSize; - } + OptionalInt i(); @Override - public int getPageSize() { - return pageSize; + default String getT() { + return ID; } - @Override - public String toTokenString() { - return String.format("%s/%d", PREFIX, pageSize); - } + final class DummyTestTokenType implements TokenType { + @Override + public String id() { + return ID; + } - @Override - protected PageToken updated(List newData) { - return new DonePageToken(); + @Override + public Class javaType() { + return DummyTestToken.class; + } } } diff --git a/polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/PageTokenTest.java b/polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/PageTokenTest.java new file mode 100644 index 0000000000..338bbc53f5 --- /dev/null +++ b/polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/PageTokenTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.core.persistence.pagination; + +import static org.junit.jupiter.params.provider.Arguments.arguments; + +import java.util.OptionalInt; +import java.util.function.Function; +import java.util.stream.Stream; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +@ExtendWith(SoftAssertionsExtension.class) +class PageTokenTest { + @InjectSoftAssertions SoftAssertions soft; + + @Test + public void testReadEverything() { + PageToken r = PageToken.readEverything(); + soft.assertThat(r.paginationRequested()).isFalse(); + soft.assertThat(r.pageSize()).isEmpty(); + soft.assertThat(r.value()).isEmpty(); + + Page pageEverything = + Page.mapped( + r, + Stream.of(1, 2, 3, 4), + Function.identity(), + i -> i != null ? EntityIdToken.fromEntityId(i) : null); + soft.assertThat(pageEverything.encodedResponseToken()).isNull(); + soft.assertThat(pageEverything.items()).containsExactly(1, 2, 3, 4); + + r = PageToken.build(null, null); + soft.assertThat(r.paginationRequested()).isFalse(); + soft.assertThat(r.pageSize()).isEmpty(); + soft.assertThat(r.value()).isEmpty(); + } + + @Test + public void testLimit() { + PageToken r = PageToken.fromLimit(123); + soft.assertThat(r).isEqualTo(PageToken.build(null, 123)); + soft.assertThat(r.paginationRequested()).isTrue(); + soft.assertThat(r.pageSize()).isEqualTo(OptionalInt.of(123)); + soft.assertThat(r.value()).isEmpty(); + } + + @Test + public void testTokenValueForPaging() { + PageToken r = PageToken.fromLimit(2); + soft.assertThat(r).isEqualTo(PageToken.build(null, 2)); + Page pageMoreData = + Page.mapped( + r, + Stream.of(1, 2, 3, 4), + Function.identity(), + i -> i != null ? EntityIdToken.fromEntityId(i) : null); + soft.assertThat(pageMoreData.encodedResponseToken()).isNotBlank(); + soft.assertThat(pageMoreData.items()).containsExactly(1, 2); + + // last page (no more data) - number of items is equal to the requested page size + Page lastPageSaturated = + Page.mapped( + r, + Stream.of(3, 4), + Function.identity(), + i -> i != null ? EntityIdToken.fromEntityId(i) : null); + // last page (no more data) - next-token must be null + soft.assertThat(lastPageSaturated.encodedResponseToken()).isNull(); + soft.assertThat(lastPageSaturated.items()).containsExactly(3, 4); + + // last page (no more data) - number of items is less than the requested page size + Page lastPageNotSaturated = + Page.mapped( + r, + Stream.of(3), + Function.identity(), + i -> i != null ? EntityIdToken.fromEntityId(i) : null); + soft.assertThat(lastPageNotSaturated.encodedResponseToken()).isNull(); + soft.assertThat(lastPageNotSaturated.items()).containsExactly(3); + + r = PageToken.fromLimit(200); + soft.assertThat(r).isEqualTo(PageToken.build(null, 200)); + Page page200 = + Page.mapped( + r, + Stream.of(1, 2, 3, 4), + Function.identity(), + i -> i != null ? EntityIdToken.fromEntityId(i) : null); + soft.assertThat(page200.encodedResponseToken()).isNull(); + soft.assertThat(page200.items()).containsExactly(1, 2, 3, 4); + } + + @ParameterizedTest + @MethodSource + public void testDeSer(Integer pageSize, String serializedPageToken, PageToken expectedPageToken) { + soft.assertThat(PageTokenUtil.decodePageRequest(serializedPageToken, pageSize)) + .isEqualTo(expectedPageToken); + } + + static Stream testDeSer() { + var entity42page123 = + ImmutablePageToken.builder().pageSize(123).value(EntityIdToken.fromEntityId(42)).build(); + var entity42page123ser = PageTokenUtil.serializePageToken(entity42page123); + return Stream.of( + arguments(null, null, PageToken.readEverything()), + arguments(123, null, PageToken.fromLimit(123)), + arguments(123, entity42page123ser, entity42page123), + arguments( + 123, + PageTokenUtil.serializePageToken( + ImmutablePageToken.builder() + .pageSize(999999) + .value(EntityIdToken.fromEntityId(42)) + .build()), + entity42page123)); + } + + @ParameterizedTest + @MethodSource + public void testApiRoundTrip(Token token) { + PageToken request = PageToken.build(null, 123); + Page page = Page.mapped(request, Stream.of("i1"), Function.identity(), x -> token); + soft.assertThat(page.encodedResponseToken()).isNotBlank(); + + PageToken r = PageToken.build(page.encodedResponseToken(), null); + soft.assertThat(r.value()).contains(token); + soft.assertThat(r.paginationRequested()).isTrue(); + soft.assertThat(r.pageSize()).isEqualTo(OptionalInt.of(123)); + + r = PageToken.build(page.encodedResponseToken(), 456); + soft.assertThat(r.value()).contains(token); + soft.assertThat(r.paginationRequested()).isTrue(); + soft.assertThat(r.pageSize()).isEqualTo(OptionalInt.of(456)); + } + + static Stream testApiRoundTrip() { + return Stream.of( + EntityIdToken.fromEntityId(123), + EntityIdToken.fromEntityId(456), + ImmutableDummyTestToken.builder().s("str").i(42).build(), + ImmutableDummyTestToken.builder().i(42).build(), + ImmutableDummyTestToken.builder().build()); + } +} diff --git a/polaris-core/src/test/resources/META-INF/services/org.apache.polaris.core.persistence.pagination.Token$TokenType b/polaris-core/src/test/resources/META-INF/services/org.apache.polaris.core.persistence.pagination.Token$TokenType new file mode 100644 index 0000000000..26778107e5 --- /dev/null +++ b/polaris-core/src/test/resources/META-INF/services/org.apache.polaris.core.persistence.pagination.Token$TokenType @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +org.apache.polaris.core.persistence.pagination.DummyTestToken$DummyTestTokenType diff --git a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java index 2ef683b5d3..b7f67fb274 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java @@ -116,6 +116,7 @@ import org.apache.polaris.core.persistence.cache.InMemoryEntityCache; import org.apache.polaris.core.persistence.dao.entity.BaseResult; import org.apache.polaris.core.persistence.dao.entity.EntityResult; +import org.apache.polaris.core.persistence.pagination.Page; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.secrets.UserSecretsManager; import org.apache.polaris.core.secrets.UserSecretsManagerFactory; @@ -210,6 +211,8 @@ public Map getConfigOverrides() { "test", "polaris.readiness.ignore-severe-issues", "true", + "LIST_PAGINATION_ENABLED", + "true", "polaris.features.\"ALLOW_TABLE_LOCATION_OVERLAP\"", "true"); } @@ -2326,4 +2329,131 @@ public void testEventsAreEmitted() { Assertions.assertThat(afterTableEvent.base().properties().get(key)).isEqualTo(valOld); Assertions.assertThat(afterTableEvent.metadata().properties().get(key)).isEqualTo(valNew); } + + private static PageToken nextRequest(Page previousPage) { + return PageToken.build(previousPage.encodedResponseToken(), null); + } + + @Test + public void testPaginatedListTables() { + Assumptions.assumeTrue( + requiresNamespaceCreate(), + "Only applicable if namespaces must be created before adding children"); + + catalog.createNamespace(NS); + + for (int i = 0; i < 5; i++) { + catalog.buildTable(TableIdentifier.of(NS, "pagination_table_" + i), SCHEMA).create(); + } + + try { + // List without pagination + Assertions.assertThat(catalog.listTables(NS)).isNotNull().hasSize(5); + + // List with a limit: + Page firstListResult = catalog.listTables(NS, PageToken.fromLimit(2)); + Assertions.assertThat(firstListResult.items().size()).isEqualTo(2); + Assertions.assertThat(firstListResult.encodedResponseToken()).isNotNull().isNotEmpty(); + + // List using the previously obtained token: + Page secondListResult = catalog.listTables(NS, nextRequest(firstListResult)); + Assertions.assertThat(secondListResult.items().size()).isEqualTo(2); + Assertions.assertThat(secondListResult.encodedResponseToken()).isNotNull().isNotEmpty(); + + // List using the final token: + Page finalListResult = catalog.listTables(NS, nextRequest(secondListResult)); + Assertions.assertThat(finalListResult.items().size()).isEqualTo(1); + Assertions.assertThat(finalListResult.encodedResponseToken()).isNull(); + } finally { + for (int i = 0; i < 5; i++) { + catalog.dropTable(TableIdentifier.of(NS, "pagination_table_" + i)); + } + } + } + + @Test + public void testPaginatedListViews() { + Assumptions.assumeTrue( + requiresNamespaceCreate(), + "Only applicable if namespaces must be created before adding children"); + + catalog.createNamespace(NS); + + for (int i = 0; i < 5; i++) { + catalog + .buildView(TableIdentifier.of(NS, "pagination_view_" + i)) + .withQuery("a_" + i, "SELECT 1 id") + .withSchema(SCHEMA) + .withDefaultNamespace(NS) + .create(); + } + + try { + // List without pagination + Assertions.assertThat(catalog.listViews(NS)).isNotNull().hasSize(5); + + // List with a limit: + Page firstListResult = catalog.listViews(NS, PageToken.fromLimit(2)); + Assertions.assertThat(firstListResult.items().size()).isEqualTo(2); + Assertions.assertThat(firstListResult.encodedResponseToken()).isNotNull().isNotEmpty(); + + // List using the previously obtained token: + Page secondListResult = catalog.listViews(NS, nextRequest(firstListResult)); + Assertions.assertThat(secondListResult.items().size()).isEqualTo(2); + Assertions.assertThat(secondListResult.encodedResponseToken()).isNotNull().isNotEmpty(); + + // List using the final token: + Page finalListResult = catalog.listViews(NS, nextRequest(secondListResult)); + Assertions.assertThat(finalListResult.items().size()).isEqualTo(1); + Assertions.assertThat(finalListResult.encodedResponseToken()).isNull(); + } finally { + for (int i = 0; i < 5; i++) { + catalog.dropTable(TableIdentifier.of(NS, "pagination_view_" + i)); + } + } + } + + @Test + public void testPaginatedListNamespaces() { + for (int i = 0; i < 5; i++) { + catalog.createNamespace(Namespace.of("pagination_namespace_" + i)); + } + + try { + // List without pagination + Assertions.assertThat(catalog.listNamespaces()).isNotNull().hasSize(5); + + // List with a limit: + Page firstListResult = catalog.listNamespaces(Namespace.empty(), PageToken.fromLimit(2)); + Assertions.assertThat(firstListResult.items().size()).isEqualTo(2); + Assertions.assertThat(firstListResult.encodedResponseToken()).isNotNull().isNotEmpty(); + + // List using the previously obtained token: + Page secondListResult = + catalog.listNamespaces(Namespace.empty(), nextRequest(firstListResult)); + Assertions.assertThat(secondListResult.items().size()).isEqualTo(2); + Assertions.assertThat(secondListResult.encodedResponseToken()).isNotNull().isNotEmpty(); + + // List using the final token: + Page finalListResult = + catalog.listNamespaces(Namespace.empty(), nextRequest(secondListResult)); + Assertions.assertThat(finalListResult.items().size()).isEqualTo(1); + Assertions.assertThat(finalListResult.encodedResponseToken()).isNull(); + + // List with page size matching the amount of data, no more pages + Page firstExactListResult = + catalog.listNamespaces(Namespace.empty(), PageToken.fromLimit(5)); + Assertions.assertThat(firstExactListResult.items().size()).isEqualTo(5); + Assertions.assertThat(firstExactListResult.encodedResponseToken()).isNull(); + + // List with huge page size: + Page bigListResult = catalog.listNamespaces(Namespace.empty(), PageToken.fromLimit(9999)); + Assertions.assertThat(bigListResult.items().size()).isEqualTo(5); + Assertions.assertThat(bigListResult.encodedResponseToken()).isNull(); + } finally { + for (int i = 0; i < 5; i++) { + catalog.dropNamespace(Namespace.of("pagination_namespace_" + i)); + } + } + } } diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index 1a8e769008..031c3882f1 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -458,14 +458,10 @@ public boolean dropTable(TableIdentifier tableIdentifier, boolean purge) { @Override public List listTables(Namespace namespace) { - return listTables(namespace, PageToken.readEverything()).items; + return listTables(namespace, PageToken.readEverything()).items(); } - public Page listTables(Namespace namespace, String pageToken, Integer pageSize) { - return listTables(namespace, buildPageToken(pageToken, pageSize)); - } - - private Page listTables(Namespace namespace, PageToken pageToken) { + public Page listTables(Namespace namespace, PageToken pageToken) { if (!namespaceExists(namespace)) { throw new NoSuchNamespaceException( "Cannot list tables for namespace. Namespace does not exist: '%s'", namespace); @@ -778,14 +774,10 @@ public List listNamespaces() { @Override public List listNamespaces(Namespace namespace) throws NoSuchNamespaceException { - return listNamespaces(namespace, PageToken.readEverything()).items; - } - - public Page listNamespaces(Namespace namespace, String pageToken, Integer pageSize) { - return listNamespaces(namespace, buildPageToken(pageToken, pageSize)); + return listNamespaces(namespace, PageToken.readEverything()).items(); } - private Page listNamespaces(Namespace namespace, PageToken pageToken) + public Page listNamespaces(Namespace namespace, PageToken pageToken) throws NoSuchNamespaceException { PolarisResolvedPathWrapper resolvedEntities = resolvedEntityView.getResolvedPath(namespace); if (resolvedEntities == null) { @@ -801,13 +793,12 @@ private Page listNamespaces(Namespace namespace, PageToken pageToken) PolarisEntityType.NAMESPACE, PolarisEntitySubType.NULL_SUBTYPE, pageToken); - List entities = - PolarisEntity.toNameAndIdList(listResult.getEntities()); - List namespaces = PolarisCatalogHelpers.nameAndIdToNamespaces(catalogPath, entities); return listResult - .getPageToken() - .map(token -> new Page<>(token, namespaces)) - .orElseGet(() -> Page.fromItems(namespaces)); + .getPage() + .map( + record -> + PolarisCatalogHelpers.nameAndIdToNamespace( + catalogPath, new PolarisEntity.NameAndId(record.getName(), record.getId()))); } @Override @@ -819,14 +810,10 @@ public void close() throws IOException { @Override public List listViews(Namespace namespace) { - return listViews(namespace, PageToken.readEverything()).items; + return listViews(namespace, PageToken.readEverything()).items(); } - public Page listViews(Namespace namespace, String pageToken, Integer pageSize) { - return listViews(namespace, buildPageToken(pageToken, pageSize)); - } - - private Page listViews(Namespace namespace, PageToken pageToken) { + public Page listViews(Namespace namespace, PageToken pageToken) { if (!namespaceExists(namespace)) { throw new NoSuchNamespaceException( "Cannot list views for namespace. Namespace does not exist: '%s'", namespace); @@ -2596,15 +2583,11 @@ private Page listTableLike( PolarisEntityType.TABLE_LIKE, subType, pageToken); - List entities = - PolarisEntity.toNameAndIdList(listResult.getEntities()); - List identifiers = - PolarisCatalogHelpers.nameAndIdToTableIdentifiers(catalogPath, entities); + Namespace parentNamespace = PolarisCatalogHelpers.parentNamespace(catalogPath); return listResult - .getPageToken() - .map(token -> new Page<>(token, identifiers)) - .orElseGet(() -> Page.fromItems(identifiers)); + .getPage() + .map(record -> TableIdentifier.of(parentNamespace, record.getName())); } /** @@ -2642,18 +2625,4 @@ private int getMaxMetadataRefreshRetries() { .getRealmConfig() .getConfig(FeatureConfiguration.MAX_METADATA_REFRESH_RETRIES); } - - /** Build a {@link PageToken} from a string and page size. */ - private PageToken buildPageToken(@Nullable String tokenString, @Nullable Integer pageSize) { - - boolean paginationEnabled = - callContext - .getRealmConfig() - .getConfig(FeatureConfiguration.LIST_PAGINATION_ENABLED, catalogEntity); - if (!paginationEnabled) { - return PageToken.readEverything(); - } else { - return PageToken.build(tokenString, pageSize); - } - } } diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index c0b0b3259c..d8ceea08da 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -91,6 +91,7 @@ import org.apache.polaris.core.persistence.dao.entity.EntitiesResult; import org.apache.polaris.core.persistence.dao.entity.EntityWithPath; import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.secrets.UserSecretsManager; import org.apache.polaris.core.storage.AccessConfig; import org.apache.polaris.core.storage.PolarisStorageActions; @@ -183,10 +184,11 @@ public ListNamespacesResponse listNamespaces( authorizeBasicNamespaceOperationOrThrow(op, parent); if (baseCatalog instanceof IcebergCatalog polarisCatalog) { - Page results = polarisCatalog.listNamespaces(parent, pageToken, pageSize); + PageToken pageRequest = PageToken.build(pageToken, pageSize); + Page results = polarisCatalog.listNamespaces(parent, pageRequest); return ListNamespacesResponse.builder() - .addAll(results.items) - .nextPageToken(results.pageToken.toTokenString()) + .addAll(results.items()) + .nextPageToken(results.encodedResponseToken()) .build(); } else { return catalogHandlerUtils.listNamespaces(namespaceCatalog, parent, pageToken, pageSize); @@ -343,10 +345,11 @@ public ListTablesResponse listTables(Namespace namespace, String pageToken, Inte authorizeBasicNamespaceOperationOrThrow(op, namespace); if (baseCatalog instanceof IcebergCatalog polarisCatalog) { - Page results = polarisCatalog.listTables(namespace, pageToken, pageSize); + PageToken pageRequest = PageToken.build(pageToken, pageSize); + Page results = polarisCatalog.listTables(namespace, pageRequest); return ListTablesResponse.builder() - .addAll(results.items) - .nextPageToken(results.pageToken.toTokenString()) + .addAll(results.items()) + .nextPageToken(results.encodedResponseToken()) .build(); } else { return catalogHandlerUtils.listTables(baseCatalog, namespace, pageToken, pageSize); @@ -1005,10 +1008,11 @@ public ListTablesResponse listViews(Namespace namespace, String pageToken, Integ authorizeBasicNamespaceOperationOrThrow(op, namespace); if (baseCatalog instanceof IcebergCatalog polarisCatalog) { - Page results = polarisCatalog.listViews(namespace, pageToken, pageSize); + PageToken pageRequest = PageToken.build(pageToken, pageSize); + Page results = polarisCatalog.listViews(namespace, pageRequest); return ListTablesResponse.builder() - .addAll(results.items) - .nextPageToken(results.pageToken.toTokenString()) + .addAll(results.items()) + .nextPageToken(results.encodedResponseToken()) .build(); } else if (baseCatalog instanceof ViewCatalog viewCatalog) { return catalogHandlerUtils.listViews(viewCatalog, namespace, pageToken, pageSize); diff --git a/service/common/src/test/java/org/apache/polaris/service/persistence/pagination/PageTokenTest.java b/service/common/src/test/java/org/apache/polaris/service/persistence/pagination/PageTokenTest.java deleted file mode 100644 index 97e52fb842..0000000000 --- a/service/common/src/test/java/org/apache/polaris/service/persistence/pagination/PageTokenTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.persistence.pagination; - -import org.apache.polaris.core.persistence.pagination.DonePageToken; -import org.apache.polaris.core.persistence.pagination.HasPageSize; -import org.apache.polaris.core.persistence.pagination.PageToken; -import org.assertj.core.api.Assertions; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class PageTokenTest { - private static final Logger LOGGER = LoggerFactory.getLogger(PageTokenTest.class); - - @Test - void testDoneToken() { - Assertions.assertThat(new DonePageToken()).doesNotReturn(null, PageToken::toString); - Assertions.assertThat(new DonePageToken()).returns(null, PageToken::toTokenString); - Assertions.assertThat(new DonePageToken()).isEqualTo(new DonePageToken()); - Assertions.assertThat(new DonePageToken().hashCode()).isEqualTo(new DonePageToken().hashCode()); - } - - @Test - void testReadEverythingPageToken() { - PageToken token = PageToken.readEverything(); - - Assertions.assertThat(token.toString()).isNotNull(); - Assertions.assertThat(token.toTokenString()).isNotNull(); - Assertions.assertThat(token).isNotInstanceOf(HasPageSize.class); - - Assertions.assertThat(PageToken.readEverything()).isEqualTo(PageToken.readEverything()); - } -}