
Use a queue for execution #5389

Open
wants to merge 36 commits into master
Commits (36)
1fdc9cd
Start on a run_queue
rmosolgo Jun 19, 2025
9d5b6da
Add resolve_type step
rmosolgo Jun 20, 2025
fdbfd2f
Move #run into Result classes
rmosolgo Jun 20, 2025
8f04cc9
Make a RunQueue object
rmosolgo Jun 20, 2025
c45f628
support sequential mutation fields
rmosolgo Jun 23, 2025
32ea55c
Fix dataloader integration
rmosolgo Jun 23, 2025
c188312
Implement resolve_each
rmosolgo Jun 23, 2025
d9f4ebb
Isolate field resolution in step object
rmosolgo Jun 23, 2025
56f38c0
Add some Lazy support
rmosolgo Jun 23, 2025
186e822
Fix inspect output
rmosolgo Jun 23, 2025
caa754b
More lazy support, better mutation eager execution
rmosolgo Jun 24, 2025
0f090cc
Move execute field methods into ResultHash
rmosolgo Jun 24, 2025
325d5d8
use self instead of selection result
rmosolgo Jun 24, 2025
b233c92
Move resolution code into FieldResolveStep
rmosolgo Jun 24, 2025
8837fa7
Merge branch 'master' into run-queue-3
rmosolgo Jun 24, 2025
d3927b8
Add todos
rmosolgo Jun 25, 2025
123bf35
Merge ResolveTypeStep into ResultHash
rmosolgo Jun 26, 2025
898ba4a
Use named states
rmosolgo Jun 26, 2025
6f6c79e
Move authorized into runtime state machine
rmosolgo Jun 26, 2025
466a80c
Move directive resolution into other steps
rmosolgo Jun 27, 2025
bdc1b08
Run dataloader if arguments need it
rmosolgo Jun 27, 2025
7f24a79
Start working on dataloader compat
rmosolgo Jul 3, 2025
805605d
Merge branch 'master' into run-queue-3
rmosolgo Jul 8, 2025
96e5d1c
Share a run queue within a multiplex; run steps inside a dataloader job
rmosolgo Jul 8, 2025
46044fa
Support appending callables directly to dataloader
rmosolgo Jul 8, 2025
e3cf018
Improve eager continuation in FieldResolveStep, improve dataloader ba…
rmosolgo Jul 8, 2025
b1a8c74
Rework to support lazy arguments
rmosolgo Jul 8, 2025
5be5032
Improve current runtime state
rmosolgo Jul 10, 2025
f29cac4
Catch unauthorized errors from field resolution
rmosolgo Jul 10, 2025
c4ebc03
Support list scoping
rmosolgo Jul 10, 2025
a46bcfa
Start merging RunQueue back into Dataloader
rmosolgo Jul 10, 2025
1f5117e
Remove RunQueue
rmosolgo Jul 10, 2025
77e7485
Remove Interpreter::Resolve which is now needless
rmosolgo Jul 10, 2025
a2285cd
Merge branch 'master' into run-queue-3
rmosolgo Jul 10, 2025
7812e22
Return NullDataloader which can be frozen
rmosolgo Jul 11, 2025
b386fd6
Rescue some errors; hack to fix double-execute
rmosolgo Jul 11, 2025
4 changes: 1 addition & 3 deletions benchmark/batch_loading.rb
@@ -63,9 +63,7 @@ def initialize(options = {column: :id})

def fetch(keys)
keys.map { |key|
d = GraphQLBatchSchema::DATA.find { |d| d[@column] == key }
# p [key, @column, d]
d
GraphQLBatchSchema::DATA.find { |d| d[@column] == key }
}
end
end
38 changes: 36 additions & 2 deletions lib/graphql/dataloader.rb
@@ -1,5 +1,6 @@
# frozen_string_literal: true

require "graphql/dataloader/flat_dataloader"
require "graphql/dataloader/null_dataloader"
require "graphql/dataloader/request"
require "graphql/dataloader/request_all"
@@ -64,8 +65,14 @@ def initialize(nonblocking: self.class.default_nonblocking, fiber_limit: self.cl
@nonblocking = nonblocking
end
@fiber_limit = fiber_limit
@steps_to_rerun_after_lazy = []
@lazies_at_depth = nil
end

attr_accessor :lazies_at_depth

attr_reader :steps_to_rerun_after_lazy

# @return [Integer, nil]
attr_reader :fiber_limit

@@ -140,10 +147,10 @@ def yield(source = Fiber[:__graphql_current_dataloader_source])
end

# @api private Nothing to see here
def append_job(&job)
def append_job(callable = nil, &job)
# Given a block, queue it up to be worked through when `#run` is called.
# (If the dataloader is already running, then a Fiber will pick this up later.)
@pending_jobs.push(job)
@pending_jobs.push(callable || job)
nil
end

@@ -189,6 +196,8 @@ def run_isolated
end

def run
# TODO: unify the initialization of lazies_at_depth
@lazies_at_depth ||= Hash.new { |h, k| h[k] = [] }
trace = Fiber[:__graphql_current_multiplex]&.current_trace
jobs_fiber_limit, total_fiber_limit = calculate_fiber_limit
job_fibers = []
@@ -222,6 +231,31 @@ def run
end
join_queues(source_fibers, next_source_fibers)
end

if @lazies_at_depth.any?
smallest_depth = nil
@lazies_at_depth.each_key do |depth_key|
smallest_depth ||= depth_key
if depth_key < smallest_depth
smallest_depth = depth_key
end
end

if smallest_depth
lazies = @lazies_at_depth.delete(smallest_depth)
if !lazies.empty?
append_job {
lazies.each(&:value) # resolve these Lazy instances
}
job_fibers << spawn_job_fiber(trace)
end
end
elsif @steps_to_rerun_after_lazy.any?
@pending_jobs.concat(@steps_to_rerun_after_lazy)
f = spawn_job_fiber(trace)
job_fibers << f
@steps_to_rerun_after_lazy.clear
end
end

trace&.end_dataloader(self)
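
The loop added to Dataloader#run above resolves deferred (lazy) values breadth-first: whichever depth still has pending lazies and sits closest to the root is resolved before anything deeper. A standalone sketch of that ordering logic, using a stand-in Lazy struct rather than graphql-ruby's GraphQL::Execution::Lazy:

# Illustrative stand-in for a deferred value with a #value method.
Lazy = Struct.new(:label) do
  def value
    puts "resolving #{label}"
  end
end

lazies_at_depth = Hash.new { |h, k| h[k] = [] }
lazies_at_depth[3] << Lazy.new("deep field")
lazies_at_depth[1] << Lazy.new("shallow field")

until lazies_at_depth.empty?
  # Resolve the shallowest pending batch before touching anything deeper,
  # mirroring the smallest-depth search in the run loop above.
  smallest_depth = lazies_at_depth.keys.min
  lazies_at_depth.delete(smallest_depth).each(&:value)
end
# => resolving shallow field
# => resolving deep field
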
76 changes: 76 additions & 0 deletions lib/graphql/dataloader/flat_dataloader.rb
@@ -0,0 +1,76 @@
# frozen_string_literal: true

module GraphQL
class Dataloader
class FlatDataloader < Dataloader
def initialize(*)
# TODO: unify the initialization of lazies_at_depth
@lazies_at_depth ||= Hash.new { |h, k| h[k] = [] }
@steps_to_rerun_after_lazy = []
@queue = []
end

def run
while @queue.any?
while (step = @queue.shift)
step.call
end

while @lazies_at_depth&.any?
smallest_depth = nil
@lazies_at_depth.each_key do |depth_key|
smallest_depth ||= depth_key
if depth_key < smallest_depth
smallest_depth = depth_key
end
end

if smallest_depth
lazies = @lazies_at_depth.delete(smallest_depth)
lazies.each(&:value) # resolve these Lazy instances
end
end

if @steps_to_rerun_after_lazy.any?
@steps_to_rerun_after_lazy.each(&:call)
@steps_to_rerun_after_lazy.clear
end
end
end

def run_isolated
prev_queue = @queue
prev_stral = @steps_to_rerun_after_lazy
prev_lad = @lazies_at_depth
@steps_to_rerun_after_lazy = []
@queue = []
@lazies_at_depth = @lazies_at_depth.dup&.clear
res = nil
append_job {
res = yield
}
run
res
ensure
@queue = prev_queue
@steps_to_rerun_after_lazy = prev_stral
@lazies_at_depth = prev_lad
end

def clear_cache; end

def yield(_source)
raise GraphQL::Error, "GraphQL::Dataloader is not running -- add `use GraphQL::Dataloader` to your schema to use Dataloader sources."
end

def append_job(callable = nil, &block)
@queue << (callable || block)
nil
end

def with(*)
raise GraphQL::Error, "GraphQL::Dataloader is not running -- add `use GraphQL::Dataloader` to your schema to use Dataloader sources."
end
end
end
end
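
Going by the new file above, FlatDataloader works through appended jobs synchronously on the current Fiber: no Fiber switching and no source batching, which is why #yield and #with raise. A minimal usage sketch, assuming this branch is installed:

dataloader = GraphQL::Dataloader::FlatDataloader.new

dataloader.append_job { puts "first" }        # block form
dataloader.append_job(-> { puts "second" })   # callables are accepted directly

dataloader.run
# => first
# => second
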
4 changes: 2 additions & 2 deletions lib/graphql/dataloader/null_dataloader.rb
@@ -18,8 +18,8 @@ def yield(_source)
raise GraphQL::Error, "GraphQL::Dataloader is not running -- add `use GraphQL::Dataloader` to your schema to use Dataloader sources."
end

def append_job
yield
def append_job(callable = nil)
callable ? callable.call : yield
nil
end

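
With the change above, NullDataloader#append_job accepts an optional callable as well as a block; either way the work runs immediately, since NullDataloader never defers anything. A small sketch (constructing the dataloader directly here purely for illustration):

dataloader = GraphQL::Dataloader::NullDataloader.new

dataloader.append_job { puts "as a block" }        # runs right away
dataloader.append_job(-> { puts "as a callable" }) # also runs right away
# => as a block
# => as a callable
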
12 changes: 1 addition & 11 deletions lib/graphql/execution/interpreter.rb
@@ -5,7 +5,6 @@
require "graphql/execution/interpreter/arguments_cache"
require "graphql/execution/interpreter/execution_errors"
require "graphql/execution/interpreter/runtime"
require "graphql/execution/interpreter/resolve"
require "graphql/execution/interpreter/handles_raw_value"

module GraphQL
@@ -43,6 +42,7 @@ def run_all(schema, query_options, context: {}, max_complexity: schema.max_compl
schema = multiplex.schema
queries = multiplex.queries
lazies_at_depth = Hash.new { |h, k| h[k] = [] }
multiplex.dataloader.lazies_at_depth = lazies_at_depth
multiplex_analyzers = schema.multiplex_analyzers
if multiplex.max_complexity
multiplex_analyzers += [GraphQL::Analysis::MaxQueryComplexity]
@@ -90,15 +90,6 @@ def run_all(schema, query_options, context: {}, max_complexity: schema.max_compl

multiplex.dataloader.run

# Then, work through lazy results in a breadth-first way
multiplex.dataloader.append_job {
query = multiplex.queries.length == 1 ? multiplex.queries[0] : nil
multiplex.current_trace.execute_query_lazy(multiplex: multiplex, query: query) do
Interpreter::Resolve.resolve_each_depth(lazies_at_depth, multiplex.dataloader)
end
}
multiplex.dataloader.run

# Then, find all errors and assign the result to the query object
results.each_with_index do |data_result, idx|
query = queries[idx]
@@ -122,7 +113,6 @@ def run_all(schema, query_options, context: {}, max_complexity: schema.max_compl
end

result["data"] = query.context.namespace(:interpreter_runtime)[:runtime].final_result

result
end
if query.context.namespace?(:__query_result_extensions__)
100 changes: 0 additions & 100 deletions lib/graphql/execution/interpreter/resolve.rb

This file was deleted.
