|
| 1 | +defmodule Realtime.BroadcastChanges.Handler do |
| 2 | + use GenServer |
| 3 | + require Logger |
| 4 | + |
| 5 | + import Realtime.Adapters.Postgres.Protocol |
| 6 | + import Realtime.Adapters.Postgres.Decoder |
| 7 | + import Realtime.Helpers, only: [log_error: 2] |
| 8 | + |
| 9 | + alias Realtime.Adapters.Postgres.Decoder |
| 10 | + alias Realtime.Adapters.Postgres.Protocol.KeepAlive |
| 11 | + alias Realtime.Adapters.Postgres.Protocol.Write |
| 12 | + alias Realtime.Api.Tenant |
| 13 | + alias Realtime.BroadcastChanges.PostgresReplication |
| 14 | + alias Realtime.Database |
| 15 | + alias Realtime.Tenants.BatchBroadcast |
| 16 | + alias Realtime.Tenants.Cache |
| 17 | + |
| 18 | + defstruct [:tenant_id, relations: %{}, buffer: [], postgres_replication_pid: nil] |
| 19 | + |
| 20 | + @behaviour PostgresReplication.Handler |
| 21 | + @registry Realtime.BroadcastChanges.Handler.Registry |
| 22 | + |
| 23 | + @spec name(Tenant.t()) :: term() |
| 24 | + def name(%Tenant{external_id: tenant_id}) do |
| 25 | + {:via, Registry, {@registry, tenant_id}} |
| 26 | + end |
| 27 | + |
| 28 | + @spec supervisor_spec(Tenant.t()) :: term() |
| 29 | + def supervisor_spec(%Tenant{external_id: tenant_id}) do |
| 30 | + {:via, PartitionSupervisor, {Realtime.BroadcastChanges.Handler.DynamicSupervisor, tenant_id}} |
| 31 | + end |
| 32 | + |
| 33 | + @impl true |
| 34 | + def call(message, metadata) when is_write(message) do |
| 35 | + %{tenant_id: tenant_id} = metadata |
| 36 | + %Write{message: message} = parse(message) |
| 37 | + |
| 38 | + case Registry.lookup(@registry, tenant_id) do |
| 39 | + [{pid, _}] -> |
| 40 | + message |> decode_message() |> then(&send(pid, &1)) |
| 41 | + :noreply |
| 42 | + |
| 43 | + _ -> |
| 44 | + Logger.error("Unable to find BroadcastChanges for tenant: #{tenant_id}") |
| 45 | + :shutdown |
| 46 | + end |
| 47 | + end |
| 48 | + |
| 49 | + def call(message, _metadata) when is_keep_alive(message) do |
| 50 | + %KeepAlive{reply: reply, wal_end: wal_end} = parse(message) |
| 51 | + wal_end = wal_end + 1 |
| 52 | + |
| 53 | + message = |
| 54 | + case reply do |
| 55 | + :now -> standby_status(wal_end, wal_end, wal_end, reply) |
| 56 | + :later -> hold() |
| 57 | + end |
| 58 | + |
| 59 | + {:reply, message} |
| 60 | + end |
| 61 | + |
| 62 | + def call(msg, state) do |
| 63 | + Logger.info("Unknown message received: #{inspect(%{msg: parse(msg), state: state})}") |
| 64 | + :noreply |
| 65 | + end |
| 66 | + |
| 67 | + @impl true |
| 68 | + def handle_info(%Decoder.Messages.Relation{} = msg, state) do |
| 69 | + %Decoder.Messages.Relation{id: id, namespace: namespace, name: name, columns: columns} = msg |
| 70 | + %{relations: relations} = state |
| 71 | + relation = %{name: name, columns: columns, namespace: namespace} |
| 72 | + relations = Map.put(relations, id, relation) |
| 73 | + {:noreply, %{state | relations: relations}} |
| 74 | + end |
| 75 | + |
| 76 | + def handle_info(%Decoder.Messages.Insert{} = msg, state) do |
| 77 | + %Decoder.Messages.Insert{relation_id: relation_id, tuple_data: tuple_data} = msg |
| 78 | + %{buffer: buffer, relations: relations} = state |
| 79 | + |
| 80 | + case Map.get(relations, relation_id) do |
| 81 | + %{columns: columns} -> |
| 82 | + to_broadcast = |
| 83 | + tuple_data |
| 84 | + |> Tuple.to_list() |
| 85 | + |> Enum.zip(columns) |
| 86 | + |> Enum.map(fn |
| 87 | + {nil, %{name: name}} -> {name, nil} |
| 88 | + {value, %{name: name, type: "jsonb"}} -> {name, Jason.decode!(value)} |
| 89 | + {value, %{name: name, type: "bool"}} -> {name, value == "t"} |
| 90 | + {value, %{name: name}} -> {name, value} |
| 91 | + end) |
| 92 | + |> Map.new() |
| 93 | + |
| 94 | + payload = Map.get(to_broadcast, "payload") |
| 95 | + |
| 96 | + case payload do |
| 97 | + nil -> |
| 98 | + {:noreply, state} |
| 99 | + |
| 100 | + payload -> |
| 101 | + topic = Map.get(to_broadcast, "topic") |
| 102 | + private = Map.get(to_broadcast, "private") |
| 103 | + event = Map.get(to_broadcast, "event") |
| 104 | + |
| 105 | + id = Map.get(to_broadcast, "id") |
| 106 | + |
| 107 | + payload = Map.put(payload, "id", id) |
| 108 | + |
| 109 | + to_broadcast = |
| 110 | + %{ |
| 111 | + topic: topic, |
| 112 | + event: event, |
| 113 | + private: private, |
| 114 | + payload: payload |
| 115 | + } |
| 116 | + |
| 117 | + buffer = buffer ++ [to_broadcast] |
| 118 | + {:noreply, %{state | buffer: buffer}} |
| 119 | + end |
| 120 | + |
| 121 | + _ -> |
| 122 | + log_error("UnknownBroadcastChangesRelation", "Relation ID not found: #{relation_id}") |
| 123 | + {:noreply, state} |
| 124 | + end |
| 125 | + end |
| 126 | + |
| 127 | + def handle_info(%Decoder.Messages.Commit{}, %{buffer: []} = state) do |
| 128 | + {:noreply, state} |
| 129 | + end |
| 130 | + |
| 131 | + def handle_info(%Decoder.Messages.Commit{}, state) do |
| 132 | + %{buffer: buffer, tenant_id: tenant_id} = state |
| 133 | + tenant = Realtime.Tenants.Cache.get_tenant_by_external_id(tenant_id) |
| 134 | + |
| 135 | + case BatchBroadcast.broadcast(nil, tenant, %{messages: buffer}, true) do |
| 136 | + :ok -> :ok |
| 137 | + error -> log_error("UnableToBatchBroadcastChanges", error) |
| 138 | + end |
| 139 | + |
| 140 | + {:noreply, %{state | buffer: []}} |
| 141 | + end |
| 142 | + |
| 143 | + def handle_info(_, state), do: {:noreply, state} |
| 144 | + |
| 145 | + @impl true |
| 146 | + def terminate(reason, _state) do |
| 147 | + log_error("BroadcastChangesHandlerTerminated", reason) |
| 148 | + :ok |
| 149 | + end |
| 150 | + |
| 151 | + def start_link(opts), do: GenServer.start_link(__MODULE__, opts, opts) |
| 152 | + |
| 153 | + @impl true |
| 154 | + def init(opts) do |
| 155 | + tenant_id = Keyword.fetch!(opts, :tenant_id) |
| 156 | + |
| 157 | + tenant = Cache.get_tenant_by_external_id(tenant_id) |
| 158 | + connection_opts = Database.from_tenant(tenant, "realtime_broadcast_changes", :stop, true) |
| 159 | + |
| 160 | + supervisor = |
| 161 | + {:via, PartitionSupervisor, |
| 162 | + {Realtime.BroadcastChanges.Listener.DynamicSupervisor, tenant_id}} |
| 163 | + |
| 164 | + name = {:via, Registry, {Realtime.BroadcastChanges.Listener.Registry, tenant_id}} |
| 165 | + |
| 166 | + configuration = %PostgresReplication{ |
| 167 | + connection_opts: [ |
| 168 | + hostname: connection_opts.host, |
| 169 | + username: connection_opts.user, |
| 170 | + password: connection_opts.pass, |
| 171 | + database: connection_opts.name, |
| 172 | + port: connection_opts.port, |
| 173 | + parameters: [ |
| 174 | + application_name: connection_opts.application_name |
| 175 | + ] |
| 176 | + ], |
| 177 | + table: "messages", |
| 178 | + schema: "realtime", |
| 179 | + handler_module: __MODULE__, |
| 180 | + opts: [name: name], |
| 181 | + metadata: %{tenant_id: tenant_id} |
| 182 | + } |
| 183 | + |
| 184 | + children_spec = %{ |
| 185 | + id: Handler, |
| 186 | + start: {PostgresReplication, :start_link, [configuration]}, |
| 187 | + type: :worker |
| 188 | + } |
| 189 | + |
| 190 | + state = %__MODULE__{tenant_id: tenant_id, buffer: [], relations: %{}} |
| 191 | + |
| 192 | + case DynamicSupervisor.start_child(supervisor, children_spec) do |
| 193 | + {:ok, pid} -> |
| 194 | + {:ok, %{state | postgres_replication_pid: pid}} |
| 195 | + |
| 196 | + {:error, {:already_started, pid}} -> |
| 197 | + {:ok, %{state | postgres_replication_pid: pid}} |
| 198 | + |
| 199 | + error -> |
| 200 | + log_error("UnableToStartPostgresReplication", error) |
| 201 | + {:stop, error} |
| 202 | + end |
| 203 | + end |
| 204 | +end |
0 commit comments