
[Structured Streaming] Limit the number of processed items per trigger (batch) #178

Open

Description

@fe2s

The request came up in https://stackoverflow.com/questions/56679474/how-to-set-maximum-batch-size-of-spark-readstream-in-structured-streaming

I am reading batches of records from Redis with Spark Structured Streaming's foreachBatch, using the following code (trying to set the batch size via stream.read.batch.size):

val data = spark.readStream.format("redis")
.option("stream.read.batch.size").load()

val query = data.writeStream.foreachBatch {
(batchDF: DataFrame, batchId: Long) => ...
// we count size of batchDF here, we want to limit its size
// some operation
}

Currently we set stream.read.batch.size to 128, but it does not seem to work. The batch size appears to be arbitrary, sometimes over 1,000 or even 10,000 records.

However, I do not want to wait that long (10,000 records), because some operations (marked by the // some operation comment above) need to run as soon as possible. I want to cap the maximum batch size so that once the limit is reached the records are processed immediately. How can I do that?

The stream.read.batch.size parameter controls the number of items read by a single Redis API call (the count parameter of the XREADGROUP call). It does not affect the number of items per trigger (the size of batchDF).
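
As a quick illustration of the distinction, counting the rows inside foreachBatch shows the per-trigger size, which can greatly exceed the per-call count. A minimal sketch, reusing the data stream from the question above:

// Sketch: even with stream.read.batch.size = 128, one trigger can span
// several XREADGROUP calls, so a single batchDF may hold many more rows.
val query = data.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  println(s"trigger $batchId received ${batchDF.count()} rows") // often >> 128
}.start()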

If possible, we should introduce a new parameter that limits the number of items per trigger, similar to maxOffsetsPerTrigger in the Kafka integration. A sketch of what that could look like follows below.
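
For comparison, this is how the Kafka source caps items per trigger, next to a sketch of a possible Redis equivalent. The option name stream.read.max.items.per.trigger below is hypothetical (not implemented), and stream.keys is assumed here as the option naming the Redis stream:

// Kafka source: maxOffsetsPerTrigger limits how many offsets one micro-batch consumes.
val kafkaStream = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("maxOffsetsPerTrigger", 128)
  .load()

// Hypothetical Redis equivalent; the per-trigger option does not exist yet.
val redisStream = spark.readStream.format("redis")
  .option("stream.keys", "my-stream")                // Redis stream key(s) to read (assumed option)
  .option("stream.read.batch.size", 128)             // count per XREADGROUP call (existing)
  .option("stream.read.max.items.per.trigger", 128)  // hypothetical per-trigger cap
  .load()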
