Skip to content

Commit 6cdb73c

Browse files
dimas-bsnazy
authored andcommitted
Extensible pagination token implementation
Based on #1838, following up on #1555 * Allows multiple implementations of `Token` referencing the "next page", encapsulated in `PageToken`. No changes to `polaris-core` needed to add custom `Token` implementations. * Extensible to (later) support (cryptographic) signatures to prevent tampered page-token * Refactor pagination code to delineate API-level page tokens and internal "pointers to data" * Requests deal with the "previous" token, user-provided page size (optional) and the previous request's page size. * Concentrate the logic of combining page size requests and previous tokens in `PageTokenUtil` * `PageToken` subclasses are no longer necessary. * Serialzation of `PageToken` uses Jackson serialization (smile format) Since no (metastore level) implementation handling pagination existed before, no backwards compatibility is needed.
1 parent 35cc9b6 commit 6cdb73c

File tree

32 files changed

+1329
-510
lines changed

32 files changed

+1329
-510
lines changed

integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,24 @@ public List<Namespace> listNamespaces(String catalog, Namespace parent) {
101101
}
102102
}
103103

104+
public ListNamespacesResponse listNamespaces(
105+
String catalog, Namespace parent, String pageToken, String pageSize) {
106+
Map<String, String> queryParams = new HashMap<>();
107+
if (!parent.isEmpty()) {
108+
// TODO change this for Iceberg 1.7.2:
109+
// queryParams.put("parent", RESTUtil.encodeNamespace(parent));
110+
queryParams.put("parent", Joiner.on('\u001f').join(parent.levels()));
111+
}
112+
queryParams.put("pageToken", pageToken);
113+
queryParams.put("pageSize", pageSize);
114+
try (Response response =
115+
request("v1/{cat}/namespaces", Map.of("cat", catalog), queryParams).get()) {
116+
assertThat(response.getStatus()).isEqualTo(OK.getStatusCode());
117+
ListNamespacesResponse res = response.readEntity(ListNamespacesResponse.class);
118+
return res;
119+
}
120+
}
121+
104122
public List<Namespace> listAllNamespacesChildFirst(String catalog) {
105123
List<Namespace> result = new ArrayList<>();
106124
for (int idx = -1; idx < result.size(); idx++) {
@@ -142,6 +160,20 @@ public List<TableIdentifier> listTables(String catalog, Namespace namespace) {
142160
}
143161
}
144162

163+
public ListTablesResponse listTables(
164+
String catalog, Namespace namespace, String pageToken, String pageSize) {
165+
String ns = RESTUtil.encodeNamespace(namespace);
166+
Map<String, String> queryParams = new HashMap<>();
167+
queryParams.put("pageToken", pageToken);
168+
queryParams.put("pageSize", pageSize);
169+
try (Response res =
170+
request("v1/{cat}/namespaces/" + ns + "/tables", Map.of("cat", catalog), queryParams)
171+
.get()) {
172+
assertThat(res.getStatus()).isEqualTo(Response.Status.OK.getStatusCode());
173+
return res.readEntity(ListTablesResponse.class);
174+
}
175+
}
176+
145177
public void dropTable(String catalog, TableIdentifier id) {
146178
String ns = RESTUtil.encodeNamespace(id.namespace());
147179
try (Response res =

integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationBase.java

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@
6969
import org.apache.iceberg.rest.RESTUtil;
7070
import org.apache.iceberg.rest.requests.CreateTableRequest;
7171
import org.apache.iceberg.rest.responses.ErrorResponse;
72+
import org.apache.iceberg.rest.responses.ListNamespacesResponse;
73+
import org.apache.iceberg.rest.responses.ListTablesResponse;
7274
import org.apache.iceberg.rest.responses.LoadTableResponse;
7375
import org.apache.iceberg.types.Types;
7476
import org.apache.polaris.core.admin.model.Catalog;
@@ -161,7 +163,8 @@ public abstract class PolarisRestCatalogIntegrationBase extends CatalogTests<RES
161163

162164
private static final String[] DEFAULT_CATALOG_PROPERTIES = {
163165
"polaris.config.allow.unstructured.table.location", "true",
164-
"polaris.config.allow.external.table.location", "true"
166+
"polaris.config.allow.external.table.location", "true",
167+
"polaris.config.list-pagination-enabled", "true"
165168
};
166169

167170
@Retention(RetentionPolicy.RUNTIME)
@@ -2023,4 +2026,72 @@ public void testETagChangeAfterDMLOperations() {
20232026
assertThat(currentETag).isEqualTo(afterDMLETag); // Should match post-DML ETag
20242027
}
20252028
}
2029+
2030+
@Test
2031+
public void testPaginatedListNamespaces() {
2032+
String prefix = "testPaginatedListNamespaces";
2033+
for (int i = 0; i < 20; i++) {
2034+
Namespace namespace = Namespace.of(prefix + i);
2035+
restCatalog.createNamespace(namespace);
2036+
}
2037+
2038+
try {
2039+
Assertions.assertThat(catalogApi.listNamespaces(currentCatalogName, Namespace.empty()))
2040+
.hasSize(20);
2041+
for (var pageSize : List.of(1, 2, 3, 9, 10, 11, 19, 20, 21, 2000)) {
2042+
int total = 0;
2043+
String pageToken = null;
2044+
do {
2045+
ListNamespacesResponse response =
2046+
catalogApi.listNamespaces(
2047+
currentCatalogName, Namespace.empty(), pageToken, String.valueOf(pageSize));
2048+
Assertions.assertThat(response.namespaces().size()).isLessThanOrEqualTo(pageSize);
2049+
total += response.namespaces().size();
2050+
pageToken = response.nextPageToken();
2051+
} while (pageToken != null);
2052+
Assertions.assertThat(total)
2053+
.as("Total paginated results for pageSize = " + pageSize)
2054+
.isEqualTo(20);
2055+
}
2056+
} finally {
2057+
for (int i = 0; i < 20; i++) {
2058+
Namespace namespace = Namespace.of(prefix + i);
2059+
restCatalog.dropNamespace(namespace);
2060+
}
2061+
}
2062+
}
2063+
2064+
@Test
2065+
public void testPaginatedListTables() {
2066+
String prefix = "testPaginatedListTables";
2067+
Namespace namespace = Namespace.of(prefix);
2068+
restCatalog.createNamespace(namespace);
2069+
for (int i = 0; i < 20; i++) {
2070+
restCatalog.createTable(TableIdentifier.of(namespace, prefix + i), SCHEMA);
2071+
}
2072+
2073+
try {
2074+
Assertions.assertThat(catalogApi.listTables(currentCatalogName, namespace)).hasSize(20);
2075+
for (var pageSize : List.of(1, 2, 3, 9, 10, 11, 19, 20, 21, 2000)) {
2076+
int total = 0;
2077+
String pageToken = null;
2078+
do {
2079+
ListTablesResponse response =
2080+
catalogApi.listTables(
2081+
currentCatalogName, namespace, pageToken, String.valueOf(pageSize));
2082+
Assertions.assertThat(response.identifiers().size()).isLessThanOrEqualTo(pageSize);
2083+
total += response.identifiers().size();
2084+
pageToken = response.nextPageToken();
2085+
} while (pageToken != null);
2086+
Assertions.assertThat(total)
2087+
.as("Total paginated results for pageSize = " + pageSize)
2088+
.isEqualTo(20);
2089+
}
2090+
} finally {
2091+
for (int i = 0; i < 20; i++) {
2092+
restCatalog.dropTable(TableIdentifier.of(namespace, prefix + i));
2093+
}
2094+
restCatalog.dropNamespace(namespace);
2095+
}
2096+
}
20262097
}

persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
import org.apache.polaris.core.persistence.BaseMetaStoreManager;
5757
import org.apache.polaris.core.persistence.PrincipalSecretsGenerator;
5858
import org.apache.polaris.core.persistence.RetryOnConcurrencyException;
59-
import org.apache.polaris.core.persistence.pagination.HasPageSize;
59+
import org.apache.polaris.core.persistence.pagination.EntityIdToken;
6060
import org.apache.polaris.core.persistence.pagination.Page;
6161
import org.apache.polaris.core.persistence.pagination.PageToken;
6262
import org.apache.polaris.core.persistence.transactional.AbstractTransactionalPersistence;
@@ -480,11 +480,7 @@ public List<EntityNameLookupRecord> lookupEntityActiveBatchInCurrentTxn(
480480
.map(ModelEntity::toEntity)
481481
.filter(entityFilter);
482482

483-
if (pageToken instanceof HasPageSize hasPageSize) {
484-
data = data.limit(hasPageSize.getPageSize());
485-
}
486-
487-
return Page.fromItems(data.map(transformer).collect(Collectors.toList()));
483+
return Page.mapped(pageToken, data, transformer, EntityIdToken::fromEntity);
488484
}
489485

490486
/** {@inheritDoc} */

persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.polaris.core.entity.PolarisEntityType;
3636
import org.apache.polaris.core.entity.PolarisGrantRecord;
3737
import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
38+
import org.apache.polaris.core.persistence.pagination.EntityIdToken;
3839
import org.apache.polaris.core.persistence.pagination.PageToken;
3940
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
4041
import org.apache.polaris.core.policy.PolicyEntity;
@@ -294,7 +295,17 @@ List<ModelEntity> lookupFullEntitiesActive(
294295

295296
// Currently check against ENTITIES not joining with ENTITIES_ACTIVE
296297
String hql =
297-
"SELECT m from ModelEntity m where m.catalogId=:catalogId and m.parentId=:parentId and m.typeCode=:typeCode";
298+
"SELECT m from ModelEntity m where"
299+
+ " m.catalogId=:catalogId and m.parentId=:parentId and m.typeCode=:typeCode";
300+
301+
var entityIdToken = pageToken.valueAs(EntityIdToken.class);
302+
if (entityIdToken.isPresent()) {
303+
hql += " and m.id > :tokenId";
304+
}
305+
306+
if (pageToken.paginationRequested()) {
307+
hql += " order by m.id asc";
308+
}
298309

299310
TypedQuery<ModelEntity> query =
300311
session
@@ -303,6 +314,11 @@ List<ModelEntity> lookupFullEntitiesActive(
303314
.setParameter("parentId", parentId)
304315
.setParameter("typeCode", entityType.getCode());
305316

317+
if (entityIdToken.isPresent()) {
318+
long tokenId = entityIdToken.get().entityId();
319+
query = query.setParameter("tokenId", tokenId);
320+
}
321+
306322
return query.getResultList();
307323
}
308324

persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.List;
3333
import java.util.Map;
3434
import java.util.Optional;
35+
import java.util.concurrent.atomic.AtomicReference;
3536
import java.util.function.Function;
3637
import java.util.function.Predicate;
3738
import java.util.stream.Collectors;
@@ -53,7 +54,7 @@
5354
import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException;
5455
import org.apache.polaris.core.persistence.PrincipalSecretsGenerator;
5556
import org.apache.polaris.core.persistence.RetryOnConcurrencyException;
56-
import org.apache.polaris.core.persistence.pagination.HasPageSize;
57+
import org.apache.polaris.core.persistence.pagination.EntityIdToken;
5758
import org.apache.polaris.core.persistence.pagination.Page;
5859
import org.apache.polaris.core.persistence.pagination.PageToken;
5960
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
@@ -459,7 +460,7 @@ public <T> Page<T> listEntities(
459460
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
460461
@Nonnull Function<PolarisBaseEntity, T> transformer,
461462
@Nonnull PageToken pageToken) {
462-
Map<String, Object> params =
463+
Map<String, Object> whereEquals =
463464
Map.of(
464465
"catalog_id",
465466
catalogId,
@@ -469,29 +470,41 @@ public <T> Page<T> listEntities(
469470
entityType.getCode(),
470471
"realm_id",
471472
realmId);
473+
Map<String, Object> whereGreater;
472474

473475
// Limit can't be pushed down, due to client side filtering
474476
// absence of transaction.
477+
String orderByColumnName = null;
478+
if (pageToken.paginationRequested()) {
479+
orderByColumnName = ModelEntity.ID_COLUMN;
480+
whereGreater =
481+
pageToken
482+
.valueAs(EntityIdToken.class)
483+
.map(
484+
entityIdToken ->
485+
Map.<String, Object>of(ModelEntity.ID_COLUMN, entityIdToken.entityId()))
486+
.orElse(Map.of());
487+
} else {
488+
whereGreater = Map.of();
489+
}
490+
475491
try {
476492
PreparedQuery query =
477493
QueryGenerator.generateSelectQuery(
478-
ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params);
479-
List<PolarisBaseEntity> results = new ArrayList<>();
494+
ModelEntity.ALL_COLUMNS,
495+
ModelEntity.TABLE_NAME,
496+
whereEquals,
497+
whereGreater,
498+
orderByColumnName);
499+
AtomicReference<Page<T>> results = new AtomicReference<>();
480500
datasourceOperations.executeSelectOverStream(
481501
query,
482502
new ModelEntity(),
483503
stream -> {
484504
var data = stream.filter(entityFilter);
485-
if (pageToken instanceof HasPageSize hasPageSize) {
486-
data = data.limit(hasPageSize.getPageSize());
487-
}
488-
data.forEach(results::add);
505+
results.set(Page.mapped(pageToken, data, transformer, EntityIdToken::fromEntity));
489506
});
490-
List<T> resultsOrEmpty =
491-
results == null
492-
? Collections.emptyList()
493-
: results.stream().filter(entityFilter).map(transformer).collect(Collectors.toList());
494-
return Page.fromItems(resultsOrEmpty);
507+
return results.get();
495508
} catch (SQLException e) {
496509
throw new RuntimeException(
497510
String.format("Failed to retrieve polaris entities due to %s", e.getMessage()), e);

persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import com.google.common.annotations.VisibleForTesting;
2222
import jakarta.annotation.Nonnull;
23+
import jakarta.annotation.Nullable;
2324
import java.util.ArrayList;
2425
import java.util.Arrays;
2526
import java.util.Collections;
@@ -59,8 +60,27 @@ public static PreparedQuery generateSelectQuery(
5960
@Nonnull List<String> projections,
6061
@Nonnull String tableName,
6162
@Nonnull Map<String, Object> whereClause) {
62-
QueryFragment where = generateWhereClause(new HashSet<>(projections), whereClause);
63-
PreparedQuery query = generateSelectQuery(projections, tableName, where.sql());
63+
return generateSelectQuery(projections, tableName, whereClause, Map.of(), null);
64+
}
65+
66+
/**
67+
* Generates a SELECT query with projection and filtering.
68+
*
69+
* @param projections List of columns to retrieve.
70+
* @param tableName Target table name.
71+
* @param whereEquals Column-value pairs used in WHERE filtering.
72+
* @return A parameterized SELECT query.
73+
* @throws IllegalArgumentException if any whereClause column isn't in projections.
74+
*/
75+
public static PreparedQuery generateSelectQuery(
76+
@Nonnull List<String> projections,
77+
@Nonnull String tableName,
78+
@Nonnull Map<String, Object> whereEquals,
79+
@Nonnull Map<String, Object> whereGreater,
80+
@Nullable String orderByColumn) {
81+
QueryFragment where =
82+
generateWhereClause(new HashSet<>(projections), whereEquals, whereGreater);
83+
PreparedQuery query = generateSelectQuery(projections, tableName, where.sql(), orderByColumn);
6484
return new PreparedQuery(query.sql(), where.parameters());
6585
}
6686

@@ -108,7 +128,8 @@ public static PreparedQuery generateSelectQueryWithEntityIds(
108128
params.add(realmId);
109129
String where = " WHERE (catalog_id, id) IN (" + placeholders + ") AND realm_id = ?";
110130
return new PreparedQuery(
111-
generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where).sql(), params);
131+
generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where, null).sql(),
132+
params);
112133
}
113134

114135
/**
@@ -157,7 +178,7 @@ public static PreparedQuery generateUpdateQuery(
157178
@Nonnull List<Object> values,
158179
@Nonnull Map<String, Object> whereClause) {
159180
List<Object> bindingParams = new ArrayList<>(values);
160-
QueryFragment where = generateWhereClause(new HashSet<>(allColumns), whereClause);
181+
QueryFragment where = generateWhereClause(new HashSet<>(allColumns), whereClause, Map.of());
161182
String setClause = allColumns.stream().map(c -> c + " = ?").collect(Collectors.joining(", "));
162183
String sql =
163184
"UPDATE " + getFullyQualifiedTableName(tableName) + " SET " + setClause + where.sql();
@@ -177,34 +198,49 @@ public static PreparedQuery generateDeleteQuery(
177198
@Nonnull List<String> tableColumns,
178199
@Nonnull String tableName,
179200
@Nonnull Map<String, Object> whereClause) {
180-
QueryFragment where = generateWhereClause(new HashSet<>(tableColumns), whereClause);
201+
QueryFragment where = generateWhereClause(new HashSet<>(tableColumns), whereClause, Map.of());
181202
return new PreparedQuery(
182203
"DELETE FROM " + getFullyQualifiedTableName(tableName) + where.sql(), where.parameters());
183204
}
184205

185206
private static PreparedQuery generateSelectQuery(
186-
@Nonnull List<String> columnNames, @Nonnull String tableName, @Nonnull String filter) {
207+
@Nonnull List<String> columnNames,
208+
@Nonnull String tableName,
209+
@Nonnull String filter,
210+
@Nullable String orderByColumn) {
187211
String sql =
188212
"SELECT "
189213
+ String.join(", ", columnNames)
190214
+ " FROM "
191215
+ getFullyQualifiedTableName(tableName)
192216
+ filter;
217+
if (orderByColumn != null) {
218+
sql += " ORDER BY " + orderByColumn + " ASC";
219+
}
193220
return new PreparedQuery(sql, Collections.emptyList());
194221
}
195222

196223
@VisibleForTesting
197224
static QueryFragment generateWhereClause(
198-
@Nonnull Set<String> tableColumns, @Nonnull Map<String, Object> whereClause) {
225+
@Nonnull Set<String> tableColumns,
226+
@Nonnull Map<String, Object> whereEquals,
227+
@Nonnull Map<String, Object> whereGreater) {
199228
List<String> conditions = new ArrayList<>();
200229
List<Object> parameters = new ArrayList<>();
201-
for (Map.Entry<String, Object> entry : whereClause.entrySet()) {
230+
for (Map.Entry<String, Object> entry : whereEquals.entrySet()) {
202231
if (!tableColumns.contains(entry.getKey()) && !entry.getKey().equals("realm_id")) {
203232
throw new IllegalArgumentException("Invalid query column: " + entry.getKey());
204233
}
205234
conditions.add(entry.getKey() + " = ?");
206235
parameters.add(entry.getValue());
207236
}
237+
for (Map.Entry<String, Object> entry : whereGreater.entrySet()) {
238+
if (!tableColumns.contains(entry.getKey()) && !entry.getKey().equals("realm_id")) {
239+
throw new IllegalArgumentException("Invalid query column: " + entry.getKey());
240+
}
241+
conditions.add(entry.getKey() + " > ?");
242+
parameters.add(entry.getValue());
243+
}
208244
String clause = conditions.isEmpty() ? "" : " WHERE " + String.join(" AND ", conditions);
209245
return new QueryFragment(clause, parameters);
210246
}
@@ -258,7 +294,7 @@ public static PreparedQuery generateOverlapQuery(
258294

259295
QueryFragment where = new QueryFragment(clause, finalParams);
260296
PreparedQuery query =
261-
generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where.sql());
297+
generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where.sql(), null);
262298
return new PreparedQuery(query.sql(), where.parameters());
263299
}
264300

0 commit comments

Comments
 (0)