Skip to content

Extensible pagination token implementation #1938

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,24 @@ public List<Namespace> listNamespaces(String catalog, Namespace parent) {
}
}

public ListNamespacesResponse listNamespaces(
String catalog, Namespace parent, String pageToken, String pageSize) {
Map<String, String> queryParams = new HashMap<>();
if (!parent.isEmpty()) {
// TODO change this for Iceberg 1.7.2:
// queryParams.put("parent", RESTUtil.encodeNamespace(parent));
queryParams.put("parent", Joiner.on('\u001f').join(parent.levels()));
}
queryParams.put("pageToken", pageToken);
queryParams.put("pageSize", pageSize);
try (Response response =
request("v1/{cat}/namespaces", Map.of("cat", catalog), queryParams).get()) {
assertThat(response.getStatus()).isEqualTo(OK.getStatusCode());
ListNamespacesResponse res = response.readEntity(ListNamespacesResponse.class);
return res;
}
}

public List<Namespace> listAllNamespacesChildFirst(String catalog) {
List<Namespace> result = new ArrayList<>();
for (int idx = -1; idx < result.size(); idx++) {
Expand Down Expand Up @@ -142,6 +160,20 @@ public List<TableIdentifier> listTables(String catalog, Namespace namespace) {
}
}

public ListTablesResponse listTables(
String catalog, Namespace namespace, String pageToken, String pageSize) {
String ns = RESTUtil.encodeNamespace(namespace);
Map<String, String> queryParams = new HashMap<>();
queryParams.put("pageToken", pageToken);
queryParams.put("pageSize", pageSize);
try (Response res =
request("v1/{cat}/namespaces/" + ns + "/tables", Map.of("cat", catalog), queryParams)
.get()) {
assertThat(res.getStatus()).isEqualTo(Response.Status.OK.getStatusCode());
return res.readEntity(ListTablesResponse.class);
}
}

public void dropTable(String catalog, TableIdentifier id) {
String ns = RESTUtil.encodeNamespace(id.namespace());
try (Response res =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
import org.apache.iceberg.rest.RESTUtil;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.responses.ErrorResponse;
import org.apache.iceberg.rest.responses.ListNamespacesResponse;
import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.types.Types;
import org.apache.polaris.core.admin.model.Catalog;
Expand Down Expand Up @@ -161,7 +163,8 @@ public abstract class PolarisRestCatalogIntegrationBase extends CatalogTests<RES

private static final String[] DEFAULT_CATALOG_PROPERTIES = {
"polaris.config.allow.unstructured.table.location", "true",
"polaris.config.allow.external.table.location", "true"
"polaris.config.allow.external.table.location", "true",
"polaris.config.list-pagination-enabled", "true"
};

@Retention(RetentionPolicy.RUNTIME)
Expand Down Expand Up @@ -2023,4 +2026,72 @@ public void testETagChangeAfterDMLOperations() {
assertThat(currentETag).isEqualTo(afterDMLETag); // Should match post-DML ETag
}
}

@Test
public void testPaginatedListNamespaces() {
String prefix = "testPaginatedListNamespaces";
for (int i = 0; i < 20; i++) {
Namespace namespace = Namespace.of(prefix + i);
restCatalog.createNamespace(namespace);
}

try {
Assertions.assertThat(catalogApi.listNamespaces(currentCatalogName, Namespace.empty()))
.hasSize(20);
for (var pageSize : List.of(1, 2, 3, 9, 10, 11, 19, 20, 21, 2000)) {
int total = 0;
String pageToken = null;
do {
ListNamespacesResponse response =
catalogApi.listNamespaces(
currentCatalogName, Namespace.empty(), pageToken, String.valueOf(pageSize));
Assertions.assertThat(response.namespaces().size()).isLessThanOrEqualTo(pageSize);
total += response.namespaces().size();
pageToken = response.nextPageToken();
} while (pageToken != null);
Assertions.assertThat(total)
.as("Total paginated results for pageSize = " + pageSize)
.isEqualTo(20);
}
} finally {
for (int i = 0; i < 20; i++) {
Namespace namespace = Namespace.of(prefix + i);
restCatalog.dropNamespace(namespace);
}
}
}

@Test
public void testPaginatedListTables() {
String prefix = "testPaginatedListTables";
Namespace namespace = Namespace.of(prefix);
restCatalog.createNamespace(namespace);
for (int i = 0; i < 20; i++) {
restCatalog.createTable(TableIdentifier.of(namespace, prefix + i), SCHEMA);
}

try {
Assertions.assertThat(catalogApi.listTables(currentCatalogName, namespace)).hasSize(20);
for (var pageSize : List.of(1, 2, 3, 9, 10, 11, 19, 20, 21, 2000)) {
int total = 0;
String pageToken = null;
do {
ListTablesResponse response =
catalogApi.listTables(
currentCatalogName, namespace, pageToken, String.valueOf(pageSize));
Assertions.assertThat(response.identifiers().size()).isLessThanOrEqualTo(pageSize);
total += response.identifiers().size();
pageToken = response.nextPageToken();
} while (pageToken != null);
Assertions.assertThat(total)
.as("Total paginated results for pageSize = " + pageSize)
.isEqualTo(20);
}
} finally {
for (int i = 0; i < 20; i++) {
restCatalog.dropTable(TableIdentifier.of(namespace, prefix + i));
}
restCatalog.dropNamespace(namespace);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import org.apache.polaris.core.persistence.BaseMetaStoreManager;
import org.apache.polaris.core.persistence.PrincipalSecretsGenerator;
import org.apache.polaris.core.persistence.RetryOnConcurrencyException;
import org.apache.polaris.core.persistence.pagination.HasPageSize;
import org.apache.polaris.core.persistence.pagination.EntityIdToken;
import org.apache.polaris.core.persistence.pagination.Page;
import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.persistence.transactional.AbstractTransactionalPersistence;
Expand Down Expand Up @@ -480,11 +480,7 @@ public List<EntityNameLookupRecord> lookupEntityActiveBatchInCurrentTxn(
.map(ModelEntity::toEntity)
.filter(entityFilter);

if (pageToken instanceof HasPageSize hasPageSize) {
data = data.limit(hasPageSize.getPageSize());
}

return Page.fromItems(data.map(transformer).collect(Collectors.toList()));
return Page.mapped(pageToken, data, transformer, EntityIdToken::fromEntity);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.entity.PolarisGrantRecord;
import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
import org.apache.polaris.core.persistence.pagination.EntityIdToken;
import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
import org.apache.polaris.core.policy.PolicyEntity;
Expand Down Expand Up @@ -294,7 +295,17 @@ List<ModelEntity> lookupFullEntitiesActive(

// Currently check against ENTITIES not joining with ENTITIES_ACTIVE
String hql =
"SELECT m from ModelEntity m where m.catalogId=:catalogId and m.parentId=:parentId and m.typeCode=:typeCode";
"SELECT m from ModelEntity m where"
+ " m.catalogId=:catalogId and m.parentId=:parentId and m.typeCode=:typeCode";

var entityIdToken = pageToken.valueAs(EntityIdToken.class);
if (entityIdToken.isPresent()) {
hql += " and m.id > :tokenId";
}

if (pageToken.paginationRequested()) {
hql += " order by m.id asc";
}

TypedQuery<ModelEntity> query =
session
Expand All @@ -303,6 +314,11 @@ List<ModelEntity> lookupFullEntitiesActive(
.setParameter("parentId", parentId)
.setParameter("typeCode", entityType.getCode());

if (entityIdToken.isPresent()) {
long tokenId = entityIdToken.get().entityId();
query = query.setParameter("tokenId", tokenId);
}

return query.getResultList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand All @@ -53,7 +54,7 @@
import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException;
import org.apache.polaris.core.persistence.PrincipalSecretsGenerator;
import org.apache.polaris.core.persistence.RetryOnConcurrencyException;
import org.apache.polaris.core.persistence.pagination.HasPageSize;
import org.apache.polaris.core.persistence.pagination.EntityIdToken;
import org.apache.polaris.core.persistence.pagination.Page;
import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
Expand Down Expand Up @@ -459,7 +460,7 @@ public <T> Page<T> listEntities(
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
@Nonnull Function<PolarisBaseEntity, T> transformer,
@Nonnull PageToken pageToken) {
Map<String, Object> params =
Map<String, Object> whereEquals =
Map.of(
"catalog_id",
catalogId,
Expand All @@ -469,29 +470,41 @@ public <T> Page<T> listEntities(
entityType.getCode(),
"realm_id",
realmId);
Map<String, Object> whereGreater;

// Limit can't be pushed down, due to client side filtering
// absence of transaction.
String orderByColumnName = null;
if (pageToken.paginationRequested()) {
orderByColumnName = ModelEntity.ID_COLUMN;
whereGreater =
pageToken
.valueAs(EntityIdToken.class)
.map(
entityIdToken ->
Map.<String, Object>of(ModelEntity.ID_COLUMN, entityIdToken.entityId()))
.orElse(Map.of());
} else {
whereGreater = Map.of();
}

try {
PreparedQuery query =
QueryGenerator.generateSelectQuery(
ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params);
List<PolarisBaseEntity> results = new ArrayList<>();
ModelEntity.ALL_COLUMNS,
ModelEntity.TABLE_NAME,
whereEquals,
whereGreater,
orderByColumnName);
AtomicReference<Page<T>> results = new AtomicReference<>();
datasourceOperations.executeSelectOverStream(
query,
new ModelEntity(),
stream -> {
var data = stream.filter(entityFilter);
if (pageToken instanceof HasPageSize hasPageSize) {
data = data.limit(hasPageSize.getPageSize());
}
data.forEach(results::add);
results.set(Page.mapped(pageToken, data, transformer, EntityIdToken::fromEntity));
});
List<T> resultsOrEmpty =
results == null
? Collections.emptyList()
: results.stream().filter(entityFilter).map(transformer).collect(Collectors.toList());
return Page.fromItems(resultsOrEmpty);
return results.get();
} catch (SQLException e) {
throw new RuntimeException(
String.format("Failed to retrieve polaris entities due to %s", e.getMessage()), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -59,8 +60,27 @@ public static PreparedQuery generateSelectQuery(
@Nonnull List<String> projections,
@Nonnull String tableName,
@Nonnull Map<String, Object> whereClause) {
QueryFragment where = generateWhereClause(new HashSet<>(projections), whereClause);
PreparedQuery query = generateSelectQuery(projections, tableName, where.sql());
return generateSelectQuery(projections, tableName, whereClause, Map.of(), null);
}

/**
* Generates a SELECT query with projection and filtering.
*
* @param projections List of columns to retrieve.
* @param tableName Target table name.
* @param whereEquals Column-value pairs used in WHERE filtering.
* @return A parameterized SELECT query.
* @throws IllegalArgumentException if any whereClause column isn't in projections.
*/
public static PreparedQuery generateSelectQuery(
@Nonnull List<String> projections,
@Nonnull String tableName,
@Nonnull Map<String, Object> whereEquals,
@Nonnull Map<String, Object> whereGreater,
@Nullable String orderByColumn) {
QueryFragment where =
generateWhereClause(new HashSet<>(projections), whereEquals, whereGreater);
PreparedQuery query = generateSelectQuery(projections, tableName, where.sql(), orderByColumn);
return new PreparedQuery(query.sql(), where.parameters());
}

Expand Down Expand Up @@ -108,7 +128,8 @@ public static PreparedQuery generateSelectQueryWithEntityIds(
params.add(realmId);
String where = " WHERE (catalog_id, id) IN (" + placeholders + ") AND realm_id = ?";
return new PreparedQuery(
generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where).sql(), params);
generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where, null).sql(),
params);
}

/**
Expand Down Expand Up @@ -157,7 +178,7 @@ public static PreparedQuery generateUpdateQuery(
@Nonnull List<Object> values,
@Nonnull Map<String, Object> whereClause) {
List<Object> bindingParams = new ArrayList<>(values);
QueryFragment where = generateWhereClause(new HashSet<>(allColumns), whereClause);
QueryFragment where = generateWhereClause(new HashSet<>(allColumns), whereClause, Map.of());
String setClause = allColumns.stream().map(c -> c + " = ?").collect(Collectors.joining(", "));
String sql =
"UPDATE " + getFullyQualifiedTableName(tableName) + " SET " + setClause + where.sql();
Expand All @@ -177,34 +198,49 @@ public static PreparedQuery generateDeleteQuery(
@Nonnull List<String> tableColumns,
@Nonnull String tableName,
@Nonnull Map<String, Object> whereClause) {
QueryFragment where = generateWhereClause(new HashSet<>(tableColumns), whereClause);
QueryFragment where = generateWhereClause(new HashSet<>(tableColumns), whereClause, Map.of());
return new PreparedQuery(
"DELETE FROM " + getFullyQualifiedTableName(tableName) + where.sql(), where.parameters());
}

private static PreparedQuery generateSelectQuery(
@Nonnull List<String> columnNames, @Nonnull String tableName, @Nonnull String filter) {
@Nonnull List<String> columnNames,
@Nonnull String tableName,
@Nonnull String filter,
@Nullable String orderByColumn) {
String sql =
"SELECT "
+ String.join(", ", columnNames)
+ " FROM "
+ getFullyQualifiedTableName(tableName)
+ filter;
if (orderByColumn != null) {
sql += " ORDER BY " + orderByColumn + " ASC";
}
return new PreparedQuery(sql, Collections.emptyList());
}

@VisibleForTesting
static QueryFragment generateWhereClause(
@Nonnull Set<String> tableColumns, @Nonnull Map<String, Object> whereClause) {
@Nonnull Set<String> tableColumns,
@Nonnull Map<String, Object> whereEquals,
@Nonnull Map<String, Object> whereGreater) {
List<String> conditions = new ArrayList<>();
List<Object> parameters = new ArrayList<>();
for (Map.Entry<String, Object> entry : whereClause.entrySet()) {
for (Map.Entry<String, Object> entry : whereEquals.entrySet()) {
if (!tableColumns.contains(entry.getKey()) && !entry.getKey().equals("realm_id")) {
throw new IllegalArgumentException("Invalid query column: " + entry.getKey());
}
conditions.add(entry.getKey() + " = ?");
parameters.add(entry.getValue());
}
for (Map.Entry<String, Object> entry : whereGreater.entrySet()) {
if (!tableColumns.contains(entry.getKey()) && !entry.getKey().equals("realm_id")) {
throw new IllegalArgumentException("Invalid query column: " + entry.getKey());
}
conditions.add(entry.getKey() + " > ?");
parameters.add(entry.getValue());
}
String clause = conditions.isEmpty() ? "" : " WHERE " + String.join(" AND ", conditions);
return new QueryFragment(clause, parameters);
}
Expand Down Expand Up @@ -258,7 +294,7 @@ public static PreparedQuery generateOverlapQuery(

QueryFragment where = new QueryFragment(clause, finalParams);
PreparedQuery query =
generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where.sql());
generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where.sql(), null);
return new PreparedQuery(query.sql(), where.parameters());
}

Expand Down
Loading
Loading