Skip to content

Delete shard leases #16

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/kinesis_client/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ defmodule KinesisClient.Stream do
]

Logger.debug(
"Starting KinesisClient.Stream: [app_name: #{app_name}, stream_name: {stream_name}]"
"Starting KinesisClient.Stream: [app_name: #{app_name}, stream_name: #{stream_name}]"
)

Supervisor.init(children, strategy: :one_for_all)
Expand Down
9 changes: 7 additions & 2 deletions lib/kinesis_client/stream/app_state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,13 @@ defmodule KinesisClient.Stream.AppState do
Get a `KinesisClient.Stream.AppState.ShardInfo` struct by shard_id. If there is not an existing
record, returns `:not_found`.
"""
def get_lease(app_name, stream_name, shard_id, opts \\ []),
do: adapter(opts).get_lease(app_name, stream_name, shard_id, opts)
def get_lease(app_name, stream_name, shard_id, opts \\ []) do
IO.inspect("app get_lease called")
IO.inspect("app_name: #{app_name}, stream_name: #{stream_name}, shard_id: #{shard_id}")
IO.puts("opts: #{inspect(opts)}")
IO.puts("adapter: #{inspect(adapter(opts))}")
adapter(opts).get_lease(app_name, stream_name, shard_id, opts)
end

@doc """
Persists a new ShardInfo record. Returns an error if there is already a record for that `shard_id`
Expand Down
6 changes: 6 additions & 0 deletions lib/kinesis_client/stream/app_state/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,10 @@ defmodule KinesisClient.Stream.AppState.Adapter do
opts :: keyword
) ::
:ok | {:error, any}

@callback delete_all_leases_and_restart_workers(
supervisor :: Supervisor.t(),
app_name :: String.t(),
opts :: keyword
) :: {:ok, any} | {:error, any}
end
29 changes: 29 additions & 0 deletions lib/kinesis_client/stream/app_state/dynamo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,35 @@ defmodule KinesisClient.Stream.AppState.Dynamo do
end
end

@impl true
def delete_all_leases_and_restart_workers(supervisor, app_name, _opts) do
case app_name
|> Dynamo.scan()
|> ExAws.request() do
{:ok, items} ->
Enum.each(items["Items"], fn lease ->
cond do
lease["shard_id"] ->
app_name
|> Dynamo.delete_item(%{"shard_id" => lease["shard_id"]["S"]})
|> ExAws.request()
end
end)

case Process.whereis(supervisor) do
nil ->
{:error, "Supervisor not running"}

pid ->
Process.exit(pid, :shutdown)
{:ok, "Shard leases deleted and workers restarted"}
end

{:error, error} ->
{:error, error}
end
end

defp create_table(app_name) do
app_name
|> send_create_table_request()
Expand Down
30 changes: 30 additions & 0 deletions lib/kinesis_client/stream/app_state/ecto.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ defmodule KinesisClient.Stream.AppState.Ecto do
alias KinesisClient.Stream.AppState.Ecto.ShardLease
alias KinesisClient.Stream.AppState.Ecto.ShardLeases

require Logger

@migrations [
{CreateShardLeaseTable.version(), CreateShardLeaseTable},
{AddAppAndStreamNameColumns.version(), AddAppAndStreamNameColumns}
Expand All @@ -21,6 +23,28 @@ defmodule KinesisClient.Stream.AppState.Ecto do
end
end

@impl true
def delete_all_leases_and_restart_workers(supervisor, __app_name, opts) do
repo = Keyword.get(opts, :repo)

with supervisor when not is_nil(supervisor) <- Process.whereis(supervisor),
:ok <- repo.delete_all(ShardLease) do
Logger.info("Shard leases deleted")
Process.exit(supervisor, :shutdown)
Logger.info("Restarting workers")

{:ok, "Shard leases deleted and workers restarted"}
else
nil ->
Logger.info("Supervisor not running")
{:error, "Supervisor not running"}

_ ->
Logger.info("Failed to delete shard leases, reason unknown")
{:error, "Failed to delete shard leases, reason unknown"}
end
end

@impl true
def create_lease(app_name, stream_name, shard_id, lease_owner, opts) do
repo = Keyword.get(opts, :repo)
Expand All @@ -46,8 +70,14 @@ defmodule KinesisClient.Stream.AppState.Ecto do

@impl true
def get_lease(app_name, stream_name, shard_id, opts) do
IO.inspect("ecto get_lease called")
repo = Keyword.get(opts, :repo)
IO.puts("repo: #{inspect(repo)}")
shard_lease_params = %{shard_id: shard_id, app_name: app_name, stream_name: stream_name}
IO.puts("get_lease: params: #{inspect(shard_lease_params)}")

IO.puts("get_lease: get_shard_lease result")
IO.puts(inspect(ShardLeases.get_shard_lease(shard_lease_params, repo)))

with {:ok, shard_lease} <- ShardLeases.get_shard_lease(shard_lease_params, repo) do
shard_lease
Expand Down
4 changes: 4 additions & 0 deletions lib/kinesis_client/stream/app_state/ecto/shard_lease.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule KinesisClient.Stream.AppState.Ecto.ShardLease do
import Ecto.Query

@fields [:shard_id, :app_name, :stream_name, :lease_owner, :lease_count, :checkpoint, :completed]
@required_fields [:shard_id, :app_name, :stream_name, :lease_owner]

@primary_key {:shard_id, :string, autogenerate: false}
schema "shard_lease" do
Expand All @@ -20,6 +21,7 @@ defmodule KinesisClient.Stream.AppState.Ecto.ShardLease do
def changeset(shard_lease, attrs) do
shard_lease
|> cast(attrs, @fields)
|> validate_required(@required_fields)
|> unique_constraint(:shard_id, name: :shard_lease_pkey)
end

Expand All @@ -32,6 +34,8 @@ defmodule KinesisClient.Stream.AppState.Ecto.ShardLease do
end

defp query_by({:shard_id, shard_id}, query) do
IO.inspect("query_by shard_id: #{inspect(shard_id)}, query: #{inspect(query)}")

where(query, [sl], sl.shard_id == ^shard_id)
end

Expand Down
11 changes: 9 additions & 2 deletions lib/kinesis_client/stream/app_state/ecto/shard_leases.ex
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
defmodule KinesisClient.Stream.AppState.Ecto.ShardLeases do
@moduledoc false
alias KinesisClient.Stream.AppState.Ecto.ShardLease

@spec get_shard_lease(map, Ecto.Repo.t()) :: {:error, :not_found} | {:ok, ShardLease.t()}
def get_shard_lease(params, repo) do
IO.inspect("get_shard_lease: repo: #{inspect(repo)}")
IO.inspect("get_shard_lease: params: #{inspect(params)}")

IO.inspect(
"get_shard_lease: build_get_query #{inspect(ShardLease.build_get_query(ShardLease.query(), params))}"
)

ShardLease.query()
|> ShardLease.build_get_query(params)
|> repo.one()
Expand All @@ -15,8 +23,7 @@ defmodule KinesisClient.Stream.AppState.Ecto.ShardLeases do
@spec get_shard_lease_by_id(String.t(), Ecto.Repo.t()) ::
{:error, :not_found} | {:ok, ShardLease.t()}
def get_shard_lease_by_id(shard_id, repo) do
%{shard_id: shard_id}
|> get_shard_lease(repo)
get_shard_lease(%{shard_id: shard_id}, repo)
end

@spec insert_shard_lease(map, Ecto.Repo.t()) ::
Expand Down
1 change: 1 addition & 0 deletions lib/kinesis_client/stream/shard/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ defmodule KinesisClient.Stream.Shard.Producer do

:ok =
AppState.update_checkpoint(
state.app_name,
state.shard_id,
state.stream_name,
state.lease_owner,
Expand Down
54 changes: 54 additions & 0 deletions test/kinesis_client/stream/app_state/dynamo_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ defmodule KinesisClient.Stream.AppState.DynamoTest do
app_name |> Dynamo.create_table("shard_id", %{shard_id: :string}, 1, 1) |> ExAws.request()

:ok = confirm_table_created(app_name)

TestSupervisor.start_link([])

%{app_name: app_name}
end

Expand All @@ -27,6 +30,57 @@ defmodule KinesisClient.Stream.AppState.DynamoTest do
end
end

describe "delete_all_leases_and_restart_workers/2" do
test "handles business", context do
app_name = context[:app_name]
supervisor = TestSupervisor

shard_leases = [
%ShardLease{shard_id: "shard12", lease_owner: "owner1", completed: false, lease_count: 1},
%ShardLease{shard_id: "shard13", lease_owner: "owner2", completed: false, lease_count: 1}
]

Enum.each(shard_leases, fn lease ->
app_name |> Dynamo.put_item(lease) |> ExAws.request()
end)

shard_id = "shard12"

case app_name
|> Dynamo.get_item(%{"shard_id" => shard_id})
|> ExAws.request() do
{:ok, item} ->
assert item["Item"]["shard_id"] == %{"S" => "shard12"}
end

assert {:ok, "Shard leases deleted and workers restarted"} =
AppState.delete_all_leases_and_restart_workers(supervisor, app_name, [])

case app_name
|> Dynamo.get_item(%{"shard_id" => shard_id})
|> ExAws.request() do
{:ok, item} ->
assert item["Item"]["shard_id"] == nil
end
end

test "errors with invalid app/table" do
app_name = "nope"
supervisor = TestSupervisor

assert {:error, {"ResourceNotFoundException", "Cannot do operations on a non-existent table"}} =
AppState.delete_all_leases_and_restart_workers(supervisor, app_name, [])
end

test "errors with invalid process", context do
app_name = context[:app_name]
supervisor = FakeProcess

assert {:error, "Supervisor not running"} =
AppState.delete_all_leases_and_restart_workers(supervisor, app_name, [])
end
end

describe "create_lease/3" do
test "lease created and :ok returned if no record for shard exists", %{app_name: app_name} do
result = AppState.create_lease(app_name, random_string(), worker_ref(), [])
Expand Down
14 changes: 11 additions & 3 deletions test/kinesis_client/stream/app_state/ecto/shard_leases_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ defmodule KinesisClient.Stream.AppState.Ecto.ShardLeasesTest do

test "get_shard_lease/2" do
params = %{
shard_id: "a.b.c"
shard_id: "a.b.c",
app_name: "app_name",
stream_name: "stream_name"
}

{:ok, shard_lease} = ShardLeases.get_shard_lease(params, Repo)

IO.inspect(shard_lease)

assert shard_lease.shard_id == "a.b.c"
assert shard_lease.checkpoint == nil
assert shard_lease.completed == false
Expand All @@ -34,7 +38,9 @@ defmodule KinesisClient.Stream.AppState.Ecto.ShardLeasesTest do
shard_id: "a.b.c",
completed: false,
lease_count: 1,
lease_owner: "test_owner"
lease_owner: "test_owner",
app_name: "test_app",
stream_name: "test_stream"
}

{:ok, shard_lease} = ShardLeases.insert_shard_lease(attrs, Repo)
Expand All @@ -51,7 +57,9 @@ defmodule KinesisClient.Stream.AppState.Ecto.ShardLeasesTest do
shard_id: "a.b.c",
completed: false,
lease_count: "INVALID",
lease_owner: "test_owner"
lease_owner: "test_owner",
app_name: "test_app",
stream_name: "test_stream"
}

{:error, changeset} = ShardLeases.insert_shard_lease(attrs, Repo)
Expand Down
25 changes: 24 additions & 1 deletion test/kinesis_client/stream/app_state/ecto_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule KinesisClient.Stream.AppState.EctoTest do
alias KinesisClient.Stream.AppState.Ecto

test "creates a shard_lease" do
assert Ecto.create_lease("", "stream_name", "a.b.c", "test_owner", repo: Repo) == :ok
assert Ecto.create_lease("app_name", "stream_name", "a.b.c", "test_owner", repo: Repo) == :ok
end

test "gets a shard_lease" do
Expand Down Expand Up @@ -56,4 +56,27 @@ defmodule KinesisClient.Stream.AppState.EctoTest do
test "closes shard" do
assert Ecto.close_shard("app_name", "stream_name", "a.b.c", "test_owner", repo: Repo) == :ok
end

describe "delete_all_leases_and_restart_workers" do
setup do
Process.flag(:trap_exit, true)

TestSupervisor.start_link([])
{:ok, supervisor: TestSupervisor}
end

test "deletes all leases and restarts the supervisor if the supervisor is running", %{
supervisor: supervisor
} do
result = Ecto.delete_all_leases_and_restart_workers(supervisor, "app_name", repo: Repo)
assert result == {:ok, "Shard leases deleted and workers restarted"}
end

test "returns an error if the supervisor is not found", _context do
result =
Ecto.delete_all_leases_and_restart_workers(:non_existent_supervisor, "app_name", repo: Repo)

assert result == {:error, "Supervisor not running"}
end
end
end
4 changes: 4 additions & 0 deletions test/support/ecto_repo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,8 @@ defmodule KinesisClient.Ecto.Repo do
def update(changeset, _opts) do
{:error, changeset}
end

def delete_all(_query, _opts \\ []) do
:ok
end
end
20 changes: 20 additions & 0 deletions test/support/test_generic_worker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule TestGenericWorker do
@moduledoc false
use GenServer

def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

def init(initial_value) do
{:ok, initial_value}
end

def get_value do
GenServer.call(__MODULE__, :get_value)
end

def handle_call(:get_value, _from, state) do
{:reply, state, state}
end
end
21 changes: 21 additions & 0 deletions test/support/test_supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
defmodule TestSupervisor do
@moduledoc false
use Supervisor

def start_link(opts) do
Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
end

def init(_opts) do
children = [
%{
id: TestGenericWorker,
start: {TestGenericWorker, :start_link, [arg_value: "initial_value"]},
type: :worker,
restart: :permanent
}
]

Supervisor.init(children, strategy: :one_for_one)
end
end