Skip to content

feat: add function and module-based topic generation to PubSub #2139

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions documentation/dsls/DSL-Ash.Notifier.PubSub.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,63 @@ Configured with `broadcast_type`.
- `:phoenix_broadcast` sends a `%Phoenix.Socket.Broadcast{}` (see above)
- `:broadcast` sends `%{topic: (topic), event: (event), payload: (notification)}`

## Dynamic Topic Generation

For advanced use cases, you can use functions or modules to generate topics dynamically:

### Function-based Topics

```elixir
pub_sub do
module MyAppWeb.Endpoint
prefix "posts"

publish :create, fn notification ->
# Return a list of topic strings
["user_feed:#{notification.data.author_id}"]
end
end
```

### Module-based Topics

First, implement the `Ash.Notifier.PubSub.TopicGenerator` behavior:

```elixir
defmodule MyApp.FollowerTopicGenerator do
use Ash.Notifier.PubSub.TopicGenerator

@impl true
def generate_topics(notification, opts) do
# Custom logic to generate topics based on relationships
followers =
notification.data
|> Ash.load!([author: :followers])
|> Map.get(:author, %{})
|> Map.get(:followers, [])

Enum.map(followers, fn follower ->
"user_feed:#{follower.id}"
end)
end
end
```

Then use it in your pub_sub configuration:

```elixir
pub_sub do
module MyAppWeb.Endpoint
prefix "posts"

# With options
publish_all :create, {MyApp.FollowerTopicGenerator, []}

# Without options
publish_all :update, MyApp.FollowerTopicGenerator
end
```


## pub_sub
A section for configuring how resource actions are published over pubsub
Expand Down
1 change: 1 addition & 0 deletions lib/ash/actions/create/bulk.ex
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ defmodule Ash.Actions.Create.Bulk do
changeset when upsert_condition != nil -> Ash.Changeset.filter(changeset, upsert_condition)
changeset -> changeset
end)
|> Ash.Notifier.PubSub.Helpers.add_topic_generator_loads()
end

defp lazy_matching_default_values(resource) do
Expand Down
1 change: 1 addition & 0 deletions lib/ash/actions/create/create.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ defmodule Ash.Actions.Create do
else
{changeset, opts} = Ash.Actions.Helpers.set_context_and_get_opts(domain, changeset, opts)
changeset = Helpers.apply_opts_load(changeset, opts)
changeset = Ash.Notifier.PubSub.Helpers.add_topic_generator_loads(changeset)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't have behavior for a specific notifier here, it should be calling a callback on all of the notifiers for this resource and action.


Ash.Tracer.span :action,
fn ->
Expand Down
110 changes: 110 additions & 0 deletions lib/ash/notifier/pub_sub/helpers.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
defmodule Ash.Notifier.PubSub.Helpers do
@moduledoc """
Helper functions for PubSub notifier integration.
"""

alias Ash.Notifier.PubSub

@doc """
Adds required loads from topic generators to a changeset or query.

This function inspects all PubSub publications configured for the resource
and action, extracts load requirements from topic generators that implement
the `required_loads/0` callback, and adds them to the changeset/query.

This ensures that when notifications are created, the data already has
the required relationships loaded, preventing N+1 queries in bulk operations.
"""
def add_topic_generator_loads(%Ash.Changeset{} = changeset) do
add_loads_for_action(changeset, changeset.resource, changeset.action)
end

def add_topic_generator_loads(%Ash.Query{} = query) do
# For queries, we might need to handle this differently
# For now, return unchanged
query
end

defp add_loads_for_action(changeset_or_query, resource, action) when not is_nil(action) do
# Get all PubSub notifiers for this resource
resource
|> Ash.Resource.Info.notifiers()
|> Enum.filter(&match?(Ash.Notifier.PubSub, &1))
|> Enum.reduce(changeset_or_query, fn _notifier, acc ->
# Get publications from the notifier
resource
|> PubSub.Info.publications()
|> Enum.filter(&publication_matches?(&1, action))
|> Enum.reduce(acc, &add_publication_loads/2)
end)
end

defp add_loads_for_action(changeset_or_query, _resource, _action) do
changeset_or_query
end

defp publication_matches?(%{action: pub_action}, %{name: action_name})
when is_atom(pub_action) do
pub_action == action_name
end

defp publication_matches?(%{type: pub_type}, %{type: action_type}) do
pub_type == action_type
end

defp publication_matches?(%{type: pub_type, except: except}, %{
type: action_type,
name: action_name
}) do
pub_type == action_type && action_name not in except
end

defp publication_matches?(_, _), do: false

defp add_publication_loads(publication, acc) do
case publication.topic do
{topic_generator, _opts} when is_atom(topic_generator) ->
add_topic_generator_loads_from_module(acc, topic_generator)

topic_generator when is_atom(topic_generator) ->
add_topic_generator_loads_from_module(acc, topic_generator)

_ ->
# String, list, or function topics don't have load requirements
acc
end
end

defp add_topic_generator_loads_from_module(changeset_or_query, topic_generator) do
if function_exported?(topic_generator, :required_loads, 0) do
loads = topic_generator.required_loads()

# Handle both map and list formats
load_list =
case loads do
%{} = load_map ->
load_map
|> Map.values()
|> List.flatten()
|> Enum.uniq()

load_list when is_list(load_list) ->
List.flatten([load_list])
|> Enum.uniq()

single_load ->
[single_load]
end

case changeset_or_query do
%Ash.Changeset{} = changeset ->
Ash.Changeset.load(changeset, load_list)

%Ash.Query{} = query ->
Ash.Query.load(query, load_list)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't just load the things here, because the caller of the action could request the same load key but with different parameters which can lead to non deterministic behavior. This is what I was getting at with needing to add a custom calculation.

end
else
changeset_or_query
end
end
end
100 changes: 100 additions & 0 deletions lib/ash/notifier/pub_sub/pub_sub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,63 @@ defmodule Ash.Notifier.PubSub do
- `:notification` just sends the notification
- `:phoenix_broadcast` sends a `%Phoenix.Socket.Broadcast{}` (see above)
- `:broadcast` sends `%{topic: (topic), event: (event), payload: (notification)}`

## Dynamic Topic Generation

For advanced use cases, you can use functions or modules to generate topics dynamically:

### Function-based Topics

```elixir
pub_sub do
module MyAppWeb.Endpoint
prefix "posts"

publish :create, fn notification ->
# Return a list of topic strings
["user_feed:\#{notification.data.author_id}"]
end
end
```

### Module-based Topics

First, implement the `Ash.Notifier.PubSub.TopicGenerator` behavior:

```elixir
defmodule MyApp.FollowerTopicGenerator do
use Ash.Notifier.PubSub.TopicGenerator

@impl true
def generate_topics(notification, opts) do
# Custom logic to generate topics based on relationships
followers =
notification.data
|> Ash.load!([author: :followers])
|> Map.get(:author, %{})
|> Map.get(:followers, [])

Enum.map(followers, fn follower ->
"user_feed:\#{follower.id}"
end)
end
end
```

Then use it in your pub_sub configuration:

```elixir
pub_sub do
module MyAppWeb.Endpoint
prefix "posts"

# With options
publish_all :create, {MyApp.FollowerTopicGenerator, []}

# Without options
publish_all :update, MyApp.FollowerTopicGenerator
end
```
"""

use Spark.Dsl.Extension,
Expand Down Expand Up @@ -333,6 +390,49 @@ defmodule Ash.Notifier.PubSub do
defp fill_template(topic, _notification, _delimiter, _previous_values?) when is_binary(topic),
do: [topic]

defp fill_template(topic, notification, _delimiter, _previous_values?)
when is_function(topic, 1) do
case topic.(notification) do
topics when is_list(topics) ->
Enum.filter(topics, &is_binary/1)

topic when is_binary(topic) ->
[topic]

other ->
Logger.warning(
"Topic function returned invalid format: #{inspect(other)}. Expected string or list of strings."
)

[]
end
rescue
error ->
Logger.warning("Topic function failed: #{inspect(error)}")
[]
end

defp fill_template({module, opts}, notification, _delimiter, _previous_values?) do
case module.generate_topics(notification, opts) do
topics when is_list(topics) ->
Enum.filter(topics, &is_binary/1)

topic when is_binary(topic) ->
[topic]

other ->
Logger.warning(
"Topic generator #{inspect(module)} returned invalid format: #{inspect(other)}. Expected list of strings."
)

[]
end
rescue
error ->
Logger.warning("Topic generator #{inspect(module)} failed: #{inspect(error)}")
[]
end

defp fill_template(topic, notification, delimiter, previous_values?) do
topic
|> all_combinations_of_values(notification, notification.action.type, previous_values?)
Expand Down
24 changes: 23 additions & 1 deletion lib/ash/notifier/pub_sub/publication.ex
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,31 @@ defmodule Ash.Notifier.PubSub.Publication do
end
end

def topic(topic) when is_function(topic, 1) do
{:ok, topic}
end

def topic({module, opts}) when is_atom(module) and is_list(opts) do
if Code.ensure_loaded?(module) and function_exported?(module, :generate_topics, 2) do
{:ok, {module, opts}}
else
{:error,
"Module #{inspect(module)} must implement the Ash.Notifier.PubSub.TopicGenerator behavior"}
end
end

def topic(module) when is_atom(module) do
if Code.ensure_loaded?(module) and function_exported?(module, :generate_topics, 2) do
{:ok, {module, []}}
else
{:error,
"Module #{inspect(module)} must implement the Ash.Notifier.PubSub.TopicGenerator behavior"}
end
end

def topic(other) do
{:error,
"Expected topic to be a string or a list of strings or attribute names (as atoms), got: #{inspect(other)}"}
"Expected topic to be a string, list of strings or attribute names (as atoms), function/1, module, or {module, opts} tuple, got: #{inspect(other)}"}
end

defp nested_list_of_binaries_or_atoms?(list) when is_list(list) do
Expand Down
Loading
Loading