Skip to content

Implement multi queue to reduce test files load time #262

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion ruby/lib/ci/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,12 @@ def from_uri(url, config)
File
when 'redis', 'rediss'
require 'ci/queue/redis'
Redis
if config.multi_queue_config
require 'ci/queue/multi_queue'
MultiQueue
else
Redis
end
else
raise ArgumentError, "Don't know how to handle #{uri.scheme} URLs"
end
Expand Down
5 changes: 4 additions & 1 deletion ruby/lib/ci/queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ class Configuration
attr_accessor :requeue_tolerance, :namespace, :failing_test, :statsd_endpoint
attr_accessor :max_test_duration, :max_test_duration_percentile, :track_test_duration
attr_accessor :max_test_failed, :redis_ttl, :warnings_file, :debug_log, :max_missed_heartbeat_seconds
attr_accessor :multi_queue_config
attr_reader :circuit_breakers
attr_writer :seed, :build_id
attr_writer :queue_init_timeout, :report_timeout, :inactive_workers_timeout
Expand Down Expand Up @@ -37,7 +38,8 @@ def initialize(
grind_count: nil, max_duration: nil, failure_file: nil, max_test_duration: nil,
max_test_duration_percentile: 0.5, track_test_duration: false, max_test_failed: nil,
queue_init_timeout: nil, redis_ttl: 8 * 60 * 60, report_timeout: nil, inactive_workers_timeout: nil,
export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: nil)
export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: nil,
multi_queue_config: nil)
@build_id = build_id
@circuit_breakers = [CircuitBreaker::Disabled]
@failure_file = failure_file
Expand All @@ -64,6 +66,7 @@ def initialize(
@warnings_file = warnings_file
@debug_log = debug_log
@max_missed_heartbeat_seconds = max_missed_heartbeat_seconds
@multi_queue_config = multi_queue_config
end

def queue_init_timeout
Expand Down
243 changes: 243 additions & 0 deletions ruby/lib/ci/queue/multi_queue.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
# frozen_string_literal: true

require "benchmark"
require "forwardable"

module CI
module Queue
class MultiQueue
include Common
extend Forwardable

class << self
def from_uri(uri, config)
new(uri.to_s, config)
end
end

attr_reader :redis_url, :config, :redis, :current_queue

def initialize(redis_url, config)
@redis_url = redis_url
@config = config
if ::Redis::VERSION > "5.0.0"
@redis = ::Redis.new(
url: redis_url
)
else
@redis = ::Redis.new(url: redis_url)
end
@shutdown_required = false
@starting_queue_assigned = false
@current_queue = starting_queue
end

def distributed?
true
end

def retrying?
queues.any?(&:retrying?)
end

def expired?
queues.any?(&:expired?)
end

def exhausted?
queues.all?(&:exhausted?)
end

def total
total = 0
queues.each do |q|
t = q.supervisor.total
puts "Queue #{q.name}, total: #{t}"
total += t if t
end
total
end

def size
queues.sum(&:size)
end

def progress
total - size
end

def remaining
queues.sum(&:remaining)
end

def running
queues.sum(&:running)
end

def poll
queues.each do |q|
@current_queue = q

begin
if q.exhausted?
puts "# All tests executed in #{q.name} queue, skipping..."
next
end

prev_loaded_tests = Minitest.loaded_tests
q.load_tests!
q.populate(Minitest.loaded_tests - prev_loaded_tests, random: ordering_seed, &:id) unless q.populated?

puts "# Processing #{q.size} tests in #{q.name} queue..."

q.poll do |test|
yield test
end
rescue *CI::Queue::Redis::Base::CONNECTION_ERRORS
end
end
end

def max_test_failed?
return false if config.max_test_failed.nil?

queues.sum(&:test_failed) >= config.max_test_failed
end

def build
@build ||= CI::Queue::Redis::BuildRecord.new(self, redis, config)
end

def supervisor
@supervisor ||= Supervisor.new(multi_queue: self)
end

def retry_queue
# TODO: implement
end

def created_at=(time)
queues.each { |q| q.created_at = time }
end

def shutdown!
@shutdown_required = true
end

def shutdown_required?
@shutdown_required
end

def_delegators :@current_queue, :acknowledge, :requeue, :populate, :release!, :increment_test_failed

# TODO: move heartbeat into module
def boot_heartbeat_process!; end

def with_heartbeat(id)
yield
end

def ensure_heartbeat_thread_alive!; end

def stop_heartbeat!; end

class SubQueue < SimpleDelegator
attr_reader :name

def initialize(worker:, multi_queue:, name:, test_files:, preload_files:)
super(worker)
@name = name
@test_files = test_files
@multi_queue = multi_queue
@preload_files = preload_files
@preloaded = false
end

def load_tests!
duration = Benchmark.realtime do
@test_files.each do |test_file|
require ::File.expand_path(test_file)
end
end

puts "# Loaded #{@test_files.size} test files in #{name} queue in #{duration.round(2)} seconds"
end

def max_test_failed?
@multi_queue.max_test_failed?
end

def preload_files!
return if @preload_files.empty? || @preloaded

@preload_files.each do |file|
require ::File.expand_path(file)
end

@preloaded = true
end
end

class Supervisor < SimpleDelegator
def initialize(multi_queue:)
super(multi_queue)
@multi_queue = multi_queue
end

def wait_for_workers
wait_statuses = @multi_queue.queues.map do |q|
status = q.supervisor.wait_for_workers do
yield
end
puts "# Queue #{q.name} finished running with status #{status}"
status
end
wait_statuses.all? { |status| status == true }
end

def queue_initialized?
all_queues_initialized = true
@multi_queue.queues.each do |q|
puts "Queue #{q.name} initialized: #{q.queue_initialized?}"
unless q.queue_initialized?
puts "Queue #{q.name} was not initialized"
all_queues_initialized = false
end
end
all_queues_initialized
end
end

def queues
@queues ||= @config.multi_queue_config["queues"].map do |name, files|
sub_queue_config = @config.dup.tap { |c| c.namespace = name }
SubQueue.new(
worker:CI::Queue::Redis::Worker.new(@redis_url, sub_queue_config, @redis),
multi_queue: self, name: name, test_files: files, preload_files: @config.multi_queue_config["preload_files"]
)
end
@queues
end

private

def starting_queue
return queues.first if @starting_queue_assigned

starting_queue = queues.delete_at(@config.worker_id.to_i % queues.size)
queues.unshift(starting_queue)
@starting_queue_assigned = true
starting_queue
end

# Worth extracting this into a module?
def ordering_seed
if @config.seed
Random.new(Digest::MD5.hexdigest(@config.seed).to_i(16))
else
Random.new
end
end
end
end
end
13 changes: 9 additions & 4 deletions ruby/lib/ci/queue/redis/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ def call_pipelined(commands, redis_config)
end
end

def initialize(redis_url, config)
def initialize(redis_url, config, redis = nil)
@redis_url = redis_url
@config = config
if ::Redis::VERSION > "5.0.0"
@redis = ::Redis.new(
@redis = redis
@redis ||= if ::Redis::VERSION > "5.0.0"
::Redis.new(
url: redis_url,
# Booting a CI worker is costly, so in case of a Redis blip,
# it makes sense to retry for a while before giving up.
Expand All @@ -40,7 +41,7 @@ def initialize(redis_url, config)
custom: custom_config,
)
else
@redis = ::Redis.new(url: redis_url)
::Redis.new(url: redis_url)
end
end

Expand Down Expand Up @@ -126,6 +127,10 @@ def running
redis.zcard(key('running'))
end

def total
redis.get(key('total')).to_i
end

def to_a
redis.multi do |transaction|
transaction.lrange(key('queue'), 0, -1)
Expand Down
4 changes: 2 additions & 2 deletions ruby/lib/ci/queue/redis/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ class << self
class Worker < Base
attr_reader :total

def initialize(redis, config)
def initialize(redis_url, config, redis = nil)
@reserved_test = nil
@shutdown_required = false
super(redis, config)
super(redis_url, config, redis)
end

def distributed?
Expand Down
13 changes: 13 additions & 0 deletions ruby/lib/minitest/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,19 @@ def <=>(other)
id <=> other.id
end

def ==(other)
self.eql?(other)
end

def eql?(other)
self.class == other.class &&
id == other.id
end

def hash
id.hash
end

def with_timestamps
start_timestamp = current_timestamp
result = yield
Expand Down
18 changes: 16 additions & 2 deletions ruby/lib/minitest/queue/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,13 @@ def set_load_path
end

def load_tests
argv.sort.each do |f|
require File.expand_path(f)
if queue_config.multi_queue_config
queue.current_queue.preload_files!
queue.current_queue.load_tests!
else
argv.sort.each do |f|
require File.expand_path(f)
end
end
end

Expand Down Expand Up @@ -631,6 +636,15 @@ def parser
opts.on('--failing-test TEST_IDENTIFIER') do |identifier|
queue_config.failing_test = identifier
end

help = <<~EOS
The file path for multi-queue configuration. It should be a valid YAML file.
The file should map queue names to a list of their test files.
EOS
opts.on('--multi-queue-config PATH', help) do |path|
require 'yaml'
queue_config.multi_queue_config = YAML.load(File.read(path))
end
end
end

Expand Down