Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,26 @@ export class PostgresEntityDatabaseAdapter<
);
}

protected override async fetchManyWhereWithPaginationInternalAsync(
queryInterface: Knex,
tableName: string,
tableColumns: readonly string[],
tableTuple: readonly any[],
offset: number,
limit: number,
): Promise<object[]> {
return await this.fetchManyByFieldEqualityConjunctionInternalAsync(
queryInterface,
tableName,
tableColumns.map((column, index) => ({
tableField: column,
tableValue: tableTuple[index],
})),
[],
{ limit, offset, orderBy: undefined },
);
}

protected async fetchOneWhereInternalAsync(
queryInterface: Knex,
tableName: string,
Expand Down
34 changes: 34 additions & 0 deletions packages/entity/src/AuthorizationResultBasedEntityLoader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,40 @@ export class AuthorizationResultBasedEntityLoader<
return entityResultsForFieldValue;
}

/**
* Load entities in pages where fieldName equals fieldValue. Each yielded page contains
* at most `pageSize` authorized entity results. Bypasses DataLoader and cache to avoid
* loading all matching entities into memory at once.
*
* @param fieldName - entity field being queried
* @param fieldValue - fieldName field value being queried
* @param pageSize - maximum number of entities per page
* @param advanceOffset - if true, advances offset between pages (standard pagination).
* If false, always queries from offset 0 (use when each page's results are deleted
* or modified before the next page is fetched).
*/
async *loadManyByFieldEqualingInPagesAsync<N extends keyof Pick<TFields, TSelectedFields>>(
fieldName: N,
fieldValue: NonNullable<TFields[N]>,
pageSize: number,
advanceOffset: boolean,
): AsyncGenerator<readonly Result<TEntity>[]> {
const { loadKey, loadValue } = this.validateFieldAndValueAndConvertToHolders(
fieldName,
fieldValue,
);

for await (const fieldObjectsPage of this.dataManager.loadManyEqualingPaginatedAsync(
this.queryContext,
loadKey,
loadValue,
pageSize,
advanceOffset,
)) {
yield await this.constructionUtils.constructAndAuthorizeEntitiesArrayAsync(fieldObjectsPage);
}
}

/**
* Load one entity where fieldName equals fieldValue, or null if no entity exists matching the condition.
* Not cached or coalesced, and not guaranteed to be deterministic if multiple entities match the condition.
Expand Down
177 changes: 112 additions & 65 deletions packages/entity/src/AuthorizationResultBasedEntityMutator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -979,76 +979,123 @@
return;
}

const inboundReferenceEntities = await enforceResultsAsync(
loaderFactory
.forLoad(queryContext, {
previousValue: null,
cascadingDeleteCause: newCascadingDeleteCause,
})
.loadManyByFieldEqualingAsync(
fieldName,
association.associatedEntityLookupByField
? entity.getField(association.associatedEntityLookupByField)
: entity.getID(),
),
);

switch (association.edgeDeletionBehavior) {
case EntityEdgeDeletionBehavior.CASCADE_DELETE_INVALIDATE_CACHE_ONLY: {
await Promise.all(
inboundReferenceEntities.map((inboundReferenceEntity) =>
enforceAsyncResult(
mutatorFactory
.forDelete(inboundReferenceEntity, queryContext, newCascadingDeleteCause)
.deleteInTransactionAsync(
processedEntityIdentifiers,
/* skipDatabaseDeletion */ true, // deletion is handled by DB
),
const cascadeChunkSize = entityConfiguration.cascadeChunkSize;
const fieldValue = association.associatedEntityLookupByField
? entity.getField(association.associatedEntityLookupByField)
: entity.getID();

const loader = loaderFactory.forLoad(queryContext, {
previousValue: null,
cascadingDeleteCause: newCascadingDeleteCause,
});

const processPage = async (
inboundReferenceEntities: readonly TEntity[],
): Promise<void> => {
switch (association.edgeDeletionBehavior) {
case EntityEdgeDeletionBehavior.CASCADE_DELETE_INVALIDATE_CACHE_ONLY: {
await Promise.all(
inboundReferenceEntities.map((inboundReferenceEntity) =>
enforceAsyncResult(
mutatorFactory
.forDelete(

Check warning on line 1001 in packages/entity/src/AuthorizationResultBasedEntityMutator.ts

View workflow job for this annotation

GitHub Actions / build

Replace `⏎····························inboundReferenceEntity,⏎····························queryContext,⏎····························newCascadingDeleteCause,⏎··························` with `inboundReferenceEntity,·queryContext,·newCascadingDeleteCause`
inboundReferenceEntity,
queryContext,
newCascadingDeleteCause,
)
.deleteInTransactionAsync(
processedEntityIdentifiers,
/* skipDatabaseDeletion */ true, // deletion is handled by DB
),
),
),
),
);
break;
}
case EntityEdgeDeletionBehavior.SET_NULL_INVALIDATE_CACHE_ONLY: {
await Promise.all(
inboundReferenceEntities.map((inboundReferenceEntity) =>
enforceAsyncResult(
mutatorFactory
.forUpdate(inboundReferenceEntity, queryContext, newCascadingDeleteCause)
.setField(fieldName, null)
['updateInTransactionAsync'](/* skipDatabaseUpdate */ true),
);
break;
}
case EntityEdgeDeletionBehavior.SET_NULL_INVALIDATE_CACHE_ONLY: {
await Promise.all(
inboundReferenceEntities.map((inboundReferenceEntity) =>
enforceAsyncResult(
mutatorFactory
.forUpdate(

Check warning on line 1020 in packages/entity/src/AuthorizationResultBasedEntityMutator.ts

View workflow job for this annotation

GitHub Actions / build

Replace `⏎····························inboundReferenceEntity,⏎····························queryContext,⏎····························newCascadingDeleteCause,⏎··························` with `inboundReferenceEntity,·queryContext,·newCascadingDeleteCause`
inboundReferenceEntity,
queryContext,
newCascadingDeleteCause,
)
.setField(fieldName, null)
['updateInTransactionAsync'](/* skipDatabaseUpdate */ true),
),
),
),
);
break;
}
case EntityEdgeDeletionBehavior.SET_NULL: {
await Promise.all(
inboundReferenceEntities.map((inboundReferenceEntity) =>
enforceAsyncResult(
mutatorFactory
.forUpdate(inboundReferenceEntity, queryContext, newCascadingDeleteCause)
.setField(fieldName, null)
['updateInTransactionAsync'](/* skipDatabaseUpdate */ false),
);
break;
}
case EntityEdgeDeletionBehavior.SET_NULL: {
await Promise.all(
inboundReferenceEntities.map((inboundReferenceEntity) =>
enforceAsyncResult(
mutatorFactory
.forUpdate(

Check warning on line 1037 in packages/entity/src/AuthorizationResultBasedEntityMutator.ts

View workflow job for this annotation

GitHub Actions / build

Replace `⏎····························inboundReferenceEntity,⏎····························queryContext,⏎····························newCascadingDeleteCause,⏎··························` with `inboundReferenceEntity,·queryContext,·newCascadingDeleteCause`
inboundReferenceEntity,
queryContext,
newCascadingDeleteCause,
)
.setField(fieldName, null)
['updateInTransactionAsync'](/* skipDatabaseUpdate */ false),
),
),
),
);
break;
}
case EntityEdgeDeletionBehavior.CASCADE_DELETE: {
await Promise.all(
inboundReferenceEntities.map((inboundReferenceEntity) =>
enforceAsyncResult(
mutatorFactory
.forDelete(inboundReferenceEntity, queryContext, newCascadingDeleteCause)
.deleteInTransactionAsync(
processedEntityIdentifiers,
/* skipDatabaseDeletion */ false,
),
);
break;
}
case EntityEdgeDeletionBehavior.CASCADE_DELETE: {
await Promise.all(
inboundReferenceEntities.map((inboundReferenceEntity) =>
enforceAsyncResult(
mutatorFactory
.forDelete(

Check warning on line 1054 in packages/entity/src/AuthorizationResultBasedEntityMutator.ts

View workflow job for this annotation

GitHub Actions / build

Replace `⏎····························inboundReferenceEntity,⏎····························queryContext,⏎····························newCascadingDeleteCause,⏎··························` with `inboundReferenceEntity,·queryContext,·newCascadingDeleteCause`
inboundReferenceEntity,
queryContext,
newCascadingDeleteCause,
)
.deleteInTransactionAsync(
processedEntityIdentifiers,
/* skipDatabaseDeletion */ false,
),
),
),
),
);
);
}
}
};

if (isFinite(cascadeChunkSize)) {
// For CASCADE_DELETE and SET_NULL, processing each page deletes or modifies
// the matched rows in the database, so subsequent rows shift down. We must
// re-query from offset 0 each time.
// For INVALIDATE_CACHE_ONLY variants, the framework skips the DB write
// (the DB's own FK cascade hasn't fired yet since the parent row hasn't
// been deleted), so rows remain and we advance the offset normally.
const advanceOffset =
association.edgeDeletionBehavior ===
EntityEdgeDeletionBehavior.CASCADE_DELETE_INVALIDATE_CACHE_ONLY ||
association.edgeDeletionBehavior ===
EntityEdgeDeletionBehavior.SET_NULL_INVALIDATE_CACHE_ONLY;

// Paginated: load and process one page at a time to bound memory
for await (const pageResults of loader.loadManyByFieldEqualingInPagesAsync(
fieldName,
fieldValue,
cascadeChunkSize,
advanceOffset,
)) {
const page = pageResults.map((r) => r.enforceValue());
await processPage(page);
}
} else {
// Unpaginated: load all at once (original behavior)
const inboundReferenceEntities = await enforceResultsAsync(
loader.loadManyByFieldEqualingAsync(fieldName, fieldValue),
);
await processPage(inboundReferenceEntities);
}
},
);
Expand Down
29 changes: 29 additions & 0 deletions packages/entity/src/EnforcingEntityLoader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,35 @@ export class EnforcingEntityLoader<
return entityResults.map((result) => result.enforceValue());
}

/**
* Load entities in pages where fieldName equals fieldValue. Each yielded page contains
* at most `pageSize` enforced entities. Bypasses DataLoader and cache to avoid
* loading all matching entities into memory at once.
*
* @param fieldName - entity field being queried
* @param fieldValue - fieldName field value being queried
* @param pageSize - maximum number of entities per page
* @param advanceOffset - if true, advances offset between pages (standard pagination).
* If false, always queries from offset 0 (use when each page's results are deleted
* or modified before the next page is fetched).
* @throws EntityNotAuthorizedError when viewer is not authorized to view one or more of the returned entities
*/
async *loadManyByFieldEqualingInPagesAsync<N extends keyof Pick<TFields, TSelectedFields>>(
fieldName: N,
fieldValue: NonNullable<TFields[N]>,
pageSize: number,
advanceOffset: boolean,
): AsyncGenerator<readonly TEntity[]> {
for await (const pageResults of this.entityLoader.loadManyByFieldEqualingInPagesAsync(
fieldName,
fieldValue,
pageSize,
advanceOffset,
)) {
yield pageResults.map((result) => result.enforceValue());
}
}

/**
* Load many entities where compositeField equals compositeFieldValue.
* @param compositeField - composite field being queried
Expand Down
16 changes: 16 additions & 0 deletions packages/entity/src/EntityConfiguration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ export class EntityConfiguration<
readonly databaseAdapterFlavor: DatabaseAdapterFlavor;
readonly cacheAdapterFlavor: CacheAdapterFlavor;

/**
* Maximum number of inbound reference entities to process concurrently during
* cascade deletion. Controls peak memory usage for entities with many inbound references.
* Defaults to Infinity (process all at once, preserving existing behavior).
*/
readonly cascadeChunkSize: number;

constructor({
idField,
tableName,
Expand All @@ -131,6 +138,7 @@ export class EntityConfiguration<
compositeFieldDefinitions,
databaseAdapterFlavor,
cacheAdapterFlavor,
cascadeChunkSize = Infinity,
}: {
/**
* The field used to identify this entity. Must be a unique field in the table.
Expand Down Expand Up @@ -173,12 +181,20 @@ export class EntityConfiguration<
* Cache system for this entity.
*/
cacheAdapterFlavor: CacheAdapterFlavor;

/**
* Maximum number of inbound reference entities to process concurrently during
* cascade deletion. Controls peak memory usage for entities with many inbound references.
* Defaults to Infinity (process all at once, preserving existing behavior).
*/
cascadeChunkSize?: number;
}) {
this.idField = idField;
this.tableName = tableName;
this.cacheKeyVersion = cacheKeyVersion;
this.databaseAdapterFlavor = databaseAdapterFlavor;
this.cacheAdapterFlavor = cacheAdapterFlavor;
this.cascadeChunkSize = cascadeChunkSize;
this.inboundEdges = inboundEdges;

// external schema is a Record to typecheck that all fields have FieldDefinitions,
Expand Down
Loading
Loading