Provide FlatMappingProcessor as an alternative to UnboundedProcessor #752

@OlegDokuka

The Problem

Right now in RSocket we use UnboundedProcessor, which is simply a MultiProducerSingleConsumerArrayLinkedQueue.

This means that all frames from all possible interactions are enqueued into that one queue with NO backpressure control, and the underlying connection then drains elements as fast as it can.

Such a design may lead to many problems, including memory leaks and general application instability.
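
To make the concern concrete, here is a minimal sketch of what happens when producers outpace the connection. It is not RSocket code: `UnboundedQueueSketch` and `backlogAfter` are hypothetical names, and `ConcurrentLinkedQueue` from the JDK stands in for the unbounded multi-producer queue behind UnboundedProcessor.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical illustration only; not part of RSocket.
class UnboundedQueueSketch {

    /**
     * Enqueues `produced` frames, lets the "connection" drain at most
     * `drained` of them, and returns how many frames are still buffered.
     */
    static int backlogAfter(int produced, int drained) {
        // Stand-in for the unbounded MPSC queue (an assumption of this sketch).
        Queue<byte[]> queue = new ConcurrentLinkedQueue<>();

        for (int i = 0; i < produced; i++) {
            // offer() on an unbounded queue never rejects an element,
            // so the producer gets no signal to slow down.
            queue.offer(new byte[16]);
        }

        for (int i = 0; i < drained; i++) {
            if (queue.poll() == null) {
                break; // nothing left to drain
            }
        }
        return queue.size();
    }
}
```

With 1000 frames produced and only 10 drained, 990 frames stay on the heap. Nothing in the queue itself bounds that number.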

Proposal

To have fine-grained control at every level, I propose to make the processor customizable and to introduce FlatMappingProcessor as an alternative.

In short, FlatMappingProcessor should behave identically to the FlatMap operator in Project Reactor / RxJava.

Main Characteristics

  1. Prefetch mode for every Flux
  2. Concurrency control (the RSocket user can specify the number of active streams)
  3. A small SpScBoundedArrayQueue for every Flux (the requestChannel / requestStream cases) and a general MpScUnboundedLinkedArrayQueue for scalar Mono cases
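
The characteristics above could be sketched roughly as follows. This is a simplified model under stated assumptions, not the proposed implementation: `FlatMappingSketch`, `tryOpenStream` and `drain` are hypothetical names, `ArrayBlockingQueue` stands in for a small bounded single-producer/single-consumer queue, and a rejected `offer` stands in for the request-based prefetch that a real FlatMap operator uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical single-threaded model; not part of RSocket or Reactor.
class FlatMappingSketch {
    private final int concurrency; // max number of active streams
    private final int prefetch;    // capacity of each per-stream queue
    private final List<Queue<String>> active = new ArrayList<>();

    FlatMappingSketch(int concurrency, int prefetch) {
        this.concurrency = concurrency;
        this.prefetch = prefetch;
    }

    /** Opens a per-stream queue, or returns null if the concurrency limit is reached. */
    Queue<String> tryOpenStream() {
        if (active.size() >= concurrency) {
            return null;
        }
        // Bounded queue: offer() returns false once `prefetch` frames are buffered.
        Queue<String> queue = new ArrayBlockingQueue<>(prefetch);
        active.add(queue);
        return queue;
    }

    /** Drains up to `demand` frames, taking one frame per stream per round. */
    List<String> drain(int demand) {
        List<String> out = new ArrayList<>();
        boolean progressed = true;
        while (out.size() < demand && progressed) {
            progressed = false;
            for (Queue<String> queue : active) {
                if (out.size() == demand) {
                    break;
                }
                String frame = queue.poll();
                if (frame != null) {
                    out.add(frame);
                    progressed = true;
                }
            }
        }
        return out;
    }
}
```

In this model, buffered frames are bounded by `concurrency * prefetch`, and a producer whose `offer` is rejected has to wait until the connection drains. The scalar Mono path with its shared unbounded queue is left out for brevity.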
