From e0fdbf6d1b5c3e15d94ddbd84f4a643fd4ed55ef Mon Sep 17 00:00:00 2001 From: Will Schurman Date: Mon, 30 Mar 2026 16:30:42 -0700 Subject: [PATCH] feat: batch deletion --- .../src/PostgresEntityDatabaseAdapter.ts | 20 ++ .../AuthorizationResultBasedEntityLoader.ts | 34 +++ .../AuthorizationResultBasedEntityMutator.ts | 177 +++++++----- packages/entity/src/EnforcingEntityLoader.ts | 29 ++ packages/entity/src/EntityConfiguration.ts | 16 ++ packages/entity/src/EntityDatabaseAdapter.ts | 60 ++++ packages/entity/src/index.ts | 1 + .../entity/src/internal/EntityDataManager.ts | 57 ++++ .../src/utils/EntityCascadeDeletionUtils.ts | 260 ++++++++++++++++++ .../entity/src/utils/collections/arrays.ts | 21 ++ 10 files changed, 610 insertions(+), 65 deletions(-) create mode 100644 packages/entity/src/utils/EntityCascadeDeletionUtils.ts create mode 100644 packages/entity/src/utils/collections/arrays.ts diff --git a/packages/entity-database-adapter-knex/src/PostgresEntityDatabaseAdapter.ts b/packages/entity-database-adapter-knex/src/PostgresEntityDatabaseAdapter.ts index d03e37eec..17474ffb3 100644 --- a/packages/entity-database-adapter-knex/src/PostgresEntityDatabaseAdapter.ts +++ b/packages/entity-database-adapter-knex/src/PostgresEntityDatabaseAdapter.ts @@ -102,6 +102,26 @@ export class PostgresEntityDatabaseAdapter< ); } + protected override async fetchManyWhereWithPaginationInternalAsync( + queryInterface: Knex, + tableName: string, + tableColumns: readonly string[], + tableTuple: readonly any[], + offset: number, + limit: number, + ): Promise { + return await this.fetchManyByFieldEqualityConjunctionInternalAsync( + queryInterface, + tableName, + tableColumns.map((column, index) => ({ + tableField: column, + tableValue: tableTuple[index], + })), + [], + { limit, offset, orderBy: undefined }, + ); + } + protected async fetchOneWhereInternalAsync( queryInterface: Knex, tableName: string, diff --git a/packages/entity/src/AuthorizationResultBasedEntityLoader.ts b/packages/entity/src/AuthorizationResultBasedEntityLoader.ts index 2a8d4201f..556b178cd 100644 --- a/packages/entity/src/AuthorizationResultBasedEntityLoader.ts +++ b/packages/entity/src/AuthorizationResultBasedEntityLoader.ts @@ -136,6 +136,40 @@ export class AuthorizationResultBasedEntityLoader< return entityResultsForFieldValue; } + /** + * Load entities in pages where fieldName equals fieldValue. Each yielded page contains + * at most `pageSize` authorized entity results. Bypasses DataLoader and cache to avoid + * loading all matching entities into memory at once. + * + * @param fieldName - entity field being queried + * @param fieldValue - fieldName field value being queried + * @param pageSize - maximum number of entities per page + * @param advanceOffset - if true, advances offset between pages (standard pagination). + * If false, always queries from offset 0 (use when each page's results are deleted + * or modified before the next page is fetched). + */ + async *loadManyByFieldEqualingInPagesAsync>( + fieldName: N, + fieldValue: NonNullable, + pageSize: number, + advanceOffset: boolean, + ): AsyncGenerator[]> { + const { loadKey, loadValue } = this.validateFieldAndValueAndConvertToHolders( + fieldName, + fieldValue, + ); + + for await (const fieldObjectsPage of this.dataManager.loadManyEqualingPaginatedAsync( + this.queryContext, + loadKey, + loadValue, + pageSize, + advanceOffset, + )) { + yield await this.constructionUtils.constructAndAuthorizeEntitiesArrayAsync(fieldObjectsPage); + } + } + /** * Load one entity where fieldName equals fieldValue, or null if no entity exists matching the condition. * Not cached or coalesced, and not guaranteed to be deterministic if multiple entities match the condition. diff --git a/packages/entity/src/AuthorizationResultBasedEntityMutator.ts b/packages/entity/src/AuthorizationResultBasedEntityMutator.ts index 52690c7b5..98b12479d 100644 --- a/packages/entity/src/AuthorizationResultBasedEntityMutator.ts +++ b/packages/entity/src/AuthorizationResultBasedEntityMutator.ts @@ -979,76 +979,123 @@ export class AuthorizationResultBasedDeleteMutator< return; } - const inboundReferenceEntities = await enforceResultsAsync( - loaderFactory - .forLoad(queryContext, { - previousValue: null, - cascadingDeleteCause: newCascadingDeleteCause, - }) - .loadManyByFieldEqualingAsync( - fieldName, - association.associatedEntityLookupByField - ? entity.getField(association.associatedEntityLookupByField) - : entity.getID(), - ), - ); - - switch (association.edgeDeletionBehavior) { - case EntityEdgeDeletionBehavior.CASCADE_DELETE_INVALIDATE_CACHE_ONLY: { - await Promise.all( - inboundReferenceEntities.map((inboundReferenceEntity) => - enforceAsyncResult( - mutatorFactory - .forDelete(inboundReferenceEntity, queryContext, newCascadingDeleteCause) - .deleteInTransactionAsync( - processedEntityIdentifiers, - /* skipDatabaseDeletion */ true, // deletion is handled by DB - ), + const cascadeChunkSize = entityConfiguration.cascadeChunkSize; + const fieldValue = association.associatedEntityLookupByField + ? entity.getField(association.associatedEntityLookupByField) + : entity.getID(); + + const loader = loaderFactory.forLoad(queryContext, { + previousValue: null, + cascadingDeleteCause: newCascadingDeleteCause, + }); + + const processPage = async ( + inboundReferenceEntities: readonly TEntity[], + ): Promise => { + switch (association.edgeDeletionBehavior) { + case EntityEdgeDeletionBehavior.CASCADE_DELETE_INVALIDATE_CACHE_ONLY: { + await Promise.all( + inboundReferenceEntities.map((inboundReferenceEntity) => + enforceAsyncResult( + mutatorFactory + .forDelete( + inboundReferenceEntity, + queryContext, + newCascadingDeleteCause, + ) + .deleteInTransactionAsync( + processedEntityIdentifiers, + /* skipDatabaseDeletion */ true, // deletion is handled by DB + ), + ), ), - ), - ); - break; - } - case EntityEdgeDeletionBehavior.SET_NULL_INVALIDATE_CACHE_ONLY: { - await Promise.all( - inboundReferenceEntities.map((inboundReferenceEntity) => - enforceAsyncResult( - mutatorFactory - .forUpdate(inboundReferenceEntity, queryContext, newCascadingDeleteCause) - .setField(fieldName, null) - ['updateInTransactionAsync'](/* skipDatabaseUpdate */ true), + ); + break; + } + case EntityEdgeDeletionBehavior.SET_NULL_INVALIDATE_CACHE_ONLY: { + await Promise.all( + inboundReferenceEntities.map((inboundReferenceEntity) => + enforceAsyncResult( + mutatorFactory + .forUpdate( + inboundReferenceEntity, + queryContext, + newCascadingDeleteCause, + ) + .setField(fieldName, null) + ['updateInTransactionAsync'](/* skipDatabaseUpdate */ true), + ), ), - ), - ); - break; - } - case EntityEdgeDeletionBehavior.SET_NULL: { - await Promise.all( - inboundReferenceEntities.map((inboundReferenceEntity) => - enforceAsyncResult( - mutatorFactory - .forUpdate(inboundReferenceEntity, queryContext, newCascadingDeleteCause) - .setField(fieldName, null) - ['updateInTransactionAsync'](/* skipDatabaseUpdate */ false), + ); + break; + } + case EntityEdgeDeletionBehavior.SET_NULL: { + await Promise.all( + inboundReferenceEntities.map((inboundReferenceEntity) => + enforceAsyncResult( + mutatorFactory + .forUpdate( + inboundReferenceEntity, + queryContext, + newCascadingDeleteCause, + ) + .setField(fieldName, null) + ['updateInTransactionAsync'](/* skipDatabaseUpdate */ false), + ), ), - ), - ); - break; - } - case EntityEdgeDeletionBehavior.CASCADE_DELETE: { - await Promise.all( - inboundReferenceEntities.map((inboundReferenceEntity) => - enforceAsyncResult( - mutatorFactory - .forDelete(inboundReferenceEntity, queryContext, newCascadingDeleteCause) - .deleteInTransactionAsync( - processedEntityIdentifiers, - /* skipDatabaseDeletion */ false, - ), + ); + break; + } + case EntityEdgeDeletionBehavior.CASCADE_DELETE: { + await Promise.all( + inboundReferenceEntities.map((inboundReferenceEntity) => + enforceAsyncResult( + mutatorFactory + .forDelete( + inboundReferenceEntity, + queryContext, + newCascadingDeleteCause, + ) + .deleteInTransactionAsync( + processedEntityIdentifiers, + /* skipDatabaseDeletion */ false, + ), + ), ), - ), - ); + ); + } + } + }; + + if (isFinite(cascadeChunkSize)) { + // For CASCADE_DELETE and SET_NULL, processing each page deletes or modifies + // the matched rows in the database, so subsequent rows shift down. We must + // re-query from offset 0 each time. + // For INVALIDATE_CACHE_ONLY variants, the framework skips the DB write + // (the DB's own FK cascade hasn't fired yet since the parent row hasn't + // been deleted), so rows remain and we advance the offset normally. + const advanceOffset = + association.edgeDeletionBehavior === + EntityEdgeDeletionBehavior.CASCADE_DELETE_INVALIDATE_CACHE_ONLY || + association.edgeDeletionBehavior === + EntityEdgeDeletionBehavior.SET_NULL_INVALIDATE_CACHE_ONLY; + + // Paginated: load and process one page at a time to bound memory + for await (const pageResults of loader.loadManyByFieldEqualingInPagesAsync( + fieldName, + fieldValue, + cascadeChunkSize, + advanceOffset, + )) { + const page = pageResults.map((r) => r.enforceValue()); + await processPage(page); } + } else { + // Unpaginated: load all at once (original behavior) + const inboundReferenceEntities = await enforceResultsAsync( + loader.loadManyByFieldEqualingAsync(fieldName, fieldValue), + ); + await processPage(inboundReferenceEntities); } }, ); diff --git a/packages/entity/src/EnforcingEntityLoader.ts b/packages/entity/src/EnforcingEntityLoader.ts index 39043b768..ed6f7a20f 100644 --- a/packages/entity/src/EnforcingEntityLoader.ts +++ b/packages/entity/src/EnforcingEntityLoader.ts @@ -104,6 +104,35 @@ export class EnforcingEntityLoader< return entityResults.map((result) => result.enforceValue()); } + /** + * Load entities in pages where fieldName equals fieldValue. Each yielded page contains + * at most `pageSize` enforced entities. Bypasses DataLoader and cache to avoid + * loading all matching entities into memory at once. + * + * @param fieldName - entity field being queried + * @param fieldValue - fieldName field value being queried + * @param pageSize - maximum number of entities per page + * @param advanceOffset - if true, advances offset between pages (standard pagination). + * If false, always queries from offset 0 (use when each page's results are deleted + * or modified before the next page is fetched). + * @throws EntityNotAuthorizedError when viewer is not authorized to view one or more of the returned entities + */ + async *loadManyByFieldEqualingInPagesAsync>( + fieldName: N, + fieldValue: NonNullable, + pageSize: number, + advanceOffset: boolean, + ): AsyncGenerator { + for await (const pageResults of this.entityLoader.loadManyByFieldEqualingInPagesAsync( + fieldName, + fieldValue, + pageSize, + advanceOffset, + )) { + yield pageResults.map((result) => result.enforceValue()); + } + } + /** * Load many entities where compositeField equals compositeFieldValue. * @param compositeField - composite field being queried diff --git a/packages/entity/src/EntityConfiguration.ts b/packages/entity/src/EntityConfiguration.ts index 1db343cc9..4521e0653 100644 --- a/packages/entity/src/EntityConfiguration.ts +++ b/packages/entity/src/EntityConfiguration.ts @@ -122,6 +122,13 @@ export class EntityConfiguration< readonly databaseAdapterFlavor: DatabaseAdapterFlavor; readonly cacheAdapterFlavor: CacheAdapterFlavor; + /** + * Maximum number of inbound reference entities to process concurrently during + * cascade deletion. Controls peak memory usage for entities with many inbound references. + * Defaults to Infinity (process all at once, preserving existing behavior). + */ + readonly cascadeChunkSize: number; + constructor({ idField, tableName, @@ -131,6 +138,7 @@ export class EntityConfiguration< compositeFieldDefinitions, databaseAdapterFlavor, cacheAdapterFlavor, + cascadeChunkSize = Infinity, }: { /** * The field used to identify this entity. Must be a unique field in the table. @@ -173,12 +181,20 @@ export class EntityConfiguration< * Cache system for this entity. */ cacheAdapterFlavor: CacheAdapterFlavor; + + /** + * Maximum number of inbound reference entities to process concurrently during + * cascade deletion. Controls peak memory usage for entities with many inbound references. + * Defaults to Infinity (process all at once, preserving existing behavior). + */ + cascadeChunkSize?: number; }) { this.idField = idField; this.tableName = tableName; this.cacheKeyVersion = cacheKeyVersion; this.databaseAdapterFlavor = databaseAdapterFlavor; this.cacheAdapterFlavor = cacheAdapterFlavor; + this.cascadeChunkSize = cascadeChunkSize; this.inboundEdges = inboundEdges; // external schema is a Record to typecheck that all fields have FieldDefinitions, diff --git a/packages/entity/src/EntityDatabaseAdapter.ts b/packages/entity/src/EntityDatabaseAdapter.ts index cc6250a37..7091546fa 100644 --- a/packages/entity/src/EntityDatabaseAdapter.ts +++ b/packages/entity/src/EntityDatabaseAdapter.ts @@ -100,6 +100,66 @@ export abstract class EntityDatabaseAdapter< tableTuples: (readonly any[])[], ): Promise; + /** + * Fetch objects where key equals a single value, with offset and limit for pagination. + * Bypasses DataLoader and cache since pagination doesn't compose with batching. + * + * @param queryContext - query context with which to perform the fetch + * @param key - load key being queried + * @param value - single load value being queried + * @param offset - number of rows to skip + * @param limit - maximum number of rows to return + * @returns array of field objects matching the query with pagination applied + */ + async fetchManyWhereWithPaginationAsync< + TLoadKey extends IEntityLoadKey, + TSerializedLoadValue, + TLoadValue extends IEntityLoadValue, + >( + queryContext: EntityQueryContext, + key: TLoadKey, + value: TLoadValue, + offset: number, + limit: number, + ): Promise[]> { + const keyDatabaseColumns = key.getDatabaseColumns(this.entityConfiguration); + const valueDatabaseValues = key.getDatabaseValues(value); + + const results = await this.fetchManyWhereWithPaginationInternalAsync( + queryContext.getQueryInterface(), + this.entityConfiguration.tableName, + keyDatabaseColumns, + valueDatabaseValues, + offset, + limit, + ); + + return results.map((result) => + transformDatabaseObjectToFields(this.entityConfiguration, this.fieldTransformerMap, result), + ); + } + + /** + * Internal pagination fetch. Default implementation delegates to fetchManyWhereInternalAsync + * and slices in memory. Subclasses should override for efficient database-level pagination. + */ + protected async fetchManyWhereWithPaginationInternalAsync( + queryInterface: any, + tableName: string, + tableColumns: readonly string[], + tableTuple: readonly any[], + offset: number, + limit: number, + ): Promise { + const allResults = await this.fetchManyWhereInternalAsync( + queryInterface, + tableName, + tableColumns, + [tableTuple], + ); + return allResults.slice(offset, offset + limit); + } + /** * Fetch one objects where key is equal to value, null if no matching object exists. * Returned object is not guaranteed to be deterministic. Most concrete implementations will implement this diff --git a/packages/entity/src/index.ts b/packages/entity/src/index.ts index acc1b7c88..bda0c1fd5 100644 --- a/packages/entity/src/index.ts +++ b/packages/entity/src/index.ts @@ -75,6 +75,7 @@ export * from './rules/AlwaysDenyPrivacyPolicyRule.ts'; export * from './rules/AlwaysSkipPrivacyPolicyRule.ts'; export * from './rules/EvaluateIfEntityFieldPredicatePrivacyPolicyRule.ts'; export * from './rules/PrivacyPolicyRule.ts'; +export * from './utils/EntityCascadeDeletionUtils.ts'; export * from './utils/EntityCreationUtils.ts'; export * from './utils/EntityPrivacyUtils.ts'; export * from './utils/mergeEntityMutationTriggerConfigurations.ts'; diff --git a/packages/entity/src/internal/EntityDataManager.ts b/packages/entity/src/internal/EntityDataManager.ts index 07640778c..ee2d90930 100644 --- a/packages/entity/src/internal/EntityDataManager.ts +++ b/packages/entity/src/internal/EntityDataManager.ts @@ -243,6 +243,63 @@ export class EntityDataManager< return mapToReturn; } + /** + * Load objects in pages via offset/limit pagination, bypassing DataLoader and cache. + * Yields one page of raw field objects at a time. + * + * @param queryContext - query context in which to perform the load + * @param key - load key being queried + * @param value - single load value being queried for the key + * @param pageSize - number of objects per page + * @param advanceOffset - if true, advances offset between pages (standard pagination). + * If false, always queries from offset 0 (use when processing each page deletes or + * modifies the matched rows, causing subsequent rows to shift). + */ + async *loadManyEqualingPaginatedAsync< + TLoadKey extends IEntityLoadKey, + TSerializedLoadValue, + TLoadValue extends IEntityLoadValue, + >( + queryContext: EntityQueryContext, + key: TLoadKey, + value: TLoadValue, + pageSize: number, + advanceOffset: boolean, + ): AsyncGenerator[]> { + let offset = 0; + while (true) { + this.metricsAdapter.incrementDataManagerLoadCount({ + type: IncrementLoadCountEventType.DATABASE, + isInTransaction: queryContext.isInTransaction(), + fieldValueCount: 1, + entityClassName: this.entityClassName, + loadType: key.getLoadMethodType(), + }); + + const page = await this.databaseAdapter.fetchManyWhereWithPaginationAsync( + queryContext, + key, + value, + offset, + pageSize, + ); + + if (page.length === 0) { + break; + } + + yield page; + + if (page.length < pageSize) { + break; + } + + if (advanceOffset) { + offset += pageSize; + } + } + } + /** * Load one object matching load key and load value if at least one matching object exists. * Returned object is not guaranteed to be deterministic. diff --git a/packages/entity/src/utils/EntityCascadeDeletionUtils.ts b/packages/entity/src/utils/EntityCascadeDeletionUtils.ts new file mode 100644 index 000000000..b48a1ae3f --- /dev/null +++ b/packages/entity/src/utils/EntityCascadeDeletionUtils.ts @@ -0,0 +1,260 @@ +import type { IEntityClass } from '../Entity.ts'; +import { EntityEdgeDeletionBehavior } from '../EntityFieldDefinition.ts'; +import type { EntityCascadingDeletionInfo } from '../EntityMutationInfo.ts'; +import type { ReadonlyEntity } from '../ReadonlyEntity.ts'; +import type { ViewerContext } from '../ViewerContext.ts'; + +/** + * Options for {@link EntityCascadeDeletionUtils.deleteInboundReferencesInBatchesAsync}. + */ +export interface DeleteInboundReferencesOptions { + /** + * Number of inbound reference entities to load and delete per batch. + * Each batch is processed in its own transaction. + * @default 100 + */ + batchSize?: number; + + /** + * Called when a single entity deletion fails. By default, errors are silently + * swallowed since this is a best-effort pre-deletion pass — the subsequent root + * entity deletion will cascade-delete any stragglers. + * + * Throw from this callback to abort the entire operation. + */ + onEntityDeletionError?: (error: unknown, entityId: unknown) => void; +} + +/** + * Utility for pre-emptively deleting inbound references to an entity in batches, + * outside of the root entity's transaction. This is useful when an entity has a + * very large number of inbound references that would cause memory or transaction + * duration issues if processed in a single cascade deletion. + * + * Usage pattern: + * ```typescript + * // 1. Best-effort batch delete of inbound references + * await EntityCascadeDeletionUtils.deleteInboundReferencesInBatchesAsync( + * viewerContext, + * rootEntity, + * { batchSize: 100 }, + * ); + * + * // 2. Normal delete of root entity (handles any stragglers via regular cascade) + * await enforceAsyncResult(rootEntity.deleteAsync()); + * ``` + */ +export class EntityCascadeDeletionUtils { + /** + * Delete all inbound references to `rootEntity` in batches. Each batch of + * inbound entities is loaded, then deleted individually (each delete in its + * own transaction, triggering any sub-cascades normally). This is best-effort: + * individual entity deletion failures are passed to `onEntityDeletionError` + * (or silently swallowed by default). + * + * After this completes, the caller should delete the root entity normally. + * Any references that were created concurrently or failed to delete will be + * handled by the root entity's normal cascade deletion. + */ + static async deleteInboundReferencesInBatchesAsync( + viewerContext: ViewerContext, + rootEntity: ReadonlyEntity, + options: DeleteInboundReferencesOptions = {}, + ): Promise { + const { batchSize = 100, onEntityDeletionError } = options; + + const companionProvider = viewerContext.entityCompanionProvider; + + const rootEntityClass = rootEntity.constructor as IEntityClass; + const entityConfiguration = + companionProvider.getCompanionForEntity(rootEntityClass).entityCompanionDefinition + .entityConfiguration; + const inboundEdges = entityConfiguration.inboundEdges; + + const cascadingDeleteCause: EntityCascadingDeletionInfo = { + entity: rootEntity, + cascadingDeleteCause: null, + }; + + for (const inboundEntityClass of inboundEdges) { + const inboundConfiguration = + companionProvider.getCompanionForEntity(inboundEntityClass).entityCompanionDefinition + .entityConfiguration; + + for (const [fieldName, fieldDefinition] of inboundConfiguration.schema) { + const association = fieldDefinition.association; + if (!association) { + continue; + } + + const associatedConfiguration = + companionProvider.getCompanionForEntity(association.associatedEntityClass) + .entityCompanionDefinition.entityConfiguration; + if (associatedConfiguration !== entityConfiguration) { + continue; + } + + const fieldValue = association.associatedEntityLookupByField + ? rootEntity.getField(association.associatedEntityLookupByField) + : rootEntity.getID(); + + switch (association.edgeDeletionBehavior) { + case EntityEdgeDeletionBehavior.CASCADE_DELETE: + case EntityEdgeDeletionBehavior.CASCADE_DELETE_INVALIDATE_CACHE_ONLY: { + await this.deleteBatchedAsync( + viewerContext, + inboundEntityClass, + fieldName as string, + fieldValue, + batchSize, + cascadingDeleteCause, + onEntityDeletionError, + ); + break; + } + case EntityEdgeDeletionBehavior.SET_NULL: + case EntityEdgeDeletionBehavior.SET_NULL_INVALIDATE_CACHE_ONLY: { + await this.setNullBatchedAsync( + viewerContext, + inboundEntityClass, + fieldName as string, + fieldValue, + batchSize, + cascadingDeleteCause, + onEntityDeletionError, + ); + break; + } + } + } + } + } + + private static async deleteBatchedAsync( + viewerContext: ViewerContext, + entityClass: IEntityClass, + fieldName: string, + fieldValue: unknown, + batchSize: number, + cascadingDeleteCause: EntityCascadingDeletionInfo, + onEntityDeletionError: DeleteInboundReferencesOptions['onEntityDeletionError'], + ): Promise { + const companion = viewerContext.getViewerScopedEntityCompanionForClass(entityClass); + + while (true) { + const queryContext = this.getQueryContextForEntityClass(viewerContext, entityClass); + // Load one page. Always offset 0 since we delete each page's rows before loading the next. + const pageIterator = companion + .getLoaderFactory() + .forLoad(queryContext, { + previousValue: null, + cascadingDeleteCause, + }) + .loadManyByFieldEqualingInPagesAsync(fieldName, fieldValue, batchSize, false); + + const iteratorResult = await pageIterator[Symbol.asyncIterator]().next(); + if (iteratorResult.done || !iteratorResult.value || iteratorResult.value.length === 0) { + break; + } + + const entityResults = iteratorResult.value; + + for (const entityResult of entityResults) { + if (!entityResult.ok) { + if (onEntityDeletionError) { + onEntityDeletionError(entityResult.reason, undefined); + } + continue; + } + const entity = entityResult.enforceValue(); + try { + const deleteQueryContext = this.getQueryContextForEntityClass(viewerContext, entityClass); + const deleteResult = await companion + .getMutatorFactory() + .forDelete(entity, deleteQueryContext, cascadingDeleteCause) + .deleteAsync(); + if (!deleteResult.ok) { + if (onEntityDeletionError) { + onEntityDeletionError(deleteResult.reason, entity.getID()); + } + } + } catch (error) { + if (onEntityDeletionError) { + onEntityDeletionError(error, entity.getID()); + } + } + } + } + } + + private static async setNullBatchedAsync( + viewerContext: ViewerContext, + entityClass: IEntityClass, + fieldName: string, + fieldValue: unknown, + batchSize: number, + cascadingDeleteCause: EntityCascadingDeletionInfo, + onEntityDeletionError: DeleteInboundReferencesOptions['onEntityDeletionError'], + ): Promise { + const companion = viewerContext.getViewerScopedEntityCompanionForClass(entityClass); + + while (true) { + const queryContext = this.getQueryContextForEntityClass(viewerContext, entityClass); + // Load one page. Always offset 0 since SET NULL modifies the matched column, + // causing rows to no longer match the query. + const pageIterator = companion + .getLoaderFactory() + .forLoad(queryContext, { + previousValue: null, + cascadingDeleteCause, + }) + .loadManyByFieldEqualingInPagesAsync(fieldName, fieldValue, batchSize, false); + + const iteratorResult = await pageIterator[Symbol.asyncIterator]().next(); + if (iteratorResult.done || !iteratorResult.value || iteratorResult.value.length === 0) { + break; + } + + const entityResults = iteratorResult.value; + + for (const entityResult of entityResults) { + if (!entityResult.ok) { + if (onEntityDeletionError) { + onEntityDeletionError(entityResult.reason, undefined); + } + continue; + } + const entity = entityResult.enforceValue(); + try { + const updateQueryContext = this.getQueryContextForEntityClass(viewerContext, entityClass); + const updateResult = await companion + .getMutatorFactory() + .forUpdate(entity, updateQueryContext, cascadingDeleteCause) + .setField(fieldName as any, null) + .updateAsync(); + if (!updateResult.ok) { + if (onEntityDeletionError) { + onEntityDeletionError(updateResult.reason, entity.getID()); + } + } + } catch (error) { + if (onEntityDeletionError) { + onEntityDeletionError(error, entity.getID()); + } + } + } + } + } + + private static getQueryContextForEntityClass( + viewerContext: ViewerContext, + entityClass: IEntityClass, + ): ReturnType { + const entityConfiguration = + viewerContext.entityCompanionProvider.getCompanionForEntity(entityClass) + .entityCompanionDefinition.entityConfiguration; + return viewerContext.getQueryContextForDatabaseAdapterFlavor( + entityConfiguration.databaseAdapterFlavor, + ); + } +} diff --git a/packages/entity/src/utils/collections/arrays.ts b/packages/entity/src/utils/collections/arrays.ts new file mode 100644 index 000000000..aada0885a --- /dev/null +++ b/packages/entity/src/utils/collections/arrays.ts @@ -0,0 +1,21 @@ +/** + * Process an array in chunks, awaiting each chunk before starting the next. + * Within each chunk, items are processed concurrently via Promise.all. + * + * This is useful for bounding peak memory and concurrency when processing + * large arrays of async work (e.g., cascade deletions). + * + * @param items - array of items to process + * @param chunkSize - maximum number of items to process concurrently + * @param processor - async function to apply to each item + */ +export async function processInChunksAsync( + items: readonly T[], + chunkSize: number, + processor: (item: T) => Promise, +): Promise { + for (let i = 0; i < items.length; i += chunkSize) { + const chunk = items.slice(i, i + chunkSize); + await Promise.all(chunk.map(processor)); + } +}