From bbb6b807fc1fbbecf4a162f78ec7f6a33730a191 Mon Sep 17 00:00:00 2001 From: Ben Dyer <43922444+ben-dyer@users.noreply.github.com> Date: Sun, 15 Aug 2021 19:00:09 +0900 Subject: [PATCH 1/2] copy paste get_more from enumerable protocol to ChangeStream module --- lib/mongo/change_stream.ex | 55 +++++++++++++++++++++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/lib/mongo/change_stream.ex b/lib/mongo/change_stream.ex index 1f9abc8b..c6af0ddc 100644 --- a/lib/mongo/change_stream.ex +++ b/lib/mongo/change_stream.ex @@ -31,6 +31,59 @@ defmodule Mongo.ChangeStream do end end + @doc """ + Calls the GetCore-Command + See https://github.com/mongodb/specifications/blob/master/source/find_getmore_killcursors_commands.rst + """ + def get_more(topology_pid, session, coll, cursor_id, + change_stream(resume_token: resume_token, op_time: op_time, cmd: aggregate_cmd, + on_resume_token: fun) = change_stream, opts) do + + get_more = [ + getMore: %BSON.LongNumber{value: cursor_id}, + collection: coll, + batchSize: opts[:batch_size], + maxTimeMS: opts[:max_time] + ] |> filter_nils() + + with {:ok, %{"operationTime" => op_time, + "cursor" => %{"id" => new_cursor_id, "nextBatch" => docs} = cursor, + "ok" => ok}} when ok == 1 <- Mongo.exec_command_session(session, get_more, opts) do + + old_token = change_stream(change_stream, :resume_token) + change_stream = update_change_stream(change_stream, cursor["postBatchResumeToken"], op_time, List.last(docs)) + new_token = change_stream(change_stream, :resume_token) + + case token_changes(old_token, new_token) do + true -> fun.(new_token) + false -> nil + end + + {:ok, %{cursor_id: new_cursor_id, docs: docs, change_stream: change_stream}} + + else + {:error, %Mongo.Error{resumable: false} = not_resumable} -> {:error, not_resumable} + {:error, _error} -> + + with {:ok, wire_version} <- Mongo.wire_version(topology_pid) do + + [%{"$changeStream" => stream_opts} | pipeline] = Keyword.get(aggregate_cmd, :pipeline) # extract the change stream options + + stream_opts = update_stream_options(stream_opts, resume_token, op_time, wire_version) + aggregate_cmd = Keyword.update!(aggregate_cmd, :pipeline, fn _ -> [%{"$changeStream" => stream_opts} | pipeline] end) + + # kill the cursor + kill_cursors(session, coll, [cursor_id], opts) + + # Start aggregation again... + with {:ok, state} <- aggregate(topology_pid, aggregate_cmd, fun, opts) do + {:resume, state} + end + end + + end + end + defimpl Enumerable do defrecordp :change_stream, [:resume_token, :op_time, :cmd, :on_resume_token] @@ -291,4 +344,4 @@ defmodule Mongo.ChangeStream do def member?(_stream, _term), do: {:error, __MODULE__} end -end \ No newline at end of file +end From 3fa9f9ca89f71da01627f19f22f4eb31855ec41e Mon Sep 17 00:00:00 2001 From: Ben Dyer <43922444+ben-dyer@users.noreply.github.com> Date: Sun, 15 Aug 2021 19:11:53 +0900 Subject: [PATCH 2/2] Adapt try next --- lib/mongo/change_stream.ex | 74 +++++++++++++++++++++----------------- 1 file changed, 42 insertions(+), 32 deletions(-) diff --git a/lib/mongo/change_stream.ex b/lib/mongo/change_stream.ex index c6af0ddc..49891d4b 100644 --- a/lib/mongo/change_stream.ex +++ b/lib/mongo/change_stream.ex @@ -35,52 +35,62 @@ defmodule Mongo.ChangeStream do Calls the GetCore-Command See https://github.com/mongodb/specifications/blob/master/source/find_getmore_killcursors_commands.rst """ - def get_more(topology_pid, session, coll, cursor_id, - change_stream(resume_token: resume_token, op_time: op_time, cmd: aggregate_cmd, - on_resume_token: fun) = change_stream, opts) do + def try_next(%__MODULE__{ + cmd: aggregate_cmd, + doc: doc, + session: session, + topology_pid: _topology_pid, + on_resume_token: on_resume_token + } = change_stream, opts \\ []) do + %{ + "cursor" => %{ + "id" => cursor_id, + "ns" => ns, + "firstBatch" => docs, + "postBatchResumeToken" => old_token + }, + "operationTime" => op_time + } = doc + + [_db, coll] = ns |> String.split(".") + + batch_size = opts[:batch_size] || aggregate_cmd[:cursor][:batchSize] get_more = [ - getMore: %BSON.LongNumber{value: cursor_id}, - collection: coll, - batchSize: opts[:batch_size], - maxTimeMS: opts[:max_time] - ] |> filter_nils() + getMore: %BSON.LongNumber{value: cursor_id}, + collection: coll, + batchSize: batch_size, + maxTimeMS: opts[:max_time] + ] |> Enum.reject(fn {_key, value} -> is_nil(value) end) with {:ok, %{"operationTime" => op_time, "cursor" => %{"id" => new_cursor_id, "nextBatch" => docs} = cursor, "ok" => ok}} when ok == 1 <- Mongo.exec_command_session(session, get_more, opts) do - old_token = change_stream(change_stream, :resume_token) - change_stream = update_change_stream(change_stream, cursor["postBatchResumeToken"], op_time, List.last(docs)) - new_token = change_stream(change_stream, :resume_token) + # TODO: old_token also fallback to startAfter, etc + new_token = cursor["postBatchResumeToken"] || List.last(docs)["_id"] - case token_changes(old_token, new_token) do - true -> fun.(new_token) - false -> nil + case {old_token, new_token} do + {nil, nil} -> nil + {%{} = map, map} -> nil + _ -> on_resume_token.(new_token) end - {:ok, %{cursor_id: new_cursor_id, docs: docs, change_stream: change_stream}} + # TODO: Do we need to update the operationTime? + change_stream = Map.update!(change_stream, :doc, fn old_doc -> + old_doc + |> Map.put("operationTime", op_time) + |> Map.update!("cursor", & Map.put(&1, "id", new_cursor_id)) + end) + + {docs, change_stream} else {:error, %Mongo.Error{resumable: false} = not_resumable} -> {:error, not_resumable} - {:error, _error} -> - - with {:ok, wire_version} <- Mongo.wire_version(topology_pid) do - - [%{"$changeStream" => stream_opts} | pipeline] = Keyword.get(aggregate_cmd, :pipeline) # extract the change stream options - - stream_opts = update_stream_options(stream_opts, resume_token, op_time, wire_version) - aggregate_cmd = Keyword.update!(aggregate_cmd, :pipeline, fn _ -> [%{"$changeStream" => stream_opts} | pipeline] end) - - # kill the cursor - kill_cursors(session, coll, [cursor_id], opts) - - # Start aggregation again... - with {:ok, state} <- aggregate(topology_pid, aggregate_cmd, fun, opts) do - {:resume, state} - end - end + {:error, error} -> + # TODO: handle error + raise error end end