diff --git a/packages/zero-cache/src/services/view-syncer/cvr-error-state.pg.test.ts b/packages/zero-cache/src/services/view-syncer/cvr-error-state.pg.test.ts new file mode 100644 index 0000000000..fa7baf2ef4 --- /dev/null +++ b/packages/zero-cache/src/services/view-syncer/cvr-error-state.pg.test.ts @@ -0,0 +1,375 @@ +import {beforeEach, describe, expect, vi, type Mock} from 'vitest'; +import {createSilentLogContext} from '../../../../shared/src/logging-test-utils.ts'; +import {test, type PgTest} from '../../test/db.ts'; +import type {PostgresDB} from '../../types/pg.ts'; +import {upstreamSchema} from '../../types/shards.ts'; +import {id} from '../../types/sql.ts'; +import {getMutationsTableDefinition} from '../change-source/pg/schema/shard.ts'; +import {CVRStore} from './cvr-store.ts'; +import {CVRConfigDrivenUpdater, CVRQueryDrivenUpdater} from './cvr.ts'; +import {setupCVRTables} from './schema/cvr.ts'; +import {type ClientQueryRecord} from './schema/types.ts'; +import {ttlClockFromNumber} from './ttl-clock.ts'; + +const APP_ID = 'roze'; +const SHARD_NUM = 1; +const SHARD = {appID: APP_ID, shardNum: SHARD_NUM}; + +describe('view-syncer/cvr-error-state', () => { + const lc = createSilentLogContext(); + let db: PostgresDB; + let upstreamDb: PostgresDB; + let store: CVRStore; + let setTimeoutFn: Mock; + + const TASK_ID = 'my-task'; + const CVR_ID = 'my-cvr'; + const CONNECT_TIME = Date.UTC(2024, 10, 22); + const ON_FAILURE = (e: unknown) => { + throw e; + }; + + beforeEach(async ({testDBs}) => { + [db, upstreamDb] = await Promise.all([ + testDBs.create('view_syncer_cvr_error_schema'), + testDBs.create('view_syncer_cvr_error_upstream'), + ]); + const shard = id(upstreamSchema(SHARD)); + await upstreamDb.begin(tx => + tx.unsafe(` + CREATE SCHEMA IF NOT EXISTS ${shard}; + ${getMutationsTableDefinition(shard)} + `), + ); + await db.begin(tx => setupCVRTables(lc, tx, SHARD)); + + // Initialize CVR + await db.unsafe(` + INSERT INTO "roze_1/cvr".instances ("clientGroupID", version, "lastActive", "ttlClock", "replicaVersion") + VALUES('${CVR_ID}', '01', '2024-09-04T00:00:00Z', + (EXTRACT(EPOCH FROM TIMESTAMPTZ '2024-09-04T00:00:00Z') * 1000)::BIGINT, '01'); + INSERT INTO "roze_1/cvr"."rowsVersion" ("clientGroupID", version) + VALUES('${CVR_ID}', '01'); + `); + + setTimeoutFn = vi.fn(); + store = new CVRStore( + lc, + db, + upstreamDb, + SHARD, + TASK_ID, + CVR_ID, + ON_FAILURE, + 10, + 5, + 100, + setTimeoutFn as unknown as typeof setTimeout, + ); + + return () => testDBs.drop(db, upstreamDb); + }); + + test('persist and load error state', async () => { + const cvr = await store.load(lc, CONNECT_TIME); + + // Ensure query exists using ConfigDrivenUpdater + const configUpdater = new CVRConfigDrivenUpdater(store, cvr, SHARD); + configUpdater.putDesiredQueries('client1', [ + {hash: 'q1', ast: {table: 'issues'}}, + ]); + + const {cvr: updatedCvr} = await configUpdater.flush( + lc, + CONNECT_TIME, + CONNECT_TIME, + ttlClockFromNumber(CONNECT_TIME), + ); + + const updater = new CVRQueryDrivenUpdater(store, updatedCvr, '02', '01'); + + // Track a query with an error + updater.trackQueries( + lc, + [{id: 'q1', transformationHash: 'hash1', errorMessage: 'fail'}], + [], + ); + + await updater.flush( + lc, + CONNECT_TIME, + CONNECT_TIME, + ttlClockFromNumber(CONNECT_TIME), + ); + + // Verify DB state + const queries = + await db`SELECT * FROM "roze_1/cvr".queries WHERE "queryHash" = 'q1'`; + expect(queries).toHaveLength(1); + expect(queries[0].errorMessage).toBe('fail'); + expect(queries[0].errorVersion).toBe('02'); + + // Load CVR and verify in-memory state + const cvr2 = await store.load(lc, CONNECT_TIME); + const q1 = cvr2.queries['q1']; + expect(q1.errorMessage).toBe('fail'); + expect(q1.errorVersion).toEqual({stateVersion: '02'}); + }); + + test('update existing query with error', async () => { + let cvr = await store.load(lc, CONNECT_TIME); + + // Ensure query exists + const configUpdater = new CVRConfigDrivenUpdater(store, cvr, SHARD); + configUpdater.putDesiredQueries('client1', [ + {hash: 'q1', ast: {table: 'issues'}}, + ]); + + const {cvr: updatedCvr} = await configUpdater.flush( + lc, + CONNECT_TIME, + CONNECT_TIME, + ttlClockFromNumber(CONNECT_TIME), + ); + + let updater = new CVRQueryDrivenUpdater(store, updatedCvr, '02', '01'); + + // Initial success + updater.trackQueries(lc, [{id: 'q1', transformationHash: 'hash1'}], []); + await updater.flush( + lc, + CONNECT_TIME, + CONNECT_TIME, + ttlClockFromNumber(CONNECT_TIME), + ); + + // Update with error + cvr = await store.load(lc, CONNECT_TIME); + updater = new CVRQueryDrivenUpdater(store, cvr, '03', '01'); + updater.trackQueries( + lc, + [{id: 'q1', transformationHash: 'hash1', errorMessage: 'fail'}], + [], + ); + await updater.flush( + lc, + CONNECT_TIME, + CONNECT_TIME, + ttlClockFromNumber(CONNECT_TIME), + ); + + const queries = + await db`SELECT * FROM "roze_1/cvr".queries WHERE "queryHash" = 'q1'`; + expect(queries[0].errorMessage).toBe('fail'); + expect(queries[0].errorVersion).toBe('03'); + + // Update with same error (retry failed) + cvr = await store.load(lc, CONNECT_TIME); + updater = new CVRQueryDrivenUpdater(store, cvr, '04', '01'); + updater.trackQueries( + lc, + [{id: 'q1', transformationHash: 'hash1', errorMessage: 'fail'}], + [], + ); + await updater.flush( + lc, + CONNECT_TIME, + CONNECT_TIME, + ttlClockFromNumber(CONNECT_TIME), + ); + + const queries2 = + await db`SELECT * FROM "roze_1/cvr".queries WHERE "queryHash" = 'q1'`; + expect(queries2[0].errorMessage).toBe('fail'); + expect(queries2[0].errorVersion).toBe('04'); + }); + + test('clear error state on success', async () => { + let cvr = await store.load(lc, CONNECT_TIME); + + // Ensure query exists + const configUpdater = new CVRConfigDrivenUpdater(store, cvr, SHARD); + configUpdater.putDesiredQueries('client1', [ + {hash: 'q1', ast: {table: 'issues'}}, + ]); + const {cvr: updatedCvr} = await configUpdater.flush( + lc, + CONNECT_TIME, + CONNECT_TIME, + ttlClockFromNumber(CONNECT_TIME), + ); + + let updater = new CVRQueryDrivenUpdater(store, updatedCvr, '02', '01'); + + // Initial error + updater.trackQueries( + lc, + [{id: 'q1', transformationHash: 'hash1', errorMessage: 'fail'}], + [], + ); + await updater.flush( + lc, + CONNECT_TIME, + CONNECT_TIME, + ttlClockFromNumber(CONNECT_TIME), + ); + + // Update with success (same hash) + cvr = await store.load(lc, CONNECT_TIME); + // Ensure query exists (again, because load creates new CVR) + const configUpdater2 = new CVRConfigDrivenUpdater(store, cvr, SHARD); + configUpdater2.putDesiredQueries('client1', [ + {hash: 'q1', ast: {table: 'issues'}}, + ]); + + updater = new CVRQueryDrivenUpdater(store, cvr, '03', '01'); + updater.trackQueries( + lc, + [{id: 'q1', transformationHash: 'hash1'}], // No error + [], + ); + await store.flush(lc, cvr.version, cvr, Date.now()); + + const queries = + await db`SELECT * FROM "roze_1/cvr".queries WHERE "queryHash" = 'q1'`; + expect(queries[0].errorMessage).toBeNull(); + // errorVersion should probably be preserved or cleared? + // The implementation clears it if errorMessage is null. + expect(queries[0].errorVersion).toBeNull(); + }); + + test('persist retryErrorVersion in desires', async () => { + const cvr = await store.load(lc, CONNECT_TIME); + + // Put desired query with retryErrorVersion + // Use ConfigDrivenUpdater to handle it properly + const configUpdater = new CVRConfigDrivenUpdater(store, cvr, SHARD); + configUpdater.putDesiredQueries('client1', [ + { + hash: 'q1', + ast: {table: 'issues'}, + retryErrorVersion: '01', + }, + ]); + + await configUpdater.flush( + lc, + CONNECT_TIME, + CONNECT_TIME, + ttlClockFromNumber(CONNECT_TIME), + ); + + const desires = + await db`SELECT * FROM "roze_1/cvr".desires WHERE "queryHash" = 'q1'`; + expect(desires).toHaveLength(1); + expect(desires[0].retryErrorVersion).toBe('01'); + + // Load CVR and verify + const cvr2 = await store.load(lc, CONNECT_TIME); + const q1 = cvr2.queries['q1'] as ClientQueryRecord; + const clientState = q1.clientState['client1']; + expect(clientState.retryErrorVersion).toEqual({stateVersion: '01'}); + }); + + test('track error without transformationHash', async () => { + let cvr = await store.load(lc, CONNECT_TIME); + + // Ensure query exists + const configUpdater = new CVRConfigDrivenUpdater(store, cvr, SHARD); + configUpdater.putDesiredQueries('client1', [ + {hash: 'q1', ast: {table: 'issues'}}, + ]); + const {cvr: updatedCvr} = await configUpdater.flush( + lc, + CONNECT_TIME, + CONNECT_TIME, + ttlClockFromNumber(CONNECT_TIME), + ); + + const updater = new CVRQueryDrivenUpdater(store, updatedCvr, '02', '01'); + + // Track a query with an error and no transformationHash + updater.trackQueries( + lc, + [{id: 'q1', transformationHash: undefined, errorMessage: 'fail'}], + [], + ); + + await updater.flush( + lc, + CONNECT_TIME, + CONNECT_TIME, + ttlClockFromNumber(CONNECT_TIME), + ); + + // Verify DB state + const queries = + await db`SELECT * FROM "roze_1/cvr".queries WHERE "queryHash" = 'q1'`; + expect(queries).toHaveLength(1); + expect(queries[0].errorMessage).toBe('fail'); + expect(queries[0].errorVersion).toBe('02'); + // transformationHash should be null (it was never set) + expect(queries[0].transformationHash).toBeNull(); + }); + + test('retry query when errorVersion matches retryErrorVersion', async () => { + let cvr = await store.load(lc, CONNECT_TIME); + + // 1. Setup: Query in error state + const configUpdater = new CVRConfigDrivenUpdater(store, cvr, SHARD); + configUpdater.putDesiredQueries('client1', [ + {hash: 'q1', ast: {table: 'issues'}}, + ]); + const {cvr: cvr1} = await configUpdater.flush( + lc, + CONNECT_TIME, + CONNECT_TIME, + ttlClockFromNumber(CONNECT_TIME), + ); + + const updater1 = new CVRQueryDrivenUpdater(store, cvr1, '02', '01'); + updater1.trackQueries( + lc, + [{id: 'q1', transformationHash: undefined, errorMessage: 'fail'}], + [], + ); + const {cvr: cvr2} = await updater1.flush( + lc, + CONNECT_TIME, + CONNECT_TIME, + ttlClockFromNumber(CONNECT_TIME), + ); + + // Verify error state + let queries = + await db`SELECT * FROM "roze_1/cvr".queries WHERE "queryHash" = 'q1'`; + expect(queries[0].errorMessage).toBe('fail'); + expect(queries[0].errorVersion).toBe('02'); + + // 2. Client requests retry for version '02' + const configUpdater2 = new CVRConfigDrivenUpdater(store, cvr2, SHARD); + configUpdater2.putDesiredQueries('client1', [ + { + hash: 'q1', + ast: {table: 'issues'}, + retryErrorVersion: '02', + }, + ]); + const {cvr: cvr3} = await configUpdater2.flush( + lc, + CONNECT_TIME, + CONNECT_TIME, + ttlClockFromNumber(CONNECT_TIME), + ); + + // 3. Verify that ViewSyncer would see this as a retry + // We can't easily test ViewSyncer logic directly here without mocking, + // but we can verify the CVR state is correct for ViewSyncer to consume. + const q1 = cvr3.queries['q1']; + if (q1.type !== 'client') throw new Error('Expected client query'); + const clientState = q1.clientState['client1']; + expect(clientState.retryErrorVersion).toEqual({stateVersion: '02'}); + expect(q1.errorVersion).toEqual({stateVersion: '02'}); + // ViewSyncer logic: errorVersion === retryErrorVersion -> retry = true + }); +}); diff --git a/packages/zero-cache/src/services/view-syncer/cvr-purger.pg.test.ts b/packages/zero-cache/src/services/view-syncer/cvr-purger.pg.test.ts index 14bbc594ed..6715470874 100644 --- a/packages/zero-cache/src/services/view-syncer/cvr-purger.pg.test.ts +++ b/packages/zero-cache/src/services/view-syncer/cvr-purger.pg.test.ts @@ -134,6 +134,8 @@ describe('view-syncer/cvr', () => { patchVersion: null, internal: null, deleted: null, + errorMessage: null, + errorVersion: null, }, ], desires: [ @@ -145,6 +147,7 @@ describe('view-syncer/cvr', () => { deleted: null, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, ], rows: [ diff --git a/packages/zero-cache/src/services/view-syncer/cvr-store.ts b/packages/zero-cache/src/services/view-syncer/cvr-store.ts index e2f5900613..de3e5ce4a3 100644 --- a/packages/zero-cache/src/services/view-syncer/cvr-store.ts +++ b/packages/zero-cache/src/services/view-syncer/cvr-store.ts @@ -89,6 +89,8 @@ function asQuery(row: QueriesRow): QueryRecord { clientState: {}, transformationHash: row.transformationHash ?? undefined, transformationVersion: maybeVersion(row.transformationVersion), + errorMessage: row.errorMessage ?? undefined, + errorVersion: maybeVersion(row.errorVersion), } satisfies CustomQueryRecord; } @@ -100,6 +102,8 @@ function asQuery(row: QueriesRow): QueryRecord { ast, transformationHash: row.transformationHash ?? undefined, transformationVersion: maybeVersion(row.transformationVersion), + errorMessage: row.errorMessage ?? undefined, + errorVersion: maybeVersion(row.errorVersion), } satisfies InternalQueryRecord) : ({ type: 'client', @@ -109,6 +113,8 @@ function asQuery(row: QueriesRow): QueryRecord { clientState: {}, transformationHash: row.transformationHash ?? undefined, transformationVersion: maybeVersion(row.transformationVersion), + errorMessage: row.errorMessage ?? undefined, + errorVersion: maybeVersion(row.errorVersion), } satisfies ClientQueryRecord); } @@ -261,7 +267,8 @@ export class CVRStore { "patchVersion", "deleted", "ttlMs" AS "ttl", - "inactivatedAtMs" AS "inactivatedAt" + "inactivatedAtMs" AS "inactivatedAt", + "retryErrorVersion" FROM ${this.#cvr('desires')} WHERE "clientGroupID" = ${id}`, ]); @@ -364,6 +371,10 @@ export class CVRStore { inactivatedAt: row.inactivatedAt ?? undefined, ttl: clampTTL(row.ttl ?? DEFAULT_TTL_MS), version: versionFromString(row.patchVersion), + retryErrorVersion: + row.retryErrorVersion === null + ? undefined + : versionFromString(row.retryErrorVersion), }; } } @@ -475,6 +486,8 @@ export class CVRStore { deleted: true, transformationHash: null, transformationVersion: null, + errorMessage: null, + errorVersion: null, })} WHERE "clientGroupID" = ${this.#id} AND "queryHash" = ${queryPatch.id}`, }); @@ -498,7 +511,9 @@ export class CVRStore { "transformationHash", "transformationVersion", "internal", - "deleted" + "deleted", + "errorMessage", + "errorVersion" ) VALUES ( ${change.clientGroupID}, ${change.queryHash}, @@ -509,7 +524,9 @@ export class CVRStore { ${change.transformationHash ?? null}, ${change.transformationVersion ?? null}, ${change.internal}, - ${change.deleted ?? false} + ${change.deleted ?? false}, + ${change.errorMessage ?? null}, + ${change.errorVersion ?? null} ) ON CONFLICT ("clientGroupID", "queryHash") DO UPDATE SET @@ -520,7 +537,9 @@ export class CVRStore { "transformationHash" = ${change.transformationHash ?? null}, "transformationVersion" = ${change.transformationVersion ?? null}, "internal" = ${change.internal}, - "deleted" = ${change.deleted ?? false}`, + "deleted" = ${change.deleted ?? false}, + "errorMessage" = ${change.errorMessage ?? null}, + "errorVersion" = ${change.errorVersion ?? null}`, }); } @@ -534,6 +553,8 @@ export class CVRStore { | 'transformationHash' | 'transformationVersion' | 'deleted' + | 'errorMessage' + | 'errorVersion' > = { patchVersion: query.type === 'internal' @@ -542,6 +563,8 @@ export class CVRStore { transformationHash: query.transformationHash ?? null, transformationVersion: maybeVersionString(query.transformationVersion), deleted: false, + errorMessage: query.errorMessage ?? null, + errorVersion: maybeVersionString(query.errorVersion), }; this.#writes.add({ @@ -586,6 +609,7 @@ export class CVRStore { deleted: boolean, inactivatedAt: TTLClock | undefined, ttl: number, + retryErrorVersion?: CVRVersion | undefined, ): void { const change: DesiresRow = { clientGroupID: this.#id, @@ -597,6 +621,9 @@ export class CVRStore { // ttl is in ms in JavaScript ttl: ttl < 0 ? null : ttl, + retryErrorVersion: retryErrorVersion + ? versionString(retryErrorVersion) + : null, }; // For backward compatibility during rollout, write to both old and new columns: @@ -615,11 +642,11 @@ export class CVRStore { write: tx => tx` INSERT INTO ${this.#cvr('desires')} ( "clientGroupID", "clientID", "queryHash", "patchVersion", "deleted", - "ttl", "ttlMs", "inactivatedAt", "inactivatedAtMs" + "ttl", "ttlMs", "inactivatedAt", "inactivatedAtMs", "retryErrorVersion" ) VALUES ( ${change.clientGroupID}, ${change.clientID}, ${change.queryHash}, ${change.patchVersion}, ${change.deleted}, ${ttlInterval}, ${ttlMs}, - ${inactivatedAtTimestamp}, ${inactivatedAtMs} + ${inactivatedAtTimestamp}, ${inactivatedAtMs}, ${change.retryErrorVersion} ) ON CONFLICT ("clientGroupID", "clientID", "queryHash") DO UPDATE SET @@ -628,7 +655,8 @@ export class CVRStore { "ttl" = ${ttlInterval}, "ttlMs" = ${ttlMs}, "inactivatedAt" = ${inactivatedAtTimestamp}, - "inactivatedAtMs" = ${inactivatedAtMs} + "inactivatedAtMs" = ${inactivatedAtMs}, + "retryErrorVersion" = ${change.retryErrorVersion} `, }); } @@ -783,20 +811,20 @@ export class CVRStore { this.putInstance(cvr); const rowsFlushed = await this.#db.begin(Mode.READ_COMMITTED, async tx => { - const pipelined: Promise[] = [ - // #checkVersionAndOwnership() executes a `SELECT ... FOR UPDATE` - // query to acquire a row-level lock so that version-updating - // transactions are effectively serialized per cvr.instance. - // - // Note that `rowsVersion` updates, on the other hand, are not subject - // to this lock and can thus commit / be-committed independently of - // cvr.instances. - this.#checkVersionAndOwnership( - tx, - expectedCurrentVersion, - lastConnectTime, - ), - ]; + // #checkVersionAndOwnership() executes a `SELECT ... FOR UPDATE` + // query to acquire a row-level lock so that version-updating + // transactions are effectively serialized per cvr.instance. + // + // Note that `rowsVersion` updates, on the other hand, are not subject + // to this lock and can thus commit / be-committed independently of + // cvr.instances. + await this.#checkVersionAndOwnership( + tx, + expectedCurrentVersion, + lastConnectTime, + ); + + const pipelined: Promise[] = []; for (const write of this.#writes) { stats.instances += write.stats.instances ?? 0; diff --git a/packages/zero-cache/src/services/view-syncer/cvr.pg.test.ts b/packages/zero-cache/src/services/view-syncer/cvr.pg.test.ts index db0859649c..13c0e48b17 100644 --- a/packages/zero-cache/src/services/view-syncer/cvr.pg.test.ts +++ b/packages/zero-cache/src/services/view-syncer/cvr.pg.test.ts @@ -120,7 +120,8 @@ describe('view-syncer/cvr', () => { "patchVersion", "deleted", "ttlMs" AS "ttl", - "inactivatedAtMs" AS "inactivatedAt" + "inactivatedAtMs" AS "inactivatedAt", + "retryErrorVersion" FROM ${db(`${cvrSchema(SHARD)}.` + table)}`), ]; } else { @@ -182,7 +183,8 @@ describe('view-syncer/cvr', () => { "patchVersion", "deleted", "ttlMs" AS "ttl", - "inactivatedAtMs" AS "inactivatedAt" + "inactivatedAtMs" AS "inactivatedAt", + "retryErrorVersion" FROM ${db('dapp_3/cvr.desires')} ORDER BY "clientGroupID", "clientID", "queryHash"`, db`SELECT * FROM ${db('dapp_3/cvr.rows')} ORDER BY "clientGroupID", "schema", "table", "rowKey"`, @@ -494,6 +496,8 @@ describe('view-syncer/cvr', () => { patchVersion: '1a9:02', internal: null, deleted: false, + errorMessage: null, + errorVersion: null, }, ], desires: [ @@ -505,6 +509,7 @@ describe('view-syncer/cvr', () => { deleted: false, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, ], rows: [], @@ -598,6 +603,8 @@ describe('view-syncer/cvr', () => { patchVersion: '1a9:02', internal: null, deleted: false, + errorMessage: null, + errorVersion: null, }, ], desires: [ @@ -609,6 +616,7 @@ describe('view-syncer/cvr', () => { deleted: false, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, ], rows: [], @@ -839,6 +847,8 @@ describe('view-syncer/cvr', () => { patchVersion: '1a9:02', internal: null, deleted: false, + errorMessage: null, + errorVersion: null, }, ], desires: [ @@ -850,6 +860,7 @@ describe('view-syncer/cvr', () => { deleted: false, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, { clientGroupID: 'abc123', @@ -859,6 +870,7 @@ describe('view-syncer/cvr', () => { deleted: false, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, ], rows: [], @@ -1265,6 +1277,8 @@ describe('view-syncer/cvr', () => { queryName: null, deleted: false, internal: null, + errorMessage: null, + errorVersion: null, patchVersion: null, queryHash: 'fourHash', transformationHash: null, @@ -1275,6 +1289,8 @@ describe('view-syncer/cvr', () => { clientGroupID: 'abc123', deleted: false, internal: null, + errorMessage: null, + errorVersion: null, patchVersion: null, queryArgs: [], queryHash: 'xCustomHash', @@ -1308,6 +1324,8 @@ describe('view-syncer/cvr', () => { queryName: null, deleted: false, internal: true, + errorMessage: null, + errorVersion: null, patchVersion: null, queryHash: 'lmids', transformationHash: null, @@ -1343,6 +1361,8 @@ describe('view-syncer/cvr', () => { clientGroupID: 'abc123', deleted: false, internal: true, + errorMessage: null, + errorVersion: null, patchVersion: null, queryArgs: null, queryHash: 'mutationResults', @@ -1359,6 +1379,8 @@ describe('view-syncer/cvr', () => { queryName: null, deleted: false, internal: null, + errorMessage: null, + errorVersion: null, patchVersion: null, queryHash: 'threeHash', transformationHash: null, @@ -1373,6 +1395,8 @@ describe('view-syncer/cvr', () => { queryName: null, deleted: false, internal: null, + errorMessage: null, + errorVersion: null, patchVersion: '1a9:02', queryHash: 'oneHash', transformationHash: 'twoHash', @@ -1388,6 +1412,7 @@ describe('view-syncer/cvr', () => { queryHash: 'oneHash', inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, { clientGroupID: 'abc123', @@ -1397,6 +1422,7 @@ describe('view-syncer/cvr', () => { queryHash: 'fourHash', inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, { clientGroupID: 'abc123', @@ -1406,6 +1432,7 @@ describe('view-syncer/cvr', () => { patchVersion: '1aa:01', queryHash: 'xCustomHash', ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, { clientGroupID: 'abc123', @@ -1415,6 +1442,7 @@ describe('view-syncer/cvr', () => { queryHash: 'threeHash', inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, { clientGroupID: 'abc123', @@ -1424,6 +1452,7 @@ describe('view-syncer/cvr', () => { queryHash: 'oneHash', inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, { clientGroupID: 'abc123', @@ -1433,6 +1462,7 @@ describe('view-syncer/cvr', () => { patchVersion: '1aa:01', queryHash: 'xCustomHash', ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, { clientGroupID: 'abc123', @@ -1442,6 +1472,7 @@ describe('view-syncer/cvr', () => { queryHash: 'threeHash', inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, { clientGroupID: 'abc123', @@ -1451,6 +1482,7 @@ describe('view-syncer/cvr', () => { queryHash: 'oneHash', inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, ], @@ -1533,6 +1565,8 @@ describe('view-syncer/cvr', () => { patchVersion: '1a9:02', deleted: false, internal: null, + errorMessage: null, + errorVersion: null, }, ], desires: [ @@ -1544,6 +1578,7 @@ describe('view-syncer/cvr', () => { deleted: false, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, ], rows: [], @@ -1658,6 +1693,8 @@ describe('view-syncer/cvr', () => { patchVersion: null, internal: null, deleted: null, + errorMessage: null, + errorVersion: null, }, { clientGroupID: 'abc123', @@ -1670,6 +1707,8 @@ describe('view-syncer/cvr', () => { transformationVersion: null, internal: null, deleted: true, // Already in CVRs from "189" + errorMessage: null, + errorVersion: null, }, { clientGroupID: 'abc123', @@ -1682,6 +1721,8 @@ describe('view-syncer/cvr', () => { transformationVersion: null, internal: null, deleted: true, + errorMessage: null, + errorVersion: null, }, ], desires: [ @@ -1693,6 +1734,7 @@ describe('view-syncer/cvr', () => { deleted: null, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, ], rows: [ @@ -2010,6 +2052,8 @@ describe('view-syncer/cvr', () => { clientGroupID: 'abc123', deleted: true, internal: null, + errorMessage: null, + errorVersion: null, patchVersion: '189', queryHash: 'already-deleted', transformationHash: null, @@ -2024,6 +2068,8 @@ describe('view-syncer/cvr', () => { clientGroupID: 'abc123', deleted: true, internal: null, + errorMessage: null, + errorVersion: null, patchVersion: '19z', queryHash: 'catchup-delete', transformationHash: null, @@ -2038,6 +2084,8 @@ describe('view-syncer/cvr', () => { clientGroupID: 'abc123', deleted: false, internal: null, + errorMessage: null, + errorVersion: null, patchVersion: '1aa:01', queryHash: 'oneHash', transformationHash: 'serverOneHash', @@ -2053,6 +2101,7 @@ describe('view-syncer/cvr', () => { queryHash: 'oneHash', inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, ], rows: [ @@ -2144,6 +2193,8 @@ describe('view-syncer/cvr', () => { patchVersion: '1aa:01', internal: null, deleted: null, + errorMessage: null, + errorVersion: null, }, { clientGroupID: 'abc123', @@ -2156,6 +2207,8 @@ describe('view-syncer/cvr', () => { transformationVersion: null, internal: null, deleted: true, // Already in CVRs from "189" + errorMessage: null, + errorVersion: null, }, { clientGroupID: 'abc123', @@ -2168,6 +2221,8 @@ describe('view-syncer/cvr', () => { transformationVersion: null, internal: null, deleted: true, + errorMessage: null, + errorVersion: null, }, ], desires: [ @@ -2179,6 +2234,7 @@ describe('view-syncer/cvr', () => { deleted: null, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, ], rows: [ @@ -2252,7 +2308,12 @@ describe('view-syncer/cvr', () => { [], ); expect(newVersion).toEqual({stateVersion: '1ba', minorVersion: 1}); - expect(queryPatches).toHaveLength(0); + // When transformationHash changes for an already-gotten query, a put patch is returned + expect(queryPatches).toHaveLength(1); + expect(queryPatches[0]).toEqual({ + patch: {type: 'query', op: 'put', id: 'oneHash'}, + toVersion: {stateVersion: '1ba', minorVersion: 1}, + }); expect( await updater.received( @@ -2466,6 +2527,8 @@ describe('view-syncer/cvr', () => { queryName: null, deleted: true, internal: null, + errorMessage: null, + errorVersion: null, patchVersion: '189', queryHash: 'already-deleted', transformationHash: null, @@ -2480,6 +2543,8 @@ describe('view-syncer/cvr', () => { queryName: null, deleted: true, internal: null, + errorMessage: null, + errorVersion: null, patchVersion: '19z', queryHash: 'catchup-delete', transformationHash: null, @@ -2494,6 +2559,8 @@ describe('view-syncer/cvr', () => { queryName: null, deleted: false, internal: null, + errorMessage: null, + errorVersion: null, patchVersion: '1aa:01', queryHash: 'oneHash', transformationHash: 'serverTwoHash', @@ -2509,6 +2576,7 @@ describe('view-syncer/cvr', () => { queryHash: 'oneHash', inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, ], rows: [ @@ -2573,7 +2641,12 @@ describe('view-syncer/cvr', () => { [], )); expect(newVersion).toEqual({stateVersion: '1ba', minorVersion: 2}); - expect(queryPatches).toHaveLength(0); + // When transformationHash changes for an already-gotten query, a put patch is returned + expect(queryPatches).toHaveLength(1); + expect(queryPatches[0]).toEqual({ + patch: {type: 'query', op: 'put', id: 'oneHash'}, + toVersion: {stateVersion: '1ba', minorVersion: 2}, + }); ({cvr: updated, flushed} = await updater.flush( lc, @@ -2622,6 +2695,8 @@ describe('view-syncer/cvr', () => { queryArgs: null, deleted: true, internal: null, + errorMessage: null, + errorVersion: null, patchVersion: '189', queryHash: 'already-deleted', transformationHash: null, @@ -2636,6 +2711,8 @@ describe('view-syncer/cvr', () => { queryArgs: null, deleted: true, internal: null, + errorMessage: null, + errorVersion: null, patchVersion: '19z', queryHash: 'catchup-delete', transformationHash: null, @@ -2650,6 +2727,8 @@ describe('view-syncer/cvr', () => { queryArgs: null, deleted: false, internal: null, + errorMessage: null, + errorVersion: null, patchVersion: '1aa:01', queryHash: 'oneHash', transformationHash: 'newXFormHash', @@ -2689,6 +2768,8 @@ describe('view-syncer/cvr', () => { patchVersion: '1aa:01', internal: null, deleted: null, + errorMessage: null, + errorVersion: null, }, { clientGroupID: 'abc123', @@ -2701,6 +2782,8 @@ describe('view-syncer/cvr', () => { patchVersion: '1aa:01', internal: null, deleted: null, + errorMessage: null, + errorVersion: null, }, { clientGroupID: 'abc123', @@ -2713,6 +2796,8 @@ describe('view-syncer/cvr', () => { transformationVersion: null, internal: null, deleted: true, + errorMessage: null, + errorVersion: null, }, { clientGroupID: 'abc123', @@ -2725,6 +2810,8 @@ describe('view-syncer/cvr', () => { transformationVersion: null, internal: null, deleted: true, + errorMessage: null, + errorVersion: null, }, ], desires: [ @@ -2736,6 +2823,7 @@ describe('view-syncer/cvr', () => { deleted: null, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, { clientGroupID: 'abc123', @@ -2745,6 +2833,7 @@ describe('view-syncer/cvr', () => { deleted: null, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, ], rows: [ @@ -2822,7 +2911,16 @@ describe('view-syncer/cvr', () => { [], ); expect(newVersion).toEqual({stateVersion: '1ba', minorVersion: 1}); - expect(queryPatches).toHaveLength(0); + // When transformationHash changes for already-gotten queries, put patches are returned + expect(queryPatches).toHaveLength(2); + expect(queryPatches[0]).toEqual({ + patch: {type: 'query', op: 'put', id: 'oneHash'}, + toVersion: {stateVersion: '1ba', minorVersion: 1}, + }); + expect(queryPatches[1]).toEqual({ + patch: {type: 'query', op: 'put', id: 'twoHash'}, + toVersion: {stateVersion: '1ba', minorVersion: 1}, + }); expect( await updater.received( @@ -3091,6 +3189,8 @@ describe('view-syncer/cvr', () => { queryName: null, deleted: true, internal: null, + errorMessage: null, + errorVersion: null, patchVersion: '189', queryHash: 'already-deleted', transformationHash: null, @@ -3105,6 +3205,8 @@ describe('view-syncer/cvr', () => { queryName: null, deleted: true, internal: null, + errorMessage: null, + errorVersion: null, patchVersion: '19z', queryHash: 'catchup-delete', transformationHash: null, @@ -3119,6 +3221,8 @@ describe('view-syncer/cvr', () => { queryName: null, deleted: false, internal: null, + errorMessage: null, + errorVersion: null, patchVersion: '1aa:01', queryHash: 'oneHash', transformationHash: 'updatedServerOneHash', @@ -3133,6 +3237,8 @@ describe('view-syncer/cvr', () => { queryName: null, deleted: false, internal: null, + errorMessage: null, + errorVersion: null, patchVersion: '1aa:01', queryHash: 'twoHash', transformationHash: 'updatedServerTwoHash', @@ -3148,6 +3254,7 @@ describe('view-syncer/cvr', () => { queryHash: 'oneHash', inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, { clientGroupID: 'abc123', @@ -3157,6 +3264,7 @@ describe('view-syncer/cvr', () => { queryHash: 'twoHash', inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, ], rows: [ @@ -3240,6 +3348,8 @@ describe('view-syncer/cvr', () => { patchVersion: '1aa:01', internal: null, deleted: false, + errorMessage: null, + errorVersion: null, }, { clientGroupID: 'abc123', @@ -3252,6 +3362,8 @@ describe('view-syncer/cvr', () => { transformationVersion: null, internal: null, deleted: true, + errorMessage: null, + errorVersion: null, }, { clientGroupID: 'abc123', @@ -3264,6 +3376,8 @@ describe('view-syncer/cvr', () => { transformationVersion: null, internal: null, deleted: true, + errorMessage: null, + errorVersion: null, }, ], desires: [], @@ -3501,6 +3615,8 @@ describe('view-syncer/cvr', () => { queryName: null, deleted: true, internal: null, + errorMessage: null, + errorVersion: null, patchVersion: '189', queryHash: 'already-deleted', transformationHash: null, @@ -3515,6 +3631,8 @@ describe('view-syncer/cvr', () => { queryName: null, deleted: true, internal: null, + errorMessage: null, + errorVersion: null, patchVersion: '19z', queryHash: 'catchup-delete', transformationHash: null, @@ -3529,6 +3647,8 @@ describe('view-syncer/cvr', () => { queryArgs: null, queryName: null, internal: null, + errorMessage: null, + errorVersion: null, patchVersion: '1ba:01', queryHash: 'oneHash', transformationHash: null, @@ -3620,6 +3740,8 @@ describe('view-syncer/cvr', () => { patchVersion: '1aa:01', internal: null, deleted: null, + errorMessage: null, + errorVersion: null, }, { clientGroupID: 'abc123', @@ -3632,6 +3754,8 @@ describe('view-syncer/cvr', () => { patchVersion: '1aa:01', internal: null, deleted: null, + errorMessage: null, + errorVersion: null, }, { clientGroupID: 'abc123', @@ -3644,6 +3768,8 @@ describe('view-syncer/cvr', () => { transformationVersion: null, internal: null, deleted: true, + errorMessage: null, + errorVersion: null, }, { clientGroupID: 'abc123', @@ -3656,6 +3782,8 @@ describe('view-syncer/cvr', () => { transformationVersion: null, internal: null, deleted: true, + errorMessage: null, + errorVersion: null, }, ], desires: [ @@ -3667,6 +3795,7 @@ describe('view-syncer/cvr', () => { deleted: null, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, { clientGroupID: 'abc123', @@ -3676,6 +3805,7 @@ describe('view-syncer/cvr', () => { deleted: null, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, ], rows: [ @@ -3764,6 +3894,7 @@ describe('view-syncer/cvr', () => { "clientState": { "fooClient": { "inactivatedAt": undefined, + "retryErrorVersion": undefined, "ttl": 300000, "version": { "minorVersion": 1, @@ -3771,6 +3902,8 @@ describe('view-syncer/cvr', () => { }, }, }, + "errorMessage": undefined, + "errorVersion": undefined, "id": "oneHash", "patchVersion": { "minorVersion": 1, @@ -3789,6 +3922,7 @@ describe('view-syncer/cvr', () => { "clientState": { "fooClient": { "inactivatedAt": undefined, + "retryErrorVersion": undefined, "ttl": 300000, "version": { "minorVersion": 1, @@ -3796,6 +3930,8 @@ describe('view-syncer/cvr', () => { }, }, }, + "errorMessage": undefined, + "errorVersion": undefined, "id": "twoHash", "patchVersion": { "minorVersion": 1, @@ -4053,6 +4189,8 @@ describe('view-syncer/cvr', () => { patchVersion: null, internal: null, deleted: null, + errorMessage: null, + errorVersion: null, }, ], desires: [ @@ -4064,6 +4202,7 @@ describe('view-syncer/cvr', () => { deleted: null, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, ], rows: [ @@ -4204,6 +4343,8 @@ describe('view-syncer/cvr', () => { patchVersion: null, internal: null, deleted: null, + errorMessage: null, + errorVersion: null, }, ], desires: [ @@ -4215,6 +4356,7 @@ describe('view-syncer/cvr', () => { deleted: null, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, ], rows: [ @@ -4270,6 +4412,8 @@ describe('view-syncer/cvr', () => { patchVersion: null, internal: null, deleted: null, + errorMessage: null, + errorVersion: null, }, ], desires: [ @@ -4281,6 +4425,7 @@ describe('view-syncer/cvr', () => { deleted: null, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, ], rows: [ @@ -4414,6 +4559,8 @@ describe('view-syncer/cvr', () => { patchVersion: null, internal: null, deleted: null, + errorMessage: null, + errorVersion: null, }, ], desires: [ @@ -4425,6 +4572,7 @@ describe('view-syncer/cvr', () => { deleted: null, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, ], rows: [ @@ -4480,6 +4628,8 @@ describe('view-syncer/cvr', () => { patchVersion: null, internal: null, deleted: null, + errorMessage: null, + errorVersion: null, }, ], desires: [ @@ -4491,6 +4641,7 @@ describe('view-syncer/cvr', () => { deleted: null, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, ], rows: [ @@ -4685,6 +4836,8 @@ describe('view-syncer/cvr', () => { patchVersion: null, internal: null, deleted: null, + errorMessage: null, + errorVersion: null, }, ], desires: [ @@ -4696,6 +4849,7 @@ describe('view-syncer/cvr', () => { deleted: null, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, ], rows: [ @@ -4751,6 +4905,8 @@ describe('view-syncer/cvr', () => { patchVersion: null, internal: null, deleted: null, + errorMessage: null, + errorVersion: null, }, ], desires: [ @@ -4762,6 +4918,7 @@ describe('view-syncer/cvr', () => { deleted: null, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, ], rows: [ @@ -4908,6 +5065,8 @@ describe('view-syncer/cvr', () => { patchVersion: null, internal: null, deleted: null, + errorMessage: null, + errorVersion: null, }, ], desires: [ @@ -4919,6 +5078,7 @@ describe('view-syncer/cvr', () => { deleted: null, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, ], rows: [ @@ -5073,6 +5233,8 @@ describe('view-syncer/cvr', () => { patchVersion: null, internal: null, deleted: null, + errorMessage: null, + errorVersion: null, }, ], desires: [ @@ -5084,6 +5246,7 @@ describe('view-syncer/cvr', () => { deleted: null, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, ], rows: [ @@ -5121,47 +5284,50 @@ describe('view-syncer/cvr', () => { ); const cvr = await cvrStore.load(lc, LAST_CONNECT); expect(cvr).toMatchInlineSnapshot(` - { - "clientSchema": null, - "clients": { - "fooClient": { - "desiredQueryIDs": [ - "oneHash", - ], - "id": "fooClient", - }, - }, - "id": "abc123", - "lastActive": 1742256000000, - "queries": { - "oneHash": { - "ast": { - "table": "issues", + { + "clientSchema": null, + "clients": { + "fooClient": { + "desiredQueryIDs": [ + "oneHash", + ], + "id": "fooClient", }, - "clientState": { - "fooClient": { - "inactivatedAt": undefined, - "ttl": 300000, - "version": { - "minorVersion": 1, - "stateVersion": "1a9", + }, + "id": "abc123", + "lastActive": 1742256000000, + "queries": { + "oneHash": { + "ast": { + "table": "issues", + }, + "clientState": { + "fooClient": { + "inactivatedAt": undefined, + "retryErrorVersion": undefined, + "ttl": 300000, + "version": { + "minorVersion": 1, + "stateVersion": "1a9", + }, }, }, + "errorMessage": undefined, + "errorVersion": undefined, + "id": "oneHash", + "patchVersion": undefined, + "transformationHash": undefined, + "transformationVersion": undefined, + "type": "client", }, - "id": "oneHash", - "patchVersion": undefined, - "transformationHash": undefined, - "transformationVersion": undefined, - "type": "client", }, - }, - "replicaVersion": "120", - "ttlClock": 1742256000000, - "version": { - "stateVersion": "1aa", - }, - } - `); + "replicaVersion": "120", + "ttlClock": 1742256000000, + "version": { + "stateVersion": "1aa", + }, + } + `); const updater = new CVRConfigDrivenUpdater(cvrStore, cvr, SHARD); updater.markDesiredQueriesAsInactive('fooClient', ['oneHash'], ttlClock); @@ -5247,6 +5413,8 @@ describe('view-syncer/cvr', () => { patchVersion: null, internal: null, deleted: null, + errorMessage: null, + errorVersion: null, }, ], desires: [ @@ -5258,6 +5426,7 @@ describe('view-syncer/cvr', () => { deleted: null, inactivatedAt: null, ttl, + retryErrorVersion: null, }, ], rows: [ @@ -5383,6 +5552,8 @@ describe('view-syncer/cvr', () => { patchVersion: null, internal: null, deleted: null, + errorMessage: null, + errorVersion: null, }, ], desires: [ @@ -5394,6 +5565,7 @@ describe('view-syncer/cvr', () => { deleted: null, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, ], rows: [ @@ -5431,50 +5603,53 @@ describe('view-syncer/cvr', () => { ); const cvr = await cvrStore.load(lc, LAST_CONNECT); expect(cvr).toMatchInlineSnapshot(` - { - "clientSchema": null, - "clients": { - "fooClient": { - "desiredQueryIDs": [ - "oneHash", - ], - "id": "fooClient", - }, - }, - "id": "abc123", - "lastActive": 1742256000000, - "queries": { - "oneHash": { - "ast": { - "table": "issues", + { + "clientSchema": null, + "clients": { + "fooClient": { + "desiredQueryIDs": [ + "oneHash", + ], + "id": "fooClient", }, - "clientState": { - "fooClient": { - "inactivatedAt": undefined, - "ttl": 300000, - "version": { - "minorVersion": 1, - "stateVersion": "1a9", + }, + "id": "abc123", + "lastActive": 1742256000000, + "queries": { + "oneHash": { + "ast": { + "table": "issues", + }, + "clientState": { + "fooClient": { + "inactivatedAt": undefined, + "retryErrorVersion": undefined, + "ttl": 300000, + "version": { + "minorVersion": 1, + "stateVersion": "1a9", + }, }, }, + "errorMessage": undefined, + "errorVersion": undefined, + "id": "oneHash", + "patchVersion": undefined, + "transformationHash": "oneHashTransformed", + "transformationVersion": { + "minorVersion": 1, + "stateVersion": "1a9", + }, + "type": "client", }, - "id": "oneHash", - "patchVersion": undefined, - "transformationHash": "oneHashTransformed", - "transformationVersion": { - "minorVersion": 1, - "stateVersion": "1a9", - }, - "type": "client", }, - }, - "replicaVersion": "120", - "ttlClock": 1742256000000, - "version": { - "stateVersion": "1aa", - }, - } - `); + "replicaVersion": "120", + "ttlClock": 1742256000000, + "version": { + "stateVersion": "1aa", + }, + } + `); const updater = new CVRConfigDrivenUpdater(cvrStore, cvr, SHARD); expect( @@ -5584,6 +5759,8 @@ describe('view-syncer/cvr', () => { patchVersion: null, internal: null, deleted: null, + errorMessage: null, + errorVersion: null, }, ], desires: [ @@ -5595,6 +5772,7 @@ describe('view-syncer/cvr', () => { deleted: null, inactivatedAt: null, ttl: 10, + retryErrorVersion: null, }, ], rows: [], @@ -5624,6 +5802,7 @@ describe('view-syncer/cvr', () => { .toMatchInlineSnapshot(` { "inactivatedAt": undefined, + "retryErrorVersion": undefined, "ttl": 10, "version": { "minorVersion": 1, @@ -5662,15 +5841,16 @@ describe('view-syncer/cvr', () => { expect( (updated.queries.oneHash as ClientQueryRecord).clientState.fooClient, ).toMatchInlineSnapshot(` - { - "inactivatedAt": undefined, - "ttl": 600000, - "version": { - "minorVersion": 1, - "stateVersion": "1aa", - }, - } - `); + { + "inactivatedAt": undefined, + "retryErrorVersion": undefined, + "ttl": 600000, + "version": { + "minorVersion": 1, + "stateVersion": "1aa", + }, + } + `); }); }); @@ -5713,6 +5893,8 @@ describe('view-syncer/cvr', () => { patchVersion: null, internal: null, deleted: null, + errorMessage: null, + errorVersion: null, }, ], desires: [ @@ -5724,6 +5906,7 @@ describe('view-syncer/cvr', () => { deleted: null, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, { clientGroupID: 'abc123', @@ -5733,6 +5916,7 @@ describe('view-syncer/cvr', () => { deleted: null, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, { clientGroupID: 'abc123', @@ -5742,6 +5926,7 @@ describe('view-syncer/cvr', () => { deleted: null, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, ], rows: [ @@ -5807,70 +5992,74 @@ describe('view-syncer/cvr', () => { ttlClock, ); expect(updated).toMatchInlineSnapshot(` - { - "clientSchema": null, - "clients": { - "client-a": { - "desiredQueryIDs": [ - "oneHash", - ], - "id": "client-a", - }, - "client-c": { - "desiredQueryIDs": [ - "oneHash", - ], - "id": "client-c", - }, - }, - "id": "abc123", - "lastActive": 1709683200000, - "queries": { - "oneHash": { - "ast": { - "table": "issues", - }, - "clientState": { - "client-a": { - "inactivatedAt": undefined, - "ttl": 300000, - "version": { - "minorVersion": 1, - "stateVersion": "1a9", + { + "clientSchema": null, + "clients": { + "client-a": { + "desiredQueryIDs": [ + "oneHash", + ], + "id": "client-a", }, - }, - "client-b": { - "inactivatedAt": 1709683200000, - "ttl": 300000, - "version": { - "minorVersion": 1, - "stateVersion": "1aa", + "client-c": { + "desiredQueryIDs": [ + "oneHash", + ], + "id": "client-c", }, }, - "client-c": { - "inactivatedAt": undefined, - "ttl": 300000, - "version": { - "minorVersion": 1, - "stateVersion": "1a9", + "id": "abc123", + "lastActive": 1709683200000, + "queries": { + "oneHash": { + "ast": { + "table": "issues", + }, + "clientState": { + "client-a": { + "inactivatedAt": undefined, + "retryErrorVersion": undefined, + "ttl": 300000, + "version": { + "minorVersion": 1, + "stateVersion": "1a9", + }, + }, + "client-b": { + "inactivatedAt": 1709683200000, + "ttl": 300000, + "version": { + "minorVersion": 1, + "stateVersion": "1aa", + }, + }, + "client-c": { + "inactivatedAt": undefined, + "retryErrorVersion": undefined, + "ttl": 300000, + "version": { + "minorVersion": 1, + "stateVersion": "1a9", + }, + }, + }, + "errorMessage": undefined, + "errorVersion": undefined, + "id": "oneHash", + "patchVersion": undefined, + "transformationHash": undefined, + "transformationVersion": undefined, + "type": "client", }, }, - }, - "id": "oneHash", - "patchVersion": undefined, - "transformationHash": undefined, - "transformationVersion": undefined, - "type": "client", - }, - }, - "replicaVersion": "120", - "ttlClock": 1709683200000, - "version": { - "minorVersion": 1, - "stateVersion": "1aa", - }, - } - `); + "replicaVersion": "120", + "ttlClock": 1709683200000, + "version": { + "minorVersion": 1, + "stateVersion": "1aa", + }, + } + `); expect(flushed).toMatchInlineSnapshot(` { "clients": 1, @@ -5903,6 +6092,7 @@ describe('view-syncer/cvr', () => { "inactivatedAt": null, "patchVersion": "1a9:01", "queryHash": "oneHash", + "retryErrorVersion": null, "ttl": 300000, }, { @@ -5912,6 +6102,7 @@ describe('view-syncer/cvr', () => { "inactivatedAt": 1709683200000, "patchVersion": "1aa:01", "queryHash": "oneHash", + "retryErrorVersion": null, "ttl": 300000, }, { @@ -5921,6 +6112,7 @@ describe('view-syncer/cvr', () => { "inactivatedAt": null, "patchVersion": "1a9:01", "queryHash": "oneHash", + "retryErrorVersion": null, "ttl": 300000, }, ], @@ -5943,6 +6135,8 @@ describe('view-syncer/cvr', () => { }, "clientGroupID": "abc123", "deleted": false, + "errorMessage": null, + "errorVersion": null, "internal": null, "patchVersion": null, "queryArgs": null, @@ -6031,6 +6225,8 @@ describe('view-syncer/cvr', () => { patchVersion: null, internal: null, deleted: null, + errorMessage: null, + errorVersion: null, }, { clientGroupID: 'def456', @@ -6043,6 +6239,8 @@ describe('view-syncer/cvr', () => { patchVersion: null, internal: null, deleted: null, + errorMessage: null, + errorVersion: null, }, ], desires: [ @@ -6054,6 +6252,7 @@ describe('view-syncer/cvr', () => { deleted: null, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, { clientGroupID: 'def456', @@ -6063,6 +6262,7 @@ describe('view-syncer/cvr', () => { deleted: null, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, { clientGroupID: 'abc123', @@ -6072,6 +6272,7 @@ describe('view-syncer/cvr', () => { deleted: null, inactivatedAt: null, ttl: DEFAULT_TTL_MS, + retryErrorVersion: null, }, ], rows: [ @@ -6154,6 +6355,7 @@ describe('view-syncer/cvr', () => { "clientState": { "client-a": { "inactivatedAt": undefined, + "retryErrorVersion": undefined, "ttl": 300000, "version": { "minorVersion": 1, @@ -6162,6 +6364,7 @@ describe('view-syncer/cvr', () => { }, "client-c": { "inactivatedAt": undefined, + "retryErrorVersion": undefined, "ttl": 300000, "version": { "minorVersion": 1, @@ -6169,6 +6372,8 @@ describe('view-syncer/cvr', () => { }, }, }, + "errorMessage": undefined, + "errorVersion": undefined, "id": "oneHash", "patchVersion": undefined, "transformationHash": undefined, @@ -6199,158 +6404,165 @@ describe('view-syncer/cvr', () => { ); expect(await getAllState(cvrDb)).toMatchInlineSnapshot(` - { - "clients": Result [ - { - "clientGroupID": "abc123", - "clientID": "client-a", - }, - { - "clientGroupID": "abc123", - "clientID": "client-c", - }, - { - "clientGroupID": "def456", - "clientID": "client-b", - }, - ], - "desires": Result [ - { - "clientGroupID": "abc123", - "clientID": "client-a", - "deleted": null, - "inactivatedAt": null, - "patchVersion": "1a9:01", - "queryHash": "oneHash", - "ttl": 300000, - }, - { - "clientGroupID": "abc123", - "clientID": "client-c", - "deleted": null, - "inactivatedAt": null, - "patchVersion": "1a9:01", - "queryHash": "oneHash", - "ttl": 300000, - }, - { - "clientGroupID": "def456", - "clientID": "client-b", - "deleted": null, - "inactivatedAt": null, - "patchVersion": "1a9:01", - "queryHash": "oneHash", - "ttl": 300000, - }, - ], - "instances": Result [ - { - "clientGroupID": "abc123", - "clientSchema": null, - "grantedAt": 1709251200000, - "lastActive": 1713830400000, - "owner": "my-task", - "replicaVersion": "120", - "ttlClock": 1713830400000, - "version": "1aa", - }, - { - "clientGroupID": "def456", - "clientSchema": null, - "grantedAt": null, - "lastActive": 1713830400000, - "owner": null, - "replicaVersion": "120", - "ttlClock": 1713830400000, - "version": "1aa", - }, - ], - "queries": Result [ - { - "clientAST": { - "table": "issues", + { + "clients": Result [ + { + "clientGroupID": "abc123", + "clientID": "client-a", }, - "clientGroupID": "abc123", - "deleted": null, - "internal": null, - "patchVersion": null, - "queryArgs": null, - "queryHash": "oneHash", - "queryName": null, - "transformationHash": null, - "transformationVersion": null, - }, - { - "clientAST": { - "table": "issues", + { + "clientGroupID": "abc123", + "clientID": "client-c", }, - "clientGroupID": "def456", - "deleted": null, - "internal": null, - "patchVersion": null, - "queryArgs": null, - "queryHash": "oneHash", - "queryName": null, - "transformationHash": null, - "transformationVersion": null, - }, - ], - "rows": Result [ - { - "clientGroupID": "abc123", - "patchVersion": "1a0", - "refCounts": { - "oneHash": 2, + { + "clientGroupID": "def456", + "clientID": "client-b", }, - "rowKey": { - "id": "123", + ], + "desires": Result [ + { + "clientGroupID": "abc123", + "clientID": "client-a", + "deleted": null, + "inactivatedAt": null, + "patchVersion": "1a9:01", + "queryHash": "oneHash", + "retryErrorVersion": null, + "ttl": 300000, }, - "rowVersion": "03", - "schema": "public", - "table": "issues", - }, - { - "clientGroupID": "abc123", - "patchVersion": "1a0", - "refCounts": { - "oneHash": 2, + { + "clientGroupID": "abc123", + "clientID": "client-c", + "deleted": null, + "inactivatedAt": null, + "patchVersion": "1a9:01", + "queryHash": "oneHash", + "retryErrorVersion": null, + "ttl": 300000, }, - "rowKey": { - "id": "321", + { + "clientGroupID": "def456", + "clientID": "client-b", + "deleted": null, + "inactivatedAt": null, + "patchVersion": "1a9:01", + "queryHash": "oneHash", + "retryErrorVersion": null, + "ttl": 300000, }, - "rowVersion": "03", - "schema": "public", - "table": "issues", - }, - { - "clientGroupID": "def456", - "patchVersion": "1a0", - "refCounts": { - "oneHash": 1, + ], + "instances": Result [ + { + "clientGroupID": "abc123", + "clientSchema": null, + "grantedAt": 1709251200000, + "lastActive": 1713830400000, + "owner": "my-task", + "replicaVersion": "120", + "ttlClock": 1713830400000, + "version": "1aa", }, - "rowKey": { - "id": "123", + { + "clientGroupID": "def456", + "clientSchema": null, + "grantedAt": null, + "lastActive": 1713830400000, + "owner": null, + "replicaVersion": "120", + "ttlClock": 1713830400000, + "version": "1aa", }, - "rowVersion": "03", - "schema": "public", - "table": "issues", - }, - { - "clientGroupID": "def456", - "patchVersion": "1a0", - "refCounts": { - "oneHash": 1, + ], + "queries": Result [ + { + "clientAST": { + "table": "issues", + }, + "clientGroupID": "abc123", + "deleted": null, + "errorMessage": null, + "errorVersion": null, + "internal": null, + "patchVersion": null, + "queryArgs": null, + "queryHash": "oneHash", + "queryName": null, + "transformationHash": null, + "transformationVersion": null, }, - "rowKey": { - "id": "321", + { + "clientAST": { + "table": "issues", + }, + "clientGroupID": "def456", + "deleted": null, + "errorMessage": null, + "errorVersion": null, + "internal": null, + "patchVersion": null, + "queryArgs": null, + "queryHash": "oneHash", + "queryName": null, + "transformationHash": null, + "transformationVersion": null, }, - "rowVersion": "03", - "schema": "public", - "table": "issues", - }, - ], - } - `); + ], + "rows": Result [ + { + "clientGroupID": "abc123", + "patchVersion": "1a0", + "refCounts": { + "oneHash": 2, + }, + "rowKey": { + "id": "123", + }, + "rowVersion": "03", + "schema": "public", + "table": "issues", + }, + { + "clientGroupID": "abc123", + "patchVersion": "1a0", + "refCounts": { + "oneHash": 2, + }, + "rowKey": { + "id": "321", + }, + "rowVersion": "03", + "schema": "public", + "table": "issues", + }, + { + "clientGroupID": "def456", + "patchVersion": "1a0", + "refCounts": { + "oneHash": 1, + }, + "rowKey": { + "id": "123", + }, + "rowVersion": "03", + "schema": "public", + "table": "issues", + }, + { + "clientGroupID": "def456", + "patchVersion": "1a0", + "refCounts": { + "oneHash": 1, + }, + "rowKey": { + "id": "321", + }, + "rowVersion": "03", + "schema": "public", + "table": "issues", + }, + ], + } + `); // No-op expect(flushed).toBe(false); @@ -6379,6 +6591,7 @@ describe('view-syncer/cvr', () => { "inactivatedAt": null, "patchVersion": "1a9:01", "queryHash": "oneHash", + "retryErrorVersion": null, "ttl": 300000, }, { @@ -6388,6 +6601,7 @@ describe('view-syncer/cvr', () => { "inactivatedAt": null, "patchVersion": "1a9:01", "queryHash": "oneHash", + "retryErrorVersion": null, "ttl": 300000, }, { @@ -6397,6 +6611,7 @@ describe('view-syncer/cvr', () => { "inactivatedAt": null, "patchVersion": "1a9:01", "queryHash": "oneHash", + "retryErrorVersion": null, "ttl": 300000, }, ], @@ -6429,6 +6644,8 @@ describe('view-syncer/cvr', () => { }, "clientGroupID": "abc123", "deleted": null, + "errorMessage": null, + "errorVersion": null, "internal": null, "patchVersion": null, "queryArgs": null, @@ -6443,6 +6660,8 @@ describe('view-syncer/cvr', () => { }, "clientGroupID": "def456", "deleted": null, + "errorMessage": null, + "errorVersion": null, "internal": null, "patchVersion": null, "queryArgs": null, diff --git a/packages/zero-cache/src/services/view-syncer/cvr.ts b/packages/zero-cache/src/services/view-syncer/cvr.ts index fcfb74ea0f..99f98e2a34 100644 --- a/packages/zero-cache/src/services/view-syncer/cvr.ts +++ b/packages/zero-cache/src/services/view-syncer/cvr.ts @@ -32,6 +32,7 @@ import { cmpVersions, maxVersion, oneAfter, + versionFromString, type ClientQueryRecord, type ClientRecord, type CustomQueryRecord, @@ -292,6 +293,7 @@ export class CVRConfigDrivenUpdater extends CVRUpdater { name?: string | undefined; args?: readonly ReadonlyJSONValue[] | undefined; ttl?: number | undefined; + retryErrorVersion?: string | undefined; }>[], ): PatchToVersion[] { const patches: PatchToVersion[] = []; @@ -335,6 +337,15 @@ export class CVRConfigDrivenUpdater extends CVRUpdater { if (compareTTL(ttl, oldClientState.ttl) > 0) { // TTL update only - don't record for telemetry needed.add(hash); + continue; + } + + if ( + (q.retryErrorVersion + ? versionFromString(q.retryErrorVersion).stateVersion + : undefined) !== oldClientState.retryErrorVersion?.stateVersion + ) { + needed.add(hash); } } @@ -359,6 +370,9 @@ export class CVRConfigDrivenUpdater extends CVRUpdater { inactivatedAt, ttl, version: newVersion, + retryErrorVersion: q.retryErrorVersion + ? versionFromString(q.retryErrorVersion) + : undefined, }; this._cvr.queries[id] = query; patches.push({ @@ -374,6 +388,9 @@ export class CVRConfigDrivenUpdater extends CVRUpdater { false, inactivatedAt, ttl, + q.retryErrorVersion + ? versionFromString(q.retryErrorVersion) + : undefined, ); } return patches; @@ -560,13 +577,26 @@ export class CVRQueryDrivenUpdater extends CVRUpdater { */ trackQueries( lc: LogContext, - executed: {id: string; transformationHash: string}[], + executed: ( + | { + id: string; + transformationHash: string; + errorMessage?: string; + } + | { + id: string; + transformationHash?: undefined; + errorMessage: string; + } + )[], removed: {id: string; transformationHash: string | undefined}[], ): {newVersion: CVRVersion; queryPatches: PatchToVersion[]} { assert(this.#existingRows === undefined, `trackQueries already called`); const queryPatches: Patch[] = [ - executed.map(q => this.#trackExecuted(q.id, q.transformationHash)), + executed.map(q => + this.#trackExecuted(q.id, q.transformationHash, q.errorMessage), + ), removed.map(q => this.#trackRemoved(q.id)), ].flat(2); @@ -621,28 +651,59 @@ export class CVRQueryDrivenUpdater extends CVRUpdater { * * This must be called for all executed queries. */ - #trackExecuted(queryID: string, transformationHash: string): Patch[] { - assert(!this.#removedOrExecutedQueryIDs.has(queryID)); - this.#removedOrExecutedQueryIDs.add(queryID); + #trackExecuted( + id: string, + transformationHash: string | undefined, + errorMessage?: string | undefined, + ): Patch[] { + assert(!this.#removedOrExecutedQueryIDs.has(id)); + this.#removedOrExecutedQueryIDs.add(id); let gotQueryPatch: Patch | undefined; - const query = this._cvr.queries[queryID]; - if (query.transformationHash !== transformationHash) { + const queryToUpdate = this._cvr.queries[id]; + assert(queryToUpdate, `Query ${id} not found in CVR`); + + if ( + (transformationHash !== undefined && + queryToUpdate.transformationHash !== transformationHash) || + queryToUpdate.errorMessage !== (errorMessage ?? undefined) || + // If there is an error message, we always update the query to ensure + // the errorVersion is bumped, indicating a failed retry. + errorMessage !== undefined + ) { const transformationVersion = this._ensureNewVersion(); - if (query.type !== 'internal' && query.patchVersion === undefined) { + if (queryToUpdate.type === 'internal') { + if (transformationHash !== undefined) { + queryToUpdate.transformationHash = transformationHash; + } + queryToUpdate.transformationVersion = transformationVersion; + // Internal queries cannot be in an error state. + this._cvrStore.updateQuery(queryToUpdate); + return []; + } + + if (queryToUpdate.patchVersion === undefined) { // client query: desired -> gotten - query.patchVersion = transformationVersion; + queryToUpdate.patchVersion = transformationVersion; + gotQueryPatch = { + type: 'query', + op: 'put', + id, + }; + } else { + // client query: gotten -> gotten (changed) gotQueryPatch = { type: 'query', op: 'put', - id: query.id, + id, }; } - - query.transformationHash = transformationHash; - query.transformationVersion = transformationVersion; - this._cvrStore.updateQuery(query); + queryToUpdate.transformationHash = transformationHash; + queryToUpdate.transformationVersion = transformationVersion; + queryToUpdate.errorMessage = errorMessage; + queryToUpdate.errorVersion = errorMessage ? this._cvr.version : undefined; + this._cvrStore.updateQuery(queryToUpdate); } return gotQueryPatch ? [gotQueryPatch] : []; } diff --git a/packages/zero-cache/src/services/view-syncer/schema/cvr.ts b/packages/zero-cache/src/services/view-syncer/schema/cvr.ts index 32f73ccf6f..4435997a03 100644 --- a/packages/zero-cache/src/services/view-syncer/schema/cvr.ts +++ b/packages/zero-cache/src/services/view-syncer/schema/cvr.ts @@ -103,6 +103,8 @@ export type QueriesRow = { transformationVersion: string | null; internal: boolean | null; deleted: boolean | null; + errorMessage: string | null; + errorVersion: string | null; }; function createQueriesTable(shard: ShardID) { @@ -118,6 +120,8 @@ CREATE TABLE ${schema(shard)}.queries ( "transformationVersion" TEXT, "internal" BOOL, -- If true, no need to track / send patches "deleted" BOOL, -- put vs del "got" query + "errorMessage" TEXT, -- If present, the query execution failed + "errorVersion" TEXT, -- The CVR version at which the error occurred PRIMARY KEY ("clientGroupID", "queryHash"), @@ -149,6 +153,7 @@ export type DesiresRow = { deleted: boolean | null; ttl: number | null; inactivatedAt: TTLClock | null; + retryErrorVersion: string | null; }; function createDesiresTable(shard: ShardID) { @@ -163,6 +168,7 @@ CREATE TABLE ${schema(shard)}.desires ( "ttlMs" DOUBLE PRECISION, -- Time to live in milliseconds "inactivatedAt" TIMESTAMPTZ, -- DEPRECATED: Use inactivatedAtMs instead. Time at which this row was inactivated "inactivatedAtMs" DOUBLE PRECISION, -- Time at which this row was inactivated (milliseconds since client group start) + "retryErrorVersion" TEXT, -- The CVR version after which the query should be retried PRIMARY KEY ("clientGroupID", "clientID", "queryHash"), diff --git a/packages/zero-cache/src/services/view-syncer/schema/init.ts b/packages/zero-cache/src/services/view-syncer/schema/init.ts index 93927e40d8..23fbf527ef 100644 --- a/packages/zero-cache/src/services/view-syncer/schema/init.ts +++ b/packages/zero-cache/src/services/view-syncer/schema/init.ts @@ -182,6 +182,17 @@ export async function initViewSyncerSchema( }, }; + const migrateV15ToV16: Migration = { + migrateSchema: async (_, sql) => { + await sql`ALTER TABLE ${sql(schema)}.queries + ADD COLUMN "errorMessage" TEXT`; + await sql`ALTER TABLE ${sql(schema)}.queries + ADD COLUMN "errorVersion" TEXT`; + await sql`ALTER TABLE ${sql(schema)}.desires + ADD COLUMN "retryErrorVersion" TEXT`; + }, + }; + const schemaVersionMigrationMap: IncrementalMigrationMap = { 2: migrateV1toV2, 3: migrateV2ToV3, @@ -211,6 +222,8 @@ export async function initViewSyncerSchema( // directly as DOUBLE PRECISION, avoiding postgres.js TIMESTAMPTZ // type conversion issues 15: migratedV14ToV15, + // V16 adds queries."errorMessage", queries."errorVersion", and desires."retryErrorVersion" + 16: migrateV15ToV16, }; await runSchemaMigrations( diff --git a/packages/zero-cache/src/services/view-syncer/schema/types.ts b/packages/zero-cache/src/services/view-syncer/schema/types.ts index d739498b25..7b9d77887a 100644 --- a/packages/zero-cache/src/services/view-syncer/schema/types.ts +++ b/packages/zero-cache/src/services/view-syncer/schema/types.ts @@ -152,6 +152,16 @@ export const baseQueryRecordSchema = v.object({ * queries with a newer `transformationVersion`. */ transformationVersion: cvrVersionSchema.optional(), + + /** + * If present, the query execution failed. + */ + errorMessage: v.string().optional(), + + /** + * The CVR version at which the error occurred. + */ + errorVersion: cvrVersionSchema.optional(), }); /** @@ -190,6 +200,11 @@ const clientStateSchema = v.object({ * The version at which the client state changed (i.e. individual `patchVersion`s). */ version: cvrVersionSchema, + + /** + * The error version for which the query should be retried. + */ + retryErrorVersion: cvrVersionSchema.optional(), }); const externalQueryRecordSchema = baseQueryRecordSchema.extend({ @@ -336,6 +351,8 @@ export function queryRecordToQueryRow( transformationVersion: maybeVersionString(query.transformationVersion), internal: true, deleted: false, // put vs del "got" query + errorMessage: query.errorMessage ?? null, + errorVersion: maybeVersionString(query.errorVersion), }; case 'client': return { @@ -349,6 +366,8 @@ export function queryRecordToQueryRow( transformationVersion: maybeVersionString(query.transformationVersion), internal: null, deleted: false, // put vs del "got" query + errorMessage: query.errorMessage ?? null, + errorVersion: maybeVersionString(query.errorVersion), }; case 'custom': return { @@ -362,6 +381,8 @@ export function queryRecordToQueryRow( transformationVersion: maybeVersionString(query.transformationVersion), internal: null, deleted: false, // put vs del "got" query + errorMessage: query.errorMessage ?? null, + errorVersion: maybeVersionString(query.errorVersion), }; } } diff --git a/packages/zero-cache/src/services/view-syncer/view-syncer.ts b/packages/zero-cache/src/services/view-syncer/view-syncer.ts index d703abb8d0..f8fe2fca85 100644 --- a/packages/zero-cache/src/services/view-syncer/view-syncer.ts +++ b/packages/zero-cache/src/services/view-syncer/view-syncer.ts @@ -1430,32 +1430,81 @@ export class ViewSyncerService implements ViewSyncer, ActivityBasedService { } } - const serverQueries = transformedQueries.map( - ({id, origQuery, transformed}) => { - const ids = hashToIDs.get(transformed.transformationHash); - if (ids) { - ids.push(id); - } else { - hashToIDs.set(transformed.transformationHash, [id]); + const serverQueries: { + id: string; + ast?: AST; + transformationHash: string; + remove: boolean; + errorMessage?: string; + retry?: boolean; + }[] = transformedQueries.map(({id, origQuery, transformed}) => { + const ids = hashToIDs.get(transformed.transformationHash); + if (ids) { + ids.push(id); + } else { + hashToIDs.set(transformed.transformationHash, [id]); + } + let retry = false; + if (origQuery.type !== 'internal' && origQuery.clientState) { + for (const state of Object.values(origQuery.clientState)) { + if ( + origQuery.errorMessage !== undefined && + origQuery.errorVersion !== undefined && + state.retryErrorVersion !== undefined && + origQuery.errorVersion.stateVersion === + state.retryErrorVersion.stateVersion + ) { + retry = true; + break; + } } - return { - id, - ast: transformed.transformedAst, - transformationHash: transformed.transformationHash, - remove: expired(ttlClock, origQuery), - }; - }, + } + + return { + id, + ast: transformed.transformedAst, + transformationHash: transformed.transformationHash, + remove: expired(ttlClock, origQuery), + retry, + }; + }); + + const retryHashes = new Set( + serverQueries.filter(q => q.retry).map(q => q.transformationHash), ); - const addQueries = serverQueries.filter( - q => !q.remove && !hydratedQueries.has(q.transformationHash), + const addQueries: ( + | { + id: string; + ast?: AST; + transformationHash: string; + errorMessage?: string; + remove?: boolean; + retry?: boolean; + } + | { + id: string; + ast?: AST; + transformationHash?: undefined; + errorMessage: string; + remove?: boolean; + retry?: boolean; + } + )[] = serverQueries.filter( + q => + !q.remove && + (!hydratedQueries.has(q.transformationHash) || + retryHashes.has(q.transformationHash)), ); const removeQueries: { id: string; transformationHash: string | undefined; }[] = serverQueries.filter(q => q.remove); const desiredQueries = new Set( - serverQueries.filter(q => !q.remove).map(q => q.transformationHash), + serverQueries + .filter(q => !q.remove) + .map(q => q.transformationHash) + .filter(h => !retryHashes.has(h)), ); const unhydrateQueries = [...hydratedQueries].filter( transformationHash => !desiredQueries.has(transformationHash), @@ -1474,32 +1523,12 @@ export class ViewSyncerService implements ViewSyncer, ActivityBasedService { // These are queries we need to remove from `desired`, not `got`, because they never transformed. if (erroredQueryIDs) { // Build a set of transformation hashes that succeeded - const successfulHashes = new Set( - transformedQueries.map( - ({transformed}) => transformed.transformationHash, - ), - ); - for (const queryID of erroredQueryIDs) { - // Try to get the last known transformation hash for this query - let lastKnownHash: string | undefined; - - // Check if the query exists in the CVR with a transformation hash - const cvrQuery = cvr.queries[queryID]; - if (cvrQuery?.transformationHash) { - lastKnownHash = cvrQuery.transformationHash; - } - - // If a successfully transformed query has the same hash, we can't remove it - // because that would remove the pipeline for the successful query - const transformationHash = - lastKnownHash && successfulHashes.has(lastKnownHash) - ? undefined - : lastKnownHash; - - removeQueries.push({ + addQueries.push({ id: queryID, - transformationHash, + transformationHash: undefined, + errorMessage: 'Transformation failed', + remove: false, }); } } @@ -1560,7 +1589,20 @@ export class ViewSyncerService implements ViewSyncer, ActivityBasedService { #addAndRemoveQueries( lc: LogContext, cvr: CVRSnapshot, - addQueries: {id: string; ast: AST; transformationHash: string}[], + addQueries: ( + | { + id: string; + ast?: AST; + transformationHash: string; + errorMessage?: string; + } + | { + id: string; + ast?: AST; + transformationHash?: undefined; + errorMessage: string; + } + )[], removeQueries: {id: string; transformationHash: string | undefined}[], unhydrateQueries: string[], hashToIDs: Map, @@ -1647,17 +1689,21 @@ export class ViewSyncerService implements ViewSyncer, ActivityBasedService { .withContext('transformationHash', q.transformationHash); lc.debug?.(`adding pipeline for query`, q.ast); + if (q.errorMessage) { + continue; + } + yield* pipelines.addQuery( - q.transformationHash, + q.transformationHash as string, q.id, - q.ast, + q.ast as AST, timer.startWithoutYielding(), ); const elapsed = timer.stop(); totalProcessTime += elapsed; self.#addQueryMaterializationServerMetric( - q.transformationHash, + q.transformationHash as string, elapsed, ); @@ -1665,8 +1711,9 @@ export class ViewSyncerService implements ViewSyncer, ActivityBasedService { lc.warn?.('Slow query materialization', elapsed, q.ast); } manualSpan(tracer, 'vs.addAndConsumeQuery', elapsed, { - hash: q.id, + id: q.id, transformationHash: q.transformationHash, + errorMessage: q.errorMessage, }); } hydrations.add(1); diff --git a/packages/zero-protocol/src/protocol-version.test.ts b/packages/zero-protocol/src/protocol-version.test.ts index 80d1e56380..bad934753d 100644 --- a/packages/zero-protocol/src/protocol-version.test.ts +++ b/packages/zero-protocol/src/protocol-version.test.ts @@ -11,6 +11,6 @@ test('protocol version', () => { // If this test fails upstream or downstream schema has changed such that // old code will not understand the new schema, bump the // PROTOCOL_VERSION and update the expected values. - expect(hash).toEqual('spnlrrnjt59g'); + expect(hash).toEqual('hw65ubj5715g'); expect(PROTOCOL_VERSION).toBe(42); }); diff --git a/packages/zero-protocol/src/queries-patch.ts b/packages/zero-protocol/src/queries-patch.ts index 3a9938093f..2b4c896242 100644 --- a/packages/zero-protocol/src/queries-patch.ts +++ b/packages/zero-protocol/src/queries-patch.ts @@ -2,19 +2,32 @@ import {jsonSchema} from '../../shared/src/json-schema.ts'; import * as v from '../../shared/src/valita.ts'; import {astSchema} from './ast.ts'; -export const putOpSchema = v.object({ +export const putOpSchemaBase = v.object({ op: v.literal('put'), hash: v.string(), ttl: v.number().optional(), }); -export const upPutOpSchema = putOpSchema.extend({ +export const putOpSchema = putOpSchemaBase.extend({ + // If set, the query is in an error state, and errorVersion is set to the + // version at which the error occurred. + errorMessage: v.string().optional(), + // An opaque orderable version string representing the version at which + // the error occurred. + errorVersion: v.string().optional(), +}); + +export const upPutOpSchema = putOpSchemaBase.extend({ // All fields are optional in this transitional period. // - ast is filled in for client queries // - name and args are filled in for custom queries ast: astSchema.optional(), name: v.string().optional(), args: v.readonly(v.array(jsonSchema)).optional(), + // If set, the client requests a retry for this query. The retry will only be + // attempted if the current error version for the query is equal to this + // value. + retryErrorVersion: v.string().optional(), }); const delOpSchema = v.object({