Commit 84e0692

mattmillerai and guill authored
feat(assets): cursor-based pagination on GET /api/assets (Comfy-Org#14014)
* spec(assets): add cursor pagination params to GET /api/assets

Add 'after' query param and 'next_cursor' response field for keyset pagination. Matches the cloud Go implementation (BE-893) so frontend sees a unified contract across runtimes. Offset/limit remain as a deprecated fallback.

* feat(assets): add cursor encode/decode helpers for keyset pagination

Port of cloud common/pagination/cursor.go. Wire format is base64url of {"s", "v", "id"} JSON; times are Unix microseconds UTC to match PostgreSQL timestamp precision. Includes a byte-identity fixture pinned against the cloud Go wire format so cross-runtime FE pagination can't silently drift.

* feat(assets): thread cursor through schemas, service, and query layer

list_assets_page accepts an opaque 'after' cursor and returns next_cursor when more pages are available. The query applies a keyset WHERE clause and a secondary ORDER BY id for deterministic tiebreak. Cursor sort field is validated against the request sort, and a last_access_time sort (OSS-only) falls back to offset/limit. Offset is ignored whenever a cursor is supplied.

* feat(assets): wire cursor pagination through GET /api/assets handler

Adds integration tests for: full cursor walk, invalid-cursor 400, sort/cursor mismatch 400, cursor-wins-over-offset, absent next_cursor when no more results, and pagination stability across deletes.

* fix(assets): address cursor-review verified findings

- Mint next_cursor on every cursor-supported sort, not only when 'after' was supplied. A first request (no 'after') previously returned next_cursor=None, leaving cursor mode unreachable from a clean start.
- Over-fetch limit+1 so an exactly-full terminal page doesn't mint a spurious cursor pointing at a phantom next page.
- Map crafted out-of-range microsecond cursors (OverflowError / OSError in datetime construction) to 400 INVALID_CURSOR instead of leaking 500.
- Bump MAX_CURSOR_VALUE_LENGTH 256 -> 512 to match the AssetReference name column max; without this, a long-named asset minted a cursor the same server then refused on the next request. Cross-runtime byte identity with cloud is unaffected because no cloud cursor ever carries a value > 256 (cloud schema doesn't permit it).
- Return None from _encode_next_cursor when the boundary row carries a NULL sort value (e.g. an Asset without size_bytes backfilled), instead of silently encoding 0 and mis-positioning the keyset.
- Fix schemas_in.py comment so it matches actual handler behavior (last_access_time + 'after' raises 400, does not fall back).
- Add AssetsApiError schema + 400 response to GET /api/assets in openapi.yaml so generated clients know the INVALID_CURSOR envelope.
- Extend integration coverage: first-page mint, exact-multiple terminal page, cursor walks for created_at/updated_at/size sorts, datetime overflow surfaces as 400 not 500.
- Add unit coverage for datetime overflow and 512-char round-trip.

* feat(assets): bind cursor to sort order + Go-compat JSON escaping

Address three needs-judgment items from the cursor-review judge synthesis:

1. Cursor wire format now includes an "o" key carrying the sort direction ("asc" / "desc") it was minted under. A request that replays the cursor with a flipped `order` parameter is rejected with 400 INVALID_CURSOR instead of silently walking the wrong direction. Legacy cursors without "o" still decode (the binding is best-effort until cloud mirrors the field; follow-up filed separately).
2. JSON serialization now escapes `<`, `>`, `&`, U+2028, U+2029 to mirror Go's default `json.Marshal` behavior. Without this, an asset name containing those characters produced different bytes on Python vs cloud Go. The escaped form is what both runtimes emit.
3. Add direct query-layer tests for the keyset tiebreaker: the secondary ORDER BY id branch was previously unexercised.
   Two scenarios: all rows share a primary sort value, and mixed ties straddle page boundaries. Both assert no row is dropped or duplicated across the walk.

Wire-format note: Python cursors now differ from current cloud cursors by exactly the "o" key. Cloud follow-up will bring the two back into byte alignment.

* fix(assets): address bot review comments

- Soften offset param prose: it's not deprecated, just not preferred for sequential walks. Random-access UIs (jump-to-page, item count displays) legitimately still want offset, so dropping the 'deprecated' framing rather than promoting it to a machine-readable deprecated:true flag.
- Add explicit HTTP status assertions before every json() / next_cursor read in test_list_cursor.py so a failing request surfaces as an HTTP error instead of a confusing KeyError on a 4xx/5xx body.

* feat(assets): require cursor o field, drop legacy permissive path

Cursor pagination hasn't shipped on either runtime yet (this PR is still draft and cloud's mirror is just behind it), so there are no legacy no-o cursors in the wild. Make o mandatory from day one rather than landing permissive and tightening later.

decode_cursor now rejects any payload without o (or with a non-string o) as malformed. CursorPayload.order becomes a required str. Tests that constructed CursorPayload directly now pass order="desc"; test_legacy_cursor_without_order_accepted flips to test_cursor_without_order_rejected.

* chore(assets): drop cross-repo prose from cursor comments

Strip prose references to sibling Go implementations and external ticket IDs from cursor.py, the cursor tests, the keyset integration tests, asset_management's sort-field comment, and the legacy prompt_id alias comment. Pure docstring/comment scrub: no behavior or wire-format changes. x-runtime: [cloud] field annotations in openapi.yaml are unchanged; those are the spec's structural cross-runtime convention, not internal references.
* test(assets): include 'o' in microsecond-boundary cursor payload

The boundary test was building a cursor without the required `o` key, so decode failed on the missing-order branch before reaching the µs-overflow path the test is asserting. Both paths return 400 INVALID_CURSOR so the assertion passed for the wrong reason. Add `o` to the payload and matching `order=` to the request so the decode reaches the intended branch.

* fix(assets): address ultrareview findings on cursor pagination

Six fact-checked findings from the multi-model review pass:

- Encoder/decoder length asymmetry: encode_cursor now rejects empty id, oversized id (>128), oversized value (>512), and invalid order tokens symmetrically with decode_cursor. Prevents the same server from minting a cursor it then 400s on the next request (e.g. a filesystem-scanned asset name >512 chars). The bad-order path now raises InvalidCursorError (still subclasses ValueError) so route-layer handling stays uniform.
- Raw U+2028/U+2029 in cursor.py source: ripgrep treated those lines as line-terminators, confirming the bytes were the actual separators. Any editor save / autoformat / git tooling that normalizes invisibles would silently break the encoder. Replaced with explicit \u2028 / \u2029 Python escape sequences.
- set(seen) == set(names) hid ordering regressions: a cursor walk that dropped a row at a page boundary or returned duplicates could pass. Reworked the assertion to (1) reject duplicates, (2) require full coverage, and (3) assert strict positional order for size sort, the only field with a clock-independent ordering.
- Flaky time.sleep(0.05) between inserts: Windows CI clock resolution is ~15ms, so back-to-back inserts under load could collide and exercise the tiebreaker instead of the documented path. Removed the sleep and let the strengthened assertion above carry coverage / no-duplicates, with size sort carrying strict order.
- Cursor error envelope diverged from the rest of routes.py: cursor 400s emitted {error: {code, message}} while every other 400 in the file emits {error: {code, message, details}} via _build_error_response. Switched to _build_error_response and added the details field to the AssetsApiError schema in openapi.yaml.
- "Byte-identity fixtures" only checked substring containment, defeating the test class's stated purpose of pinning the wire format. Switched to exact-bytes equality against an inline expected payload string per fixture, so any whitespace / key-order / escape drift fails loudly.

Also dropped Go / json.Marshal references from docstrings: the byte format is the contract, not the runtime that mints it.

* fix(assets): cap cursors by encoded wire size, not just char count

Char-count guards on value/id can still let multibyte or escape-heavy inputs blow past MAX_ENCODED_CURSOR_LENGTH once UTF-8 + escape expansion + base64url runs. A 512-character name of 'é' (2 bytes UTF-8) or '<' (serializes to the 6-byte '\u003c' escape) passes the char check, mints a ~1500-byte cursor, then 400s when handed back on the next request. Compute the final encoded form and reject it before returning if it exceeds the wire cap. Adds regression tests for both inflation paths.
* refactor(assets): extract cursor JSON escaping helper; size wire cap above per-field caps

Addresses review feedback on cursor.py:

- Extract the inline escape chain into _apply_wire_compatible_json_escapes() with a comment pinning it to the wire format's escape set, so the parity intent is explicit rather than reading as an ad-hoc transform.
- Raise MAX_ENCODED_CURSOR_LENGTH to 8192 (comfortably above the ~5.2KB worst-case the per-field caps can produce) and drop the mint-time length guard. Encoder/decoder symmetry now holds by construction: the encoder can't produce a cursor the decode path rejects, so there is no confusing user-visible 'cursor too long' failure at mint time.
- Rewrite the two over-wire-cap tests to assert worst-case multibyte and escape-heavy values mint and round-trip, instead of being rejected.

* refactor(assets): drop cross-runtime cursor escaping; cursors are opaque

The custom JSON escaping of <, >, &, U+2028, and U+2029 existed only to keep the encoded cursor byte-identical with the Cloud implementation of the same payload format. Cursors are opaque tokens, so byte-level compatibility across implementations is not needed; plain json.dumps output is sufficient. Remove the escaping helper and the byte-identity test fixtures that pinned the wire format; keep round-trip coverage for the affected characters.

---------

Co-authored-by: guill <jacob.e.segal@gmail.com>
1 parent a76bb43 commit 84e0692
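
The wire format described in the message above (base64url of a JSON object with keys "s", "v", "id", and the mandatory "o") can be sketched roughly as follows. This is an illustration only, not the contents of cursor.py: the names `encode_cursor_sketch` and `decode_cursor_sketch`, the padding handling, and the error messages are assumptions, and the length caps and sort-field allowlist are omitted.

```python
import base64
import json


class InvalidCursorError(ValueError):
    """Hypothetical stand-in for the error type the route maps to 400."""


def encode_cursor_sketch(sort_field: str, value: str, row_id: str, order: str) -> str:
    # Keys follow the commit message: s=sort field, v=value, id=row id, o=order.
    payload = {"s": sort_field, "v": value, "id": row_id, "o": order}
    raw = json.dumps(payload, separators=(",", ":")).encode("utf-8")
    # Assumption: trailing '=' padding is stripped from the token.
    return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")


def decode_cursor_sketch(token: str, expected_order: str) -> dict:
    try:
        padded = token + "=" * (-len(token) % 4)
        payload = json.loads(base64.urlsafe_b64decode(padded))
    except (ValueError, UnicodeDecodeError) as exc:
        # binascii.Error and json.JSONDecodeError both subclass ValueError.
        raise InvalidCursorError("malformed cursor") from exc
    if not isinstance(payload, dict) or not isinstance(payload.get("o"), str):
        raise InvalidCursorError("cursor is missing the order field")
    if payload["o"] != expected_order:
        raise InvalidCursorError("cursor order does not match request order")
    return payload
```

Because the token is opaque to clients, plain `json.dumps` output is enough here; nothing depends on the exact bytes beyond the same server being able to decode what it minted.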

10 files changed

Lines changed: 1112 additions & 17 deletions

app/assets/api/routes.py

Lines changed: 27 additions & 13 deletions
@@ -39,6 +39,7 @@
     update_asset_metadata,
     upload_from_temp_path,
 )
+from app.assets.services.cursor import InvalidCursorError
 from app.assets.services.tagging import list_tag_histogram

 ROUTES = web.RouteTableDef()
@@ -174,7 +175,7 @@ def _build_asset_response(result: schemas.AssetDetailResult | schemas.UploadResu
         user_metadata=result.ref.user_metadata or {},
         metadata=result.ref.system_metadata,
         job_id=result.ref.job_id,
-        prompt_id=result.ref.job_id,  # deprecated: mirrors job_id for cloud compat
+        prompt_id=result.ref.job_id,  # deprecated alias of job_id, kept for compatibility
         created_at=result.ref.created_at,
         updated_at=result.ref.updated_at,
         last_access_time=result.ref.last_access_time,
@@ -211,24 +212,37 @@ async def list_assets_route(request: web.Request) -> web.Response:
     order_candidate = (q.order or "desc").lower()
     order = order_candidate if order_candidate in {"asc", "desc"} else "desc"

-    result = list_assets_page(
-        owner_id=USER_MANAGER.get_request_user_id(request),
-        include_tags=q.include_tags,
-        exclude_tags=q.exclude_tags,
-        name_contains=q.name_contains,
-        metadata_filter=q.metadata_filter,
-        limit=q.limit,
-        offset=q.offset,
-        sort=sort,
-        order=order,
-    )
+    try:
+        result = list_assets_page(
+            owner_id=USER_MANAGER.get_request_user_id(request),
+            include_tags=q.include_tags,
+            exclude_tags=q.exclude_tags,
+            name_contains=q.name_contains,
+            metadata_filter=q.metadata_filter,
+            limit=q.limit,
+            offset=q.offset,
+            sort=sort,
+            order=order,
+            after=q.after,
+        )
+    except InvalidCursorError as e:
+        return _build_error_response(400, "INVALID_CURSOR", str(e))

     summaries = [_build_asset_response(item) for item in result.items]

+    # has_more semantics differ by mode:
+    # - cursor mode: a non-empty next_cursor means there are more results.
+    # - offset mode: derived from total - (offset + page size).
+    if q.after is not None:
+        has_more = result.next_cursor is not None
+    else:
+        has_more = (q.offset + len(summaries)) < result.total
+
     payload = schemas_out.AssetsList(
         assets=summaries,
         total=result.total,
-        has_more=(q.offset + len(summaries)) < result.total,
+        has_more=has_more,
+        next_cursor=result.next_cursor,
     )
     return web.json_response(payload.model_dump(mode="json", exclude_none=True))
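
From a client's point of view, the handler above implies a simple loop: request a page, read `next_cursor`, and pass it back as `after` until the field is absent (the response is dumped with `exclude_none=True`, so a missing key means the walk is done). The sketch below shows that loop against an in-memory stand-in. `walk_assets` and `make_fake_fetch` are hypothetical names, and the fake server uses a plain integer index as its cursor, which is not how the real cursor is built.

```python
from typing import Callable, Iterator, Optional


def walk_assets(fetch_page: Callable[[Optional[str]], dict]) -> Iterator[dict]:
    """Yield every asset by following next_cursor until it is absent."""
    after: Optional[str] = None
    while True:
        page = fetch_page(after)
        yield from page["assets"]
        # The key is omitted on the last page, so .get() returns None.
        after = page.get("next_cursor")
        if after is None:
            return


def make_fake_fetch(names: list, limit: int) -> Callable[[Optional[str]], dict]:
    """Stand-in for GET /api/assets; the 'cursor' here is just a list index."""

    def fetch(after: Optional[str]) -> dict:
        start = 0 if after is None else int(after)
        end = start + limit
        body = {
            "assets": [{"name": n} for n in names[start:end]],
            "total": len(names),
        }
        if end < len(names):
            body["next_cursor"] = str(end)
        body["has_more"] = "next_cursor" in body
        return body

    return fetch
```

A caller would treat the cursor as opaque and never parse it; only the stand-in server above knows it is an index.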

app/assets/api/schemas_in.py

Lines changed: 5 additions & 0 deletions
@@ -59,6 +59,11 @@ class ListAssetsQuery(BaseModel):

     limit: conint(ge=1, le=500) = 20
     offset: conint(ge=0) = 0
+    # Opaque keyset cursor. When supplied, `offset` is ignored. Cursor pagination
+    # is supported for sort values `created_at`, `updated_at`, `name`, `size`.
+    # Supplying `after` together with `sort=last_access_time` returns
+    # 400 INVALID_CURSOR; that sort only supports offset/limit.
+    after: str | None = None

     sort: Literal["name", "created_at", "updated_at", "size", "last_access_time"] = (
         "created_at"

app/assets/api/schemas_out.py

Lines changed: 2 additions & 0 deletions
@@ -41,6 +41,8 @@ class AssetsList(BaseModel):
     assets: list[Asset]
     total: int
     has_more: bool
+    # Opaque cursor for the next page. Omitted when there are no more results.
+    next_cursor: str | None = None


 class TagUsage(BaseModel):

app/assets/database/queries/asset_reference.py

Lines changed: 33 additions & 2 deletions
@@ -266,9 +266,18 @@ def list_references_page(
     metadata_filter: dict | None = None,
     sort: str | None = None,
     order: str | None = None,
+    after_cursor_value: object | None = None,
+    after_cursor_id: str | None = None,
 ) -> tuple[list[AssetReference], dict[str, list[str]], int]:
     """List references with pagination, filtering, and sorting.

+    When ``after_cursor_value``/``after_cursor_id`` are supplied the query uses
+    keyset pagination: ``offset`` is ignored and a WHERE clause selects rows
+    strictly after the given ``(sort_col, id)`` position in the active sort
+    direction. The cursor value must already be typed for the column
+    (datetime for time sorts, int for size, str for name); the caller decodes
+    the opaque cursor string and resolves to the typed value.
+
     Returns (references, tag_map, total_count).
     """
     base = (
@@ -297,9 +306,31 @@ def list_references_page(
         "size": Asset.size_bytes,
     }
     sort_col = sort_map.get(sort, AssetReference.created_at)
-    sort_exp = sort_col.desc() if order == "desc" else sort_col.asc()
+    descending = order == "desc"
+
+    # Keyset WHERE: (sort_col, id) strictly less-than / greater-than the cursor.
+    # Equivalent to: sort_col <op> v OR (sort_col = v AND id <op> cursor_id).
+    if after_cursor_value is not None and after_cursor_id is not None:
+        if descending:
+            keyset = sa.or_(
+                sort_col < after_cursor_value,
+                sa.and_(sort_col == after_cursor_value, AssetReference.id < after_cursor_id),
+            )
+        else:
+            keyset = sa.or_(
+                sort_col > after_cursor_value,
+                sa.and_(sort_col == after_cursor_value, AssetReference.id > after_cursor_id),
+            )
+        base = base.where(keyset)
+
+    # Secondary ORDER BY id (matching the primary direction) gives the keyset
+    # comparison a deterministic tiebreaker on duplicate sort_col values.
+    id_exp = AssetReference.id.desc() if descending else AssetReference.id.asc()
+    sort_exp = sort_col.desc() if descending else sort_col.asc()

-    base = base.order_by(sort_exp).limit(limit).offset(offset)
+    base = base.order_by(sort_exp, id_exp).limit(limit)
+    if after_cursor_id is None:
+        base = base.offset(offset)

     count_stmt = (
         select(sa.func.count())
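
The keyset predicate and the matching two-column ORDER BY can be checked without a database. The sketch below models rows as `(sort_value, id)` tuples and applies the same comparison in plain Python; `after_cursor` and `walk` are hypothetical helpers written for this illustration, not functions from the repository.

```python
def after_cursor(row: tuple, cursor: tuple, descending: bool) -> bool:
    """Same shape as the SQL: sort_col <op> v OR (sort_col = v AND id <op> cursor_id)."""
    value, row_id = row
    c_value, c_id = cursor
    if descending:
        return value < c_value or (value == c_value and row_id < c_id)
    return value > c_value or (value == c_value and row_id > c_id)


def walk(rows: list, limit: int, descending: bool) -> list:
    """Page through rows with the keyset predicate and collect what is seen."""
    # Sorting tuples orders by (value, id), mirroring ORDER BY sort_col, id
    # with both columns in the same direction.
    ordered = sorted(rows, reverse=descending)
    seen, cursor = [], None
    while True:
        candidates = [
            r for r in ordered if cursor is None or after_cursor(r, cursor, descending)
        ]
        page = candidates[:limit]
        if not page:
            break
        seen.extend(page)
        cursor = page[-1]  # boundary row of this page becomes the next cursor
    return seen
```

With rows that tie on the sort value across a page boundary, the id tiebreaker is what keeps the walk from dropping or repeating a row; without it, a cursor on the value alone would skip the remaining rows that share that value.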

app/assets/services/asset_management.py

Lines changed: 92 additions & 2 deletions
@@ -1,8 +1,19 @@
 import contextlib
 import mimetypes
 import os
+from datetime import timezone
 from typing import Sequence

+from app.assets.services.cursor import (
+    CursorPayload,
+    InvalidCursorError,
+    decode_cursor,
+    decode_cursor_int,
+    decode_cursor_time,
+    encode_cursor,
+    encode_cursor_from_time,
+)
+

 from app.assets.database.models import Asset
 from app.assets.database.queries import (
@@ -242,6 +253,11 @@ def get_asset_by_hash(asset_hash: str) -> AssetData | None:
         return extract_asset_data(asset)


+# Sort fields that support cursor pagination. `last_access_time` is not
+# in this list; it falls back to offset/limit.
+_CURSOR_SORT_FIELDS = ("created_at", "updated_at", "name", "size")
+
+
 def list_assets_page(
     owner_id: str = "",
     include_tags: Sequence[str] | None = None,
@@ -252,7 +268,39 @@ def list_assets_page(
     offset: int = 0,
     sort: str = "created_at",
     order: str = "desc",
+    after: str | None = None,
 ) -> ListAssetsResult:
+    """List assets with optional cursor pagination.
+
+    When ``after`` is supplied it overrides ``offset``. The cursor's sort field
+    must match ``sort`` and be in the cursor-supported allowlist; mismatches
+    raise InvalidCursorError so the handler can map to 400 INVALID_CURSOR.
+    """
+    cursor_value: object | None = None
+    cursor_id: str | None = None
+    # Mint next_cursor on every page where the sort is cursor-supported, not
+    # only when the request itself arrived with a cursor. Otherwise a first
+    # request (no `after`) returns next_cursor=None and the client can never
+    # enter cursor mode.
+    mint_cursor = sort in _CURSOR_SORT_FIELDS
+
+    if after is not None:
+        if sort not in _CURSOR_SORT_FIELDS:
+            raise InvalidCursorError(
+                f"cursor pagination is not supported for sort={sort!r}"
+            )
+        payload = decode_cursor(after, _CURSOR_SORT_FIELDS, expected_order=order)
+        if payload.sort_field != sort:
+            raise InvalidCursorError(
+                f"cursor sort field {payload.sort_field!r} does not match request sort {sort!r}"
+            )
+        cursor_value, cursor_id = _resolve_cursor_value(payload), payload.id
+
+    # Over-fetch by one row so we can distinguish "exactly `limit` rows total
+    # remaining" from "more rows past this page" without a second query. Drop
+    # the sentinel before returning.
+    fetch_limit = limit + 1 if mint_cursor else limit
+
     with create_session() as session:
         refs, tag_map, total = list_references_page(
             session,
@@ -261,12 +309,22 @@ def list_assets_page(
             exclude_tags=exclude_tags,
             name_contains=name_contains,
             metadata_filter=metadata_filter,
-            limit=limit,
+            limit=fetch_limit,
             offset=offset,
             sort=sort,
             order=order,
+            after_cursor_value=cursor_value,
+            after_cursor_id=cursor_id,
         )

+        next_cursor: str | None = None
+        if mint_cursor and len(refs) > limit:
+            # There's at least one more row past this page; mint a cursor from
+            # the last row of the page (i.e. index `limit - 1`, since we
+            # over-fetched), and drop the sentinel.
+            next_cursor = _encode_next_cursor(refs[limit - 1], sort, order)
+            refs = refs[:limit]
+
         items: list[AssetSummaryData] = []
         for ref in refs:
             items.append(
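
The over-fetch step in this hunk reduces to a small, database-free pattern: fetch `limit + 1` rows, treat the extra row purely as a "more exists" signal, and take the cursor from the last row that is actually returned. The helper below is a hypothetical sketch of that pattern (`page_with_sentinel` is not a function in the repository), with a plain list standing in for the query result.

```python
from typing import Optional, Sequence, Tuple, TypeVar

T = TypeVar("T")


def page_with_sentinel(rows: Sequence[T], limit: int) -> Tuple[list, Optional[T]]:
    """Return (page, boundary_row); boundary_row is None on the terminal page."""
    # Stand-in for running the query with LIMIT limit + 1.
    fetched = list(rows[: limit + 1])
    has_more = len(fetched) > limit
    page = fetched[:limit]  # drop the sentinel row if it was fetched
    # The cursor is minted from index limit - 1, the last row the client sees,
    # never from the sentinel itself.
    boundary = page[-1] if has_more else None
    return page, boundary
```

The case this guards against is a final page that holds exactly `limit` rows: without the sentinel there is no way to tell it apart from a full middle page, so a cursor would be minted that leads to an empty page.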
@@ -277,7 +335,39 @@ def list_assets_page(
                 )
             )

-    return ListAssetsResult(items=items, total=total)
+    return ListAssetsResult(items=items, total=total, next_cursor=next_cursor)
+
+
+def _resolve_cursor_value(payload: CursorPayload) -> object:
+    """Map a decoded cursor payload to a column-typed Python value."""
+    if payload.sort_field in ("created_at", "updated_at"):
+        # DB stores naive UTC; strip tzinfo so the comparison binds against a
+        # `TIMESTAMP WITHOUT TIME ZONE` column without an offset shift.
+        return decode_cursor_time(payload).replace(tzinfo=None)
+    if payload.sort_field == "size":
+        return decode_cursor_int(payload)
+    return payload.value  # name, str-typed
+
+
+def _encode_next_cursor(ref, sort: str, order: str) -> str | None:
+    """Mint a cursor pointing at *ref* for the given sort dimension.
+
+    Returns None when the boundary row carries a NULL sort value (e.g. an asset
+    record whose size_bytes hasn't been backfilled). Continuing pagination
+    across a NULL boundary is undefined under keyset ordering; better to
+    truncate cleanly here than to mint a cursor that mis-positions.
+    """
+    if sort == "name":
+        return encode_cursor("name", ref.name, ref.id, order=order)
+    if sort == "size":
+        if ref.asset is None or ref.asset.size_bytes is None:
+            return None
+        return encode_cursor("size", str(ref.asset.size_bytes), ref.id, order=order)
+    # created_at / updated_at: DB datetimes are naive UTC; attach tz before encoding.
+    value = ref.created_at if sort == "created_at" else ref.updated_at
+    if value is None:
+        return None
+    return encode_cursor_from_time(sort, value.replace(tzinfo=timezone.utc), ref.id, order=order)


 def resolve_hash_to_path(
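
The time handling in `_resolve_cursor_value` and `_encode_next_cursor` depends on a round trip between naive-UTC database datetimes and the Unix-microsecond strings carried in the cursor. The sketch below shows one way that round trip could work, including mapping out-of-range values to a cursor error rather than letting them surface as a 500. The function names and the timedelta-based arithmetic are assumptions for illustration; the real `decode_cursor_time` and `encode_cursor_from_time` may be implemented differently.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


class InvalidCursorError(ValueError):
    """Hypothetical stand-in for the error type the route maps to 400."""


def to_cursor_micros(naive_utc: datetime) -> str:
    # DB values are naive UTC; attach the zone before measuring from the epoch.
    aware = naive_utc.replace(tzinfo=timezone.utc)
    # Integer division by a 1 microsecond timedelta yields whole microseconds.
    return str((aware - EPOCH) // timedelta(microseconds=1))


def from_cursor_micros(value: str) -> datetime:
    try:
        micros = int(value)
        aware = EPOCH + timedelta(microseconds=micros)
    except (ValueError, OverflowError) as exc:
        # Non-numeric or crafted out-of-range values become a client error.
        raise InvalidCursorError("cursor timestamp is invalid") from exc
    # Strip tzinfo so the value compares against a naive UTC column.
    return aware.replace(tzinfo=None)
```

Working in integer microseconds avoids float rounding, so a timestamp read from the boundary row compares equal to itself when the cursor comes back on the next request.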
