Skip to content

fix: publish empty batches to avoid staled rounds #692

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions publish/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,27 +44,35 @@ export const publish = async ({
maxMeasurements
])
if (measurements.length === 0) {
logger.log('No measurements to publish.')
return
// IMPORTANT: We still need to publish an empty batch,
// otherwise the current round will never advance.
logger.log('WARNING: No measurements to publish. Publishing an empty batch.')
}

// Fetch the count of all unpublished measurements - we need this for monitoring
// Note: this number will be higher than `measurements.length` because spark-api adds more
// measurements in between the previous and the next query.
const totalCount = (await pgPool.query(
'SELECT COUNT(*) FROM measurements'
)).rows[0].count
)).rows[0]?.count || 0

logger.log(`Publishing ${measurements.length} measurements. Total unpublished: ${totalCount}. Batch size: ${maxMeasurements}.`)

// Share measurements
const start = new Date()
const file = new File(
[measurements.map(m => JSON.stringify(m)).join('\n')],
'measurements.ndjson',
{ type: 'application/json' }
)
const cid = await web3Storage.uploadFile(file)

let cid
if (measurements.length) {
const file = new File(
[measurements.map(m => JSON.stringify(m)).join('\n')],
'measurements.ndjson',
{ type: 'application/json' }
)
cid = await web3Storage.uploadFile(file)
} else {
// bafkqaaa is a CID for an empty file
cid = 'bafkqaaa'
}
const uploadMeasurementsDuration = new Date().getTime() - start.getTime()
logger.log(`Measurements packaged in ${cid}`)

Expand Down
41 changes: 36 additions & 5 deletions publish/test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ describe('unit', () => {
assert.strictEqual(removeConfirmedCalls.length, 1)
})

it('noops when measurements empty', async () => {
it('publishes an empty batch with CID bafkqaaa when measurements empty', async () => {
const clientStatements = []
const client = {
connect () {
Expand All @@ -122,9 +122,40 @@ describe('unit', () => {
}
}

const web3Storage = {}
const ieContract = {}
const stuckTransactionsCanceller = {}
const web3Storage = {
async uploadFile (file) {
throw new Error('web3Storage.uploadFile should not be called')
}
}

const ieContractMeasurementCIDs = []
const ieContract = {
async addMeasurements (cid) {
ieContractMeasurementCIDs.push(cid)
return {
async wait () {
return {
logs: [{}]
}
}
}
},
interface: {
parseLog () {
return {
args: [
null,
1
]
}
}
}
}

const stuckTransactionsCanceller = {
async addPending (tx) { },
async removeConfirmed (tx) { }
}

const { recordTelemetry } = createTelemetryRecorderStub()

Expand All @@ -138,7 +169,7 @@ describe('unit', () => {
stuckTransactionsCanceller
})

assert.strictEqual(clientStatements.length, 1)
assert.deepStrictEqual(ieContractMeasurementCIDs, ['bafkqaaa'])
})
})

Expand Down