diff --git a/lib/mongo/change_stream.ex b/lib/mongo/change_stream.ex index 1f9abc8b..49891d4b 100644 --- a/lib/mongo/change_stream.ex +++ b/lib/mongo/change_stream.ex @@ -31,6 +31,69 @@ defmodule Mongo.ChangeStream do end end + @doc """ + Calls the GetCore-Command + See https://github.com/mongodb/specifications/blob/master/source/find_getmore_killcursors_commands.rst + """ + def try_next(%__MODULE__{ + cmd: aggregate_cmd, + doc: doc, + session: session, + topology_pid: _topology_pid, + on_resume_token: on_resume_token + } = change_stream, opts \\ []) do + %{ + "cursor" => %{ + "id" => cursor_id, + "ns" => ns, + "firstBatch" => docs, + "postBatchResumeToken" => old_token + }, + "operationTime" => op_time + } = doc + + [_db, coll] = ns |> String.split(".") + + batch_size = opts[:batch_size] || aggregate_cmd[:cursor][:batchSize] + + get_more = [ + getMore: %BSON.LongNumber{value: cursor_id}, + collection: coll, + batchSize: batch_size, + maxTimeMS: opts[:max_time] + ] |> Enum.reject(fn {_key, value} -> is_nil(value) end) + + with {:ok, %{"operationTime" => op_time, + "cursor" => %{"id" => new_cursor_id, "nextBatch" => docs} = cursor, + "ok" => ok}} when ok == 1 <- Mongo.exec_command_session(session, get_more, opts) do + + # TODO: old_token also fallback to startAfter, etc + new_token = cursor["postBatchResumeToken"] || List.last(docs)["_id"] + + case {old_token, new_token} do + {nil, nil} -> nil + {%{} = map, map} -> nil + _ -> on_resume_token.(new_token) + end + + # TODO: Do we need to update the operationTime? + change_stream = Map.update!(change_stream, :doc, fn old_doc -> + old_doc + |> Map.put("operationTime", op_time) + |> Map.update!("cursor", & Map.put(&1, "id", new_cursor_id)) + end) + + {docs, change_stream} + + else + {:error, %Mongo.Error{resumable: false} = not_resumable} -> {:error, not_resumable} + {:error, error} -> + + # TODO: handle error + raise error + end + end + defimpl Enumerable do defrecordp :change_stream, [:resume_token, :op_time, :cmd, :on_resume_token] @@ -291,4 +354,4 @@ defmodule Mongo.ChangeStream do def member?(_stream, _term), do: {:error, __MODULE__} end -end \ No newline at end of file +end