Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ on:
push:
branches: [main]
pull_request:
branches: [main]

jobs:
build-and-test:
Expand Down
11 changes: 11 additions & 0 deletions abis/RecurringCollector.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,16 @@
],
"name": "RCACollected",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{ "indexed": true, "name": "agreementId", "type": "bytes16" },
{ "indexed": true, "name": "payer", "type": "address" },
{ "indexed": true, "name": "offerType", "type": "uint8" },
{ "indexed": false, "name": "offerHash", "type": "bytes32" }
],
"name": "OfferStored",
"type": "event"
}
]
20 changes: 20 additions & 0 deletions schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,23 @@ type IndexerDeploymentLatest @entity(immutable: false) {
blockNumber: BigInt!
blockTimestamp: BigInt!
}

# First stored offer per agreementId, keyed by bytes16 agreement ID.
# Dipper queries this entity as an idempotency gate -- avoids re-submitting
# an offer after a crashed-mid-flight restart where the on-chain tx landed
# but dipper lost track of it.
#
# Declared immutable because, for a given agreementId, the RCA identifying
# fields (payer, dataService, serviceProvider, deadline, nonce) are fixed by
# the id derivation, so any duplicate OfferStored event for the same id would
# carry the same offerHash. The handler enforces this by returning early on
# the second event instead of attempting to overwrite.
type Offer @entity(immutable: true) {
id: Bytes!
payer: Bytes!
offerType: Int!
offerHash: Bytes!
createdAtBlock: BigInt!
createdAtTimestamp: BigInt!
createdAtTx: Bytes!
}
24 changes: 23 additions & 1 deletion src/recurringCollector.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { IndexingAgreement } from '../generated/schema'
import { IndexingAgreement, Offer } from '../generated/schema'
import {
AgreementAccepted,
AgreementCanceled,
AgreementUpdated,
RCACollected,
OfferStored as OfferStoredEvent,
} from '../generated/RecurringCollector/RecurringCollector'
import { createOrLoadIndexingAgreement, BIGINT_ZERO } from './helpers'

Expand Down Expand Up @@ -61,3 +62,24 @@ export function handleRCACollected(event: RCACollected): void {
agreement.tokensCollected = agreement.tokensCollected.plus(event.params.tokens)
agreement.save()
}

export function handleOfferStored(event: OfferStoredEvent): void {
// First-offer entity keyed by agreementId (bytes16). Immutable: if an
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is exactly as described here. We are using the agreementId as unique key but OfferStored could be emitted multiple times for the same agreement id. If an offer is created and updated we would be skipping the update event:

if ($.rcauOffers[rcau.agreementId].offerHash != rcauHash) {
    $.rcauOffers[rcau.agreementId] = StoredOffer({ offerHash: rcauHash, data: abi.encode(rcau) });
    emit OfferStored(rcau.agreementId, agreement.payer, OFFER_TYPE_UPDATE, rcauHash);
}

To keep immutable entities we would have to use the offerHash as an id but then from the dipper we would need to find the latest valid offer. I would suggest we drop immutability and overwrite the Offer with the latest terms, that way from dipper we query by agreement id and use the latest values.

// entity already exists, a duplicate OfferStored event for the same
// agreement id (e.g. dipper crashed and re-submitted, or a chain reorg
// re-emitted) carries the same offerHash by construction and we return
// early. Writing to an immutable entity a second time is a graph-node
// error that would halt the subgraph, so the guard is load-bearing.
let existing = Offer.load(event.params.agreementId)
if (existing != null) {
return
}
let offer = new Offer(event.params.agreementId)
offer.payer = event.params.payer
offer.offerType = event.params.offerType
offer.offerHash = event.params.offerHash
offer.createdAtBlock = event.block.number
offer.createdAtTimestamp = event.block.timestamp
offer.createdAtTx = event.transaction.hash
offer.save()
}
3 changes: 3 additions & 0 deletions subgraph.template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ dataSources:
language: wasm/assemblyscript
entities:
- IndexingAgreement
- Offer
abis:
- name: RecurringCollector
file: ./abis/RecurringCollector.json
Expand All @@ -59,4 +60,6 @@ dataSources:
- event: RCACollected(indexed address,indexed address,indexed address,bytes16,bytes32,uint256,uint256)
handler: handleRCACollected
topic1: ["{{subgraphServiceAddress}}"]
- event: OfferStored(indexed bytes16,indexed address,indexed uint8,bytes32)
handler: handleOfferStored
file: ./src/recurringCollector.ts
67 changes: 67 additions & 0 deletions tests/recurringCollector.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { assert, describe, test, clearStore, afterEach, newMockEvent } from 'matchstick-as'
import { Address, Bytes, BigInt, ethereum } from '@graphprotocol/graph-ts'
import { handleOfferStored } from '../src/recurringCollector'
import { OfferStored as OfferStoredEvent } from '../generated/RecurringCollector/RecurringCollector'

const PAYER = Address.fromString('0x0000000000000000000000000000000000000002')
const AGREEMENT_ID = Bytes.fromHexString('0x0102030405060708090a0b0c0d0e0f10')

function createOfferStoredEvent(
agreementId: Bytes,
offerType: i32,
offerHash: Bytes,
): OfferStoredEvent {
let event = changetype<OfferStoredEvent>(newMockEvent())

event.parameters = new Array()
event.parameters.push(
new ethereum.EventParam('agreementId', ethereum.Value.fromFixedBytes(agreementId)),
)
event.parameters.push(new ethereum.EventParam('payer', ethereum.Value.fromAddress(PAYER)))
event.parameters.push(
new ethereum.EventParam(
'offerType',
ethereum.Value.fromUnsignedBigInt(BigInt.fromI32(offerType)),
),
)
event.parameters.push(
new ethereum.EventParam('offerHash', ethereum.Value.fromFixedBytes(offerHash)),
)

return event
}

describe('handleOfferStored', () => {
afterEach(() => {
clearStore()
})

test('first event creates Offer entity', () => {
let offerHash = Bytes.fromHexString('0x' + 'aa'.repeat(32))
let event = createOfferStoredEvent(AGREEMENT_ID, 0, offerHash)
handleOfferStored(event)

assert.entityCount('Offer', 1)

let id = AGREEMENT_ID.toHexString()
assert.fieldEquals('Offer', id, 'payer', PAYER.toHexString())
assert.fieldEquals('Offer', id, 'offerType', '0')
assert.fieldEquals('Offer', id, 'offerHash', offerHash.toHexString())
})

test('duplicate event for same agreementId is a no-op (idempotency guard)', () => {
let offerHash = Bytes.fromHexString('0x' + 'aa'.repeat(32))

let event1 = createOfferStoredEvent(AGREEMENT_ID, 0, offerHash)
handleOfferStored(event1)

// Second event for same agreementId must not halt on immutable re-write.
let event2 = createOfferStoredEvent(AGREEMENT_ID, 0, offerHash)
event2.transaction.hash = Bytes.fromHexString(
'0x1111111111111111111111111111111111111111111111111111111111111111',
) as Bytes
handleOfferStored(event2)

assert.entityCount('Offer', 1)
})
})
Loading