Recreate producer aggregator instead of flushing it #114

Description

@crepererum

In #113 @tustvold fixed some panic and cancellation safety issues in our producer pipeline. One issue remains, though: what is supposed to happen when Aggregator::flush panics? At the moment we assume that the aggregator can be reused afterwards, i.e. that it is more or less panic-safe. It would be nicer if we did not need this assumption and instead re-created the aggregator rather than mutating it in place. This would also result in a somewhat clearer interface, since currently most aggregator implementations consist of a static config part and a "state" part that is reset on flush, see

#[derive(Debug)]
pub struct RecordAggregator {
    /// Static config part.
    max_batch_size: usize,
    /// State part that is reset on flush.
    state: AggregatorState,
}

and

https://github.com/influxdata/influxdb_iox/blob/3f547c58c9b986c82a71d21ff4976e0c1b9c9a90/write_buffer/src/kafka/aggregator.rs#L236-L256

Using a constructor/factory (fn create(&self) -> Aggregator) plus a throw-away aggregator (fn flush(self)) would give us that clearer interface and would likely also make the code in rskafka easier to understand.

Note that with user-provided code there is always some form of panic safety involved. In the proposed solution the "aggregator factory" could panic while creating new aggregators, but I still think that is easier to understand than a &mut self function that panics.
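
For illustration, a rough sketch of what such an interface could look like (trait and method names here are made up, not the actual rskafka API):

/// Holds the static configuration and hands out a fresh aggregator per batch.
pub trait AggregatorFactory {
    type Aggregator: Aggregator;

    /// May panic, but a panic here leaves no half-flushed aggregator behind.
    fn create(&self) -> Self::Aggregator;
}

/// Throw-away aggregator that is consumed on flush.
pub trait Aggregator {
    type Input;
    type Output;

    /// Tries to add a record; hands it back if the current batch is full.
    fn try_push(&mut self, record: Self::Input) -> Option<Self::Input>;

    /// Consumes the aggregator, so even if this panics the producer simply
    /// asks the factory for a new aggregator instead of reusing a possibly
    /// half-flushed one.
    fn flush(self) -> Self::Output;
}

The producer would then own the factory rather than a long-lived aggregator and call create() after every flush (or after a panic unwinds), so the "reset the state" step disappears entirely.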
