diff --git a/lib/kinesis_client/stream.ex b/lib/kinesis_client/stream.ex index 17618af..2566ec4 100644 --- a/lib/kinesis_client/stream.ex +++ b/lib/kinesis_client/stream.ex @@ -71,7 +71,7 @@ defmodule KinesisClient.Stream do ] Logger.debug( - "Starting KinesisClient.Stream: [app_name: #{app_name}, stream_name: {stream_name}]" + "Starting KinesisClient.Stream: [app_name: #{app_name}, stream_name: #{stream_name}]" ) Supervisor.init(children, strategy: :one_for_all) diff --git a/lib/kinesis_client/stream/app_state.ex b/lib/kinesis_client/stream/app_state.ex index 44cfea4..118bc98 100644 --- a/lib/kinesis_client/stream/app_state.ex +++ b/lib/kinesis_client/stream/app_state.ex @@ -11,8 +11,13 @@ defmodule KinesisClient.Stream.AppState do Get a `KinesisClient.Stream.AppState.ShardInfo` struct by shard_id. If there is not an existing record, returns `:not_found`. """ - def get_lease(app_name, stream_name, shard_id, opts \\ []), - do: adapter(opts).get_lease(app_name, stream_name, shard_id, opts) + def get_lease(app_name, stream_name, shard_id, opts \\ []) do + IO.inspect("app get_lease called") + IO.inspect("app_name: #{app_name}, stream_name: #{stream_name}, shard_id: #{shard_id}") + IO.puts("opts: #{inspect(opts)}") + IO.puts("adapter: #{inspect(adapter(opts))}") + adapter(opts).get_lease(app_name, stream_name, shard_id, opts) + end @doc """ Persists a new ShardInfo record. Returns an error if there is already a record for that `shard_id` diff --git a/lib/kinesis_client/stream/app_state/adapter.ex b/lib/kinesis_client/stream/app_state/adapter.ex index bf71c7c..e8d12fd 100644 --- a/lib/kinesis_client/stream/app_state/adapter.ex +++ b/lib/kinesis_client/stream/app_state/adapter.ex @@ -63,4 +63,10 @@ defmodule KinesisClient.Stream.AppState.Adapter do opts :: keyword ) :: :ok | {:error, any} + + @callback delete_all_leases_and_restart_workers( + supervisor :: Supervisor.t(), + app_name :: String.t(), + opts :: keyword + ) :: {:ok, any} | {:error, any} end diff --git a/lib/kinesis_client/stream/app_state/dynamo.ex b/lib/kinesis_client/stream/app_state/dynamo.ex index bca698f..b929770 100644 --- a/lib/kinesis_client/stream/app_state/dynamo.ex +++ b/lib/kinesis_client/stream/app_state/dynamo.ex @@ -16,6 +16,35 @@ defmodule KinesisClient.Stream.AppState.Dynamo do end end + @impl true + def delete_all_leases_and_restart_workers(supervisor, app_name, _opts) do + case app_name + |> Dynamo.scan() + |> ExAws.request() do + {:ok, items} -> + Enum.each(items["Items"], fn lease -> + cond do + lease["shard_id"] -> + app_name + |> Dynamo.delete_item(%{"shard_id" => lease["shard_id"]["S"]}) + |> ExAws.request() + end + end) + + case Process.whereis(supervisor) do + nil -> + {:error, "Supervisor not running"} + + pid -> + Process.exit(pid, :shutdown) + {:ok, "Shard leases deleted and workers restarted"} + end + + {:error, error} -> + {:error, error} + end + end + defp create_table(app_name) do app_name |> send_create_table_request() diff --git a/lib/kinesis_client/stream/app_state/ecto.ex b/lib/kinesis_client/stream/app_state/ecto.ex index 1c03d8a..3709cc2 100644 --- a/lib/kinesis_client/stream/app_state/ecto.ex +++ b/lib/kinesis_client/stream/app_state/ecto.ex @@ -8,6 +8,8 @@ defmodule KinesisClient.Stream.AppState.Ecto do alias KinesisClient.Stream.AppState.Ecto.ShardLease alias KinesisClient.Stream.AppState.Ecto.ShardLeases + require Logger + @migrations [ {CreateShardLeaseTable.version(), CreateShardLeaseTable}, {AddAppAndStreamNameColumns.version(), AddAppAndStreamNameColumns} @@ -21,6 +23,28 @@ defmodule KinesisClient.Stream.AppState.Ecto do end end + @impl true + def delete_all_leases_and_restart_workers(supervisor, __app_name, opts) do + repo = Keyword.get(opts, :repo) + + with supervisor when not is_nil(supervisor) <- Process.whereis(supervisor), + :ok <- repo.delete_all(ShardLease) do + Logger.info("Shard leases deleted") + Process.exit(supervisor, :shutdown) + Logger.info("Restarting workers") + + {:ok, "Shard leases deleted and workers restarted"} + else + nil -> + Logger.info("Supervisor not running") + {:error, "Supervisor not running"} + + _ -> + Logger.info("Failed to delete shard leases, reason unknown") + {:error, "Failed to delete shard leases, reason unknown"} + end + end + @impl true def create_lease(app_name, stream_name, shard_id, lease_owner, opts) do repo = Keyword.get(opts, :repo) @@ -46,8 +70,14 @@ defmodule KinesisClient.Stream.AppState.Ecto do @impl true def get_lease(app_name, stream_name, shard_id, opts) do + IO.inspect("ecto get_lease called") repo = Keyword.get(opts, :repo) + IO.puts("repo: #{inspect(repo)}") shard_lease_params = %{shard_id: shard_id, app_name: app_name, stream_name: stream_name} + IO.puts("get_lease: params: #{inspect(shard_lease_params)}") + + IO.puts("get_lease: get_shard_lease result") + IO.puts(inspect(ShardLeases.get_shard_lease(shard_lease_params, repo))) with {:ok, shard_lease} <- ShardLeases.get_shard_lease(shard_lease_params, repo) do shard_lease diff --git a/lib/kinesis_client/stream/app_state/ecto/shard_lease.ex b/lib/kinesis_client/stream/app_state/ecto/shard_lease.ex index 925cd39..723ac40 100644 --- a/lib/kinesis_client/stream/app_state/ecto/shard_lease.ex +++ b/lib/kinesis_client/stream/app_state/ecto/shard_lease.ex @@ -6,6 +6,7 @@ defmodule KinesisClient.Stream.AppState.Ecto.ShardLease do import Ecto.Query @fields [:shard_id, :app_name, :stream_name, :lease_owner, :lease_count, :checkpoint, :completed] + @required_fields [:shard_id, :app_name, :stream_name, :lease_owner] @primary_key {:shard_id, :string, autogenerate: false} schema "shard_lease" do @@ -20,6 +21,7 @@ defmodule KinesisClient.Stream.AppState.Ecto.ShardLease do def changeset(shard_lease, attrs) do shard_lease |> cast(attrs, @fields) + |> validate_required(@required_fields) |> unique_constraint(:shard_id, name: :shard_lease_pkey) end @@ -32,6 +34,8 @@ defmodule KinesisClient.Stream.AppState.Ecto.ShardLease do end defp query_by({:shard_id, shard_id}, query) do + IO.inspect("query_by shard_id: #{inspect(shard_id)}, query: #{inspect(query)}") + where(query, [sl], sl.shard_id == ^shard_id) end diff --git a/lib/kinesis_client/stream/app_state/ecto/shard_leases.ex b/lib/kinesis_client/stream/app_state/ecto/shard_leases.ex index ec36137..c7b472d 100644 --- a/lib/kinesis_client/stream/app_state/ecto/shard_leases.ex +++ b/lib/kinesis_client/stream/app_state/ecto/shard_leases.ex @@ -1,8 +1,16 @@ defmodule KinesisClient.Stream.AppState.Ecto.ShardLeases do + @moduledoc false alias KinesisClient.Stream.AppState.Ecto.ShardLease @spec get_shard_lease(map, Ecto.Repo.t()) :: {:error, :not_found} | {:ok, ShardLease.t()} def get_shard_lease(params, repo) do + IO.inspect("get_shard_lease: repo: #{inspect(repo)}") + IO.inspect("get_shard_lease: params: #{inspect(params)}") + + IO.inspect( + "get_shard_lease: build_get_query #{inspect(ShardLease.build_get_query(ShardLease.query(), params))}" + ) + ShardLease.query() |> ShardLease.build_get_query(params) |> repo.one() @@ -15,8 +23,7 @@ defmodule KinesisClient.Stream.AppState.Ecto.ShardLeases do @spec get_shard_lease_by_id(String.t(), Ecto.Repo.t()) :: {:error, :not_found} | {:ok, ShardLease.t()} def get_shard_lease_by_id(shard_id, repo) do - %{shard_id: shard_id} - |> get_shard_lease(repo) + get_shard_lease(%{shard_id: shard_id}, repo) end @spec insert_shard_lease(map, Ecto.Repo.t()) :: diff --git a/lib/kinesis_client/stream/shard/producer.ex b/lib/kinesis_client/stream/shard/producer.ex index d76bedf..07db7e2 100644 --- a/lib/kinesis_client/stream/shard/producer.ex +++ b/lib/kinesis_client/stream/shard/producer.ex @@ -164,6 +164,7 @@ defmodule KinesisClient.Stream.Shard.Producer do :ok = AppState.update_checkpoint( + state.app_name, state.shard_id, state.stream_name, state.lease_owner, diff --git a/test/kinesis_client/stream/app_state/dynamo_test.exs b/test/kinesis_client/stream/app_state/dynamo_test.exs index faea028..5769fa5 100644 --- a/test/kinesis_client/stream/app_state/dynamo_test.exs +++ b/test/kinesis_client/stream/app_state/dynamo_test.exs @@ -12,6 +12,9 @@ defmodule KinesisClient.Stream.AppState.DynamoTest do app_name |> Dynamo.create_table("shard_id", %{shard_id: :string}, 1, 1) |> ExAws.request() :ok = confirm_table_created(app_name) + + TestSupervisor.start_link([]) + %{app_name: app_name} end @@ -27,6 +30,57 @@ defmodule KinesisClient.Stream.AppState.DynamoTest do end end + describe "delete_all_leases_and_restart_workers/2" do + test "handles business", context do + app_name = context[:app_name] + supervisor = TestSupervisor + + shard_leases = [ + %ShardLease{shard_id: "shard12", lease_owner: "owner1", completed: false, lease_count: 1}, + %ShardLease{shard_id: "shard13", lease_owner: "owner2", completed: false, lease_count: 1} + ] + + Enum.each(shard_leases, fn lease -> + app_name |> Dynamo.put_item(lease) |> ExAws.request() + end) + + shard_id = "shard12" + + case app_name + |> Dynamo.get_item(%{"shard_id" => shard_id}) + |> ExAws.request() do + {:ok, item} -> + assert item["Item"]["shard_id"] == %{"S" => "shard12"} + end + + assert {:ok, "Shard leases deleted and workers restarted"} = + AppState.delete_all_leases_and_restart_workers(supervisor, app_name, []) + + case app_name + |> Dynamo.get_item(%{"shard_id" => shard_id}) + |> ExAws.request() do + {:ok, item} -> + assert item["Item"]["shard_id"] == nil + end + end + + test "errors with invalid app/table" do + app_name = "nope" + supervisor = TestSupervisor + + assert {:error, {"ResourceNotFoundException", "Cannot do operations on a non-existent table"}} = + AppState.delete_all_leases_and_restart_workers(supervisor, app_name, []) + end + + test "errors with invalid process", context do + app_name = context[:app_name] + supervisor = FakeProcess + + assert {:error, "Supervisor not running"} = + AppState.delete_all_leases_and_restart_workers(supervisor, app_name, []) + end + end + describe "create_lease/3" do test "lease created and :ok returned if no record for shard exists", %{app_name: app_name} do result = AppState.create_lease(app_name, random_string(), worker_ref(), []) diff --git a/test/kinesis_client/stream/app_state/ecto/shard_leases_test.exs b/test/kinesis_client/stream/app_state/ecto/shard_leases_test.exs index 49cf274..874863d 100644 --- a/test/kinesis_client/stream/app_state/ecto/shard_leases_test.exs +++ b/test/kinesis_client/stream/app_state/ecto/shard_leases_test.exs @@ -6,11 +6,15 @@ defmodule KinesisClient.Stream.AppState.Ecto.ShardLeasesTest do test "get_shard_lease/2" do params = %{ - shard_id: "a.b.c" + shard_id: "a.b.c", + app_name: "app_name", + stream_name: "stream_name" } {:ok, shard_lease} = ShardLeases.get_shard_lease(params, Repo) + IO.inspect(shard_lease) + assert shard_lease.shard_id == "a.b.c" assert shard_lease.checkpoint == nil assert shard_lease.completed == false @@ -34,7 +38,9 @@ defmodule KinesisClient.Stream.AppState.Ecto.ShardLeasesTest do shard_id: "a.b.c", completed: false, lease_count: 1, - lease_owner: "test_owner" + lease_owner: "test_owner", + app_name: "test_app", + stream_name: "test_stream" } {:ok, shard_lease} = ShardLeases.insert_shard_lease(attrs, Repo) @@ -51,7 +57,9 @@ defmodule KinesisClient.Stream.AppState.Ecto.ShardLeasesTest do shard_id: "a.b.c", completed: false, lease_count: "INVALID", - lease_owner: "test_owner" + lease_owner: "test_owner", + app_name: "test_app", + stream_name: "test_stream" } {:error, changeset} = ShardLeases.insert_shard_lease(attrs, Repo) diff --git a/test/kinesis_client/stream/app_state/ecto_test.exs b/test/kinesis_client/stream/app_state/ecto_test.exs index 03873f1..c5391ea 100644 --- a/test/kinesis_client/stream/app_state/ecto_test.exs +++ b/test/kinesis_client/stream/app_state/ecto_test.exs @@ -5,7 +5,7 @@ defmodule KinesisClient.Stream.AppState.EctoTest do alias KinesisClient.Stream.AppState.Ecto test "creates a shard_lease" do - assert Ecto.create_lease("", "stream_name", "a.b.c", "test_owner", repo: Repo) == :ok + assert Ecto.create_lease("app_name", "stream_name", "a.b.c", "test_owner", repo: Repo) == :ok end test "gets a shard_lease" do @@ -56,4 +56,27 @@ defmodule KinesisClient.Stream.AppState.EctoTest do test "closes shard" do assert Ecto.close_shard("app_name", "stream_name", "a.b.c", "test_owner", repo: Repo) == :ok end + + describe "delete_all_leases_and_restart_workers" do + setup do + Process.flag(:trap_exit, true) + + TestSupervisor.start_link([]) + {:ok, supervisor: TestSupervisor} + end + + test "deletes all leases and restarts the supervisor if the supervisor is running", %{ + supervisor: supervisor + } do + result = Ecto.delete_all_leases_and_restart_workers(supervisor, "app_name", repo: Repo) + assert result == {:ok, "Shard leases deleted and workers restarted"} + end + + test "returns an error if the supervisor is not found", _context do + result = + Ecto.delete_all_leases_and_restart_workers(:non_existent_supervisor, "app_name", repo: Repo) + + assert result == {:error, "Supervisor not running"} + end + end end diff --git a/test/support/ecto_repo.ex b/test/support/ecto_repo.ex index 2f7ae42..4dc5d3e 100644 --- a/test/support/ecto_repo.ex +++ b/test/support/ecto_repo.ex @@ -33,4 +33,8 @@ defmodule KinesisClient.Ecto.Repo do def update(changeset, _opts) do {:error, changeset} end + + def delete_all(_query, _opts \\ []) do + :ok + end end diff --git a/test/support/test_generic_worker.ex b/test/support/test_generic_worker.ex new file mode 100644 index 0000000..ecaf005 --- /dev/null +++ b/test/support/test_generic_worker.ex @@ -0,0 +1,20 @@ +defmodule TestGenericWorker do + @moduledoc false + use GenServer + + def start_link(opts) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + def init(initial_value) do + {:ok, initial_value} + end + + def get_value do + GenServer.call(__MODULE__, :get_value) + end + + def handle_call(:get_value, _from, state) do + {:reply, state, state} + end +end diff --git a/test/support/test_supervisor.ex b/test/support/test_supervisor.ex new file mode 100644 index 0000000..269e9d7 --- /dev/null +++ b/test/support/test_supervisor.ex @@ -0,0 +1,21 @@ +defmodule TestSupervisor do + @moduledoc false + use Supervisor + + def start_link(opts) do + Supervisor.start_link(__MODULE__, opts, name: __MODULE__) + end + + def init(_opts) do + children = [ + %{ + id: TestGenericWorker, + start: {TestGenericWorker, :start_link, [arg_value: "initial_value"]}, + type: :worker, + restart: :permanent + } + ] + + Supervisor.init(children, strategy: :one_for_one) + end +end