Skip to content

feat: add function and module-based topic generation to PubSub #2139

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/ash/actions/create/bulk.ex
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ defmodule Ash.Actions.Create.Bulk do
changeset when upsert_condition != nil -> Ash.Changeset.filter(changeset, upsert_condition)
changeset -> changeset
end)
|> Ash.Notifier.PubSub.Helpers.add_topic_generator_loads()
end

defp lazy_matching_default_values(resource) do
Expand Down
1 change: 1 addition & 0 deletions lib/ash/actions/create/create.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ defmodule Ash.Actions.Create do
else
{changeset, opts} = Ash.Actions.Helpers.set_context_and_get_opts(domain, changeset, opts)
changeset = Helpers.apply_opts_load(changeset, opts)
changeset = Ash.Notifier.PubSub.Helpers.add_topic_generator_loads(changeset)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't have behavior for a specific notifier here, it should be calling a callback on all of the notifiers for this resource and action.


Ash.Tracer.span :action,
fn ->
Expand Down
110 changes: 110 additions & 0 deletions lib/ash/notifier/pub_sub/helpers.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
defmodule Ash.Notifier.PubSub.Helpers do
@moduledoc """
Helper functions for PubSub notifier integration.
"""

alias Ash.Notifier.PubSub

@doc """
Adds required loads from topic generators to a changeset or query.

This function inspects all PubSub publications configured for the resource
and action, extracts load requirements from topic generators that implement
the `required_loads/0` callback, and adds them to the changeset/query.

This ensures that when notifications are created, the data already has
the required relationships loaded, preventing N+1 queries in bulk operations.
"""
def add_topic_generator_loads(%Ash.Changeset{} = changeset) do
add_loads_for_action(changeset, changeset.resource, changeset.action)
end

def add_topic_generator_loads(%Ash.Query{} = query) do
# For queries, we might need to handle this differently
# For now, return unchanged
query
end

defp add_loads_for_action(changeset_or_query, resource, action) when not is_nil(action) do
# Get all PubSub notifiers for this resource
resource
|> Ash.Resource.Info.notifiers()
|> Enum.filter(&match?(Ash.Notifier.PubSub, &1))
|> Enum.reduce(changeset_or_query, fn _notifier, acc ->
# Get publications from the notifier
resource
|> PubSub.Info.publications()
|> Enum.filter(&publication_matches?(&1, action))
|> Enum.reduce(acc, &add_publication_loads/2)
end)
end

defp add_loads_for_action(changeset_or_query, _resource, _action) do
changeset_or_query
end

defp publication_matches?(%{action: pub_action}, %{name: action_name})
when is_atom(pub_action) do
pub_action == action_name
end

defp publication_matches?(%{type: pub_type}, %{type: action_type}) do
pub_type == action_type
end

defp publication_matches?(%{type: pub_type, except: except}, %{
type: action_type,
name: action_name
}) do
pub_type == action_type && action_name not in except
end

defp publication_matches?(_, _), do: false

defp add_publication_loads(publication, acc) do
case publication.topic do
{topic_generator, _opts} when is_atom(topic_generator) ->
add_topic_generator_loads_from_module(acc, topic_generator)

topic_generator when is_atom(topic_generator) ->
add_topic_generator_loads_from_module(acc, topic_generator)

_ ->
# String, list, or function topics don't have load requirements
acc
end
end

defp add_topic_generator_loads_from_module(changeset_or_query, topic_generator) do
if function_exported?(topic_generator, :required_loads, 0) do
loads = topic_generator.required_loads()

# Handle both map and list formats
load_list =
case loads do
%{} = load_map ->
load_map
|> Map.values()
|> List.flatten()
|> Enum.uniq()

load_list when is_list(load_list) ->
List.flatten([load_list])
|> Enum.uniq()

single_load ->
[single_load]
end

case changeset_or_query do
%Ash.Changeset{} = changeset ->
Ash.Changeset.load(changeset, load_list)

%Ash.Query{} = query ->
Ash.Query.load(query, load_list)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't just load the things here, because the caller of the action could request the same load key but with different parameters which can lead to non deterministic behavior. This is what I was getting at with needing to add a custom calculation.

end
else
changeset_or_query
end
end
end
57 changes: 39 additions & 18 deletions lib/ash/notifier/pub_sub/topic_generator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,19 @@ defmodule Ash.Notifier.PubSub.TopicGenerator do
defmodule MyApp.FollowerTopicGenerator do
@behaviour Ash.Notifier.PubSub.TopicGenerator

# Optional: Declare required loads for bulk optimization
@impl true
def required_loads do
[author: :followers]
end

@impl true
def generate_topics(notification, opts) do
# Example: broadcast post creation to all followers
# With required_loads/0, the data is already preloaded
case notification.action.type do
:create ->
followers =
notification.data
|> Ash.load!([author: :followers])
|> Map.get(:author, %{})
|> Map.get(:followers, [])
followers = get_in(notification.data, [:author, :followers]) || []

Enum.map(followers, fn follower ->
"user_feed:\#{follower.id}"
Expand Down Expand Up @@ -66,7 +69,6 @@ defmodule Ash.Notifier.PubSub.TopicGenerator do
end
```
"""

@doc """
Generate a list of topics based on the notification and options.

Expand All @@ -80,25 +82,44 @@ defmodule Ash.Notifier.PubSub.TopicGenerator do

Should return a list of strings representing the topics to publish to.
An empty list means no topics will be published.
"""
@callback generate_topics(notification :: Ash.Notifier.Notification.t(), opts :: keyword()) ::
String.t()
| [
String.t()
]

@doc """
Declare what relationships should be preloaded for efficient topic generation.

This optional callback allows topic generators to specify relationship
loading requirements. When implemented, the specified relationships will
be automatically loaded before the notification is created, eliminating
N+1 query problems in bulk operations.

## Returns

Any load statement compatible with `Ash.load/2`:
- `:relationship_name` - Load a single relationship
- `[:rel1, :rel2]` - Load multiple relationships
- `[author: :followers]` - Load nested relationships
- `[author: {:followers, query}]` - Load with custom query

## Examples

```elixir
def generate_topics(notification, _opts) do
case notification.data do
%{author_id: author_id} when not is_nil(author_id) ->
["author_feed:\#{author_id}"]
_ ->
[]
end
def required_loads do
[author: :followers]
end

def required_loads do
[:author, :tags, :comments]
end
```
"""
@callback generate_topics(notification :: Ash.Notifier.Notification.t(), opts :: keyword()) ::
String.t()
| [
String.t()
]
@callback required_loads() :: term()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a callback on Ash.Notifier I think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, nvm. This makes sense here. But my other comment about having a callback on the notifier itself to drive this is the thing.


@optional_callbacks required_loads: 0

@doc false
defmacro __using__(_opts) do
Expand Down
60 changes: 60 additions & 0 deletions test/notifier/bulk_aware_pubsub_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
defmodule Ash.Test.Notifier.BulkAwarePubSubTest do
@moduledoc false
use ExUnit.Case, async: true

defmodule TestTopicGenerator do
use Ash.Notifier.PubSub.TopicGenerator

@impl true
def required_loads do
[author: :followers]
end

@impl true
def generate_topics(notification, _opts) do
case notification.action.name do
:create ->
followers = get_in(notification.data, [:author, :followers]) || []

Enum.map(followers, fn follower ->
"user_feed:#{follower.id}"
end)

_ ->
[]
end
end
end

defmodule BasicTopicGenerator do
use Ash.Notifier.PubSub.TopicGenerator

@impl true
def generate_topics(notification, _opts) do
["basic:#{notification.data.id}"]
end
end

test "topic generator with required_loads callback works" do
assert function_exported?(TestTopicGenerator, :required_loads, 0)
assert TestTopicGenerator.required_loads() == [author: :followers]
end

test "topic generator without required_loads callback works" do
refute function_exported?(BasicTopicGenerator, :required_loads, 0)
end

test "Publication.topic accepts topic generators with required_loads" do
assert {:ok, {TestTopicGenerator, []}} =
Ash.Notifier.PubSub.Publication.topic(TestTopicGenerator)

assert {:ok, {TestTopicGenerator, [foo: :bar]}} =
Ash.Notifier.PubSub.Publication.topic({TestTopicGenerator, [foo: :bar]})
end

test "topic generators can declare required loads" do
# Test the new simplified approach works
assert function_exported?(TestTopicGenerator, :required_loads, 0)
assert TestTopicGenerator.required_loads() == [author: :followers]
end
end
Loading
Loading