Skip to content

feat: driver high-water marksย #13

Open
@merklefruit

Description

@merklefruit

The current req and pub driver have infinite capacity. This means that if for some reason messages accumulate in the pending requests queue, the application will eventually run out of memory and crash.

  • example in req driver:

// Check for outgoing messages from the socket handle
match this.from_socket.poll_recv(cx) {
Poll::Ready(Some(Command::Send { message, response })) => {
// Queue the message for sending
let start = std::time::Instant::now();
let msg = this.new_message(message);
let id = msg.id();
this.egress_queue.push_back(msg);
this.pending_requests.insert(
id,
PendingRequest {
start,
sender: response,
},

We should add some sort of HWM that either blocks or drops messages after a specific capacity is reached.

An option could be to use LruCache instead of the current FxHashMap.
Further improvements include different StorageBackends including layered cache between memory and disk (e.g. LruCache -> RocksDB)


Edit:

  • See conversation in Pubsub v1ย #9 (comment)
  • On the subscriber level, a durability Policy settings should be added to indicate if we intend to receive cached messages or just the ones send from the connection point onward.

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions