diff --git a/.rspec b/.rspec index b782d24..ec13759 100644 --- a/.rspec +++ b/.rspec @@ -1 +1,2 @@ ---color --format documentation \ No newline at end of file +--color --format documentation +--require spec_helper diff --git a/lib/futuroscope.rb b/lib/futuroscope.rb index 8065bc6..65c4a54 100644 --- a/lib/futuroscope.rb +++ b/lib/futuroscope.rb @@ -1,9 +1,10 @@ -require "futuroscope/version" -require "futuroscope/pool" -require "futuroscope/future" -require "futuroscope/map" - module Futuroscope + require_relative "futuroscope/deadlock_error" + require_relative "futuroscope/future" + require_relative "futuroscope/map" + require_relative "futuroscope/pool" + require_relative "futuroscope/version" + # Gets the default futuroscope's pool. # # Returns a Pool @@ -20,4 +21,17 @@ def self.default_pool def self.default_pool=(pool) @default_pool = pool end + + # Gets the current loggers. Add objects to it that have the below methods defined to log on them. + # For example, instances of Ruby's core Logger will work. + def self.loggers + @loggers ||= [] + end + + # Inward facing methods, called whenever a component wants to log something to the loggers. + [:debug, :info, :warn, :error, :fatal].each do |log_method| + define_singleton_method(log_method) do |message| + loggers.each { |logger| logger.send(log_method, message) } + end + end end diff --git a/lib/futuroscope/deadlock_error.rb b/lib/futuroscope/deadlock_error.rb new file mode 100644 index 0000000..e9f72f6 --- /dev/null +++ b/lib/futuroscope/deadlock_error.rb @@ -0,0 +1 @@ +Futuroscope::DeadlockError = Class.new StandardError diff --git a/lib/futuroscope/future.rb b/lib/futuroscope/future.rb index 601815f..b38c5f3 100644 --- a/lib/futuroscope/future.rb +++ b/lib/futuroscope/future.rb @@ -8,9 +8,11 @@ module Futuroscope # the future. That is, will block when the result is not ready until it is, # and will return it instantly if the thread's execution already finished. # - class Future < Delegator + class Future < ::Delegator extend ::Forwardable + attr_accessor :worker_thread + # Initializes a future with a block and starts its execution. # # Examples: @@ -27,21 +29,32 @@ class Future < Delegator # # Returns a Future def initialize(pool = ::Futuroscope.default_pool, &block) - @queue = ::SizedQueue.new(1) + @worker_finished = ConditionVariable.new @pool = pool @block = block - @mutex = Mutex.new - @pool.queue self + @mutex = ::Mutex.new + @worker_thread = nil + @pool.push self end # Semipublic: Forces this future to be run. - def run_future - @queue.push(value: @block.call) - rescue ::Exception => e - @queue.push(exception: e) + def resolve! + @mutex.synchronize do + begin + Thread.handle_interrupt(DeadlockError => :immediate) do + @resolved_future = { value: @block.call } + end + rescue ::Exception => e + @resolved_future = { exception: e } + ensure + @pool.done_with self + @worker_thread = nil + @worker_finished.broadcast + end + end end - # Semipublic: Returns the future's value. Will wait for the future to be + # Semipublic: Returns the future's value. Will wait for the future to be # completed or return its value otherwise. Can be called multiple times. # # Returns the Future's block execution result. @@ -61,6 +74,10 @@ def marshal_load value @resolved_future = value end + def resolved? + instance_variable_defined? :@resolved_future + end + def_delegators :__getobj__, :class, :kind_of?, :is_a?, :nil? alias_method :future_value, :__getobj__ @@ -68,16 +85,19 @@ def marshal_load value private def resolved_future_value_or_raise - resolved = resolved_future_value - - Kernel.raise resolved[:exception] if resolved[:exception] - resolved + resolved_future.tap do |resolved| + ::Kernel.raise resolved[:exception] if resolved.has_key?(:exception) + end end - def resolved_future_value - @resolved_future || @mutex.synchronize do - @resolved_future ||= @queue.pop - end + def resolved_future + @mutex.synchronize do + unless resolved? + @pool.depend self + @worker_finished.wait(@mutex) + end + end unless resolved? + @resolved_future end end end diff --git a/lib/futuroscope/pool.rb b/lib/futuroscope/pool.rb index 2c29775..9b92435 100644 --- a/lib/futuroscope/pool.rb +++ b/lib/futuroscope/pool.rb @@ -1,5 +1,4 @@ require 'set' -require 'thread' require 'futuroscope/worker' module Futuroscope @@ -7,8 +6,8 @@ module Futuroscope # certain benefits. Moreover, we warm up the threads beforehand so we don't # have to spin them up each time a future is created. class Pool - attr_reader :workers - attr_accessor :min_workers, :max_workers + attr_reader :workers, :min_workers + attr_accessor :max_workers # Public: Initializes a new Pool. # @@ -16,78 +15,206 @@ class Pool def initialize(range = 8..16) @min_workers = range.min @max_workers = range.max - @queue = Queue.new - @workers = Set.new - @mutex = Mutex.new + @dependencies = {} + @priorities = {} + @future_needs_worker = ConditionVariable.new + @workers = ::Set.new + @mutex = ::Mutex.new warm_up_workers + + # We need to keep references to the futures to prevent them from being GC'd. + # However, they can't be the keys of @priorities, because Hash will call #hash on them, which is forwarded to the + # wrapped object, causing a deadlock. Not forwarding is not an option, because then to the outside world + # futures won't be transparent: hsh[:key] will not be the same as hsh[future { :key }]. + @futures = {} end - # Public: Enqueues a new Future into the pool. + + # Public: Pushes a Future into the worklist with low priority. # - # future - The Future to enqueue. - def queue(future) + # future - The Future to push. + def push(future) @mutex.synchronize do - spin_worker if can_spin_extra_workers? - @queue.push future + Futuroscope.info "PUSH: added future #{future.__id__}" + @priorities[future.__id__] = 0 + @futures[future.__id__] = future + spin_worker if need_extra_worker? + Futuroscope.info " sending signal to wake up a thread" + Futuroscope.debug " current priorities: #{@priorities.map { |k, v| ["future #{k}", v] }.to_h}" + @future_needs_worker.signal end end - # Public: Pops a new job from the pool. It will return nil if there's + + # Public: Pops a new job from the pool. It will return nil if there's # enough workers in the pool to take care of it. # # Returns a Future def pop + @mutex.synchronize { await_future(more_workers_than_needed? ? 2 : nil) } + end + + + # Public: Indicates that the current thread is waiting for a Future. + # + # future - The Future being waited for. + def depend(future) @mutex.synchronize do - return nil if @queue.empty? && more_workers_than_needed? + Futuroscope.info "DEPEND: thread #{Thread.current.__id__} depends on future #{future.__id__}" + @dependencies[Thread.current] = future + Futuroscope.debug " current dependencies: #{@dependencies.map { |k, v| ["thread #{k.__id__}", "future #{v.__id__}"] }.to_h}" + handle_deadlocks + dependent_future_id = current_thread_future_id + incr = 1 + (dependent_future_id.nil? ? 0 : @priorities[dependent_future_id]) + increment_priority(future, incr) end - return @queue.pop end - # Private: Notifies that a worker just died so it can be removed from the - # pool. - # - # worker - A Worker - def worker_died(worker) + + # Semipublic: Called by a worker to indicate that it finished resolving a future. + def done_with(future) @mutex.synchronize do - @workers.delete(worker) + Futuroscope.info "DONE: thread #{Thread.current.__id__} is done with future #{future.__id__}" + Futuroscope.info " deleting future #{future.__id__} from the task list" + @futures.delete future.__id__ + @priorities.delete future.__id__ + dependencies_to_delete = @dependencies.select { |dependent, dependee| dependee.__id__ == future.__id__ } + dependencies_to_delete.each do |dependent, dependee| + Futuroscope.info " deleting dependency from thread #{dependent.__id__} to future #{dependee.__id__}" + @dependencies.delete dependent + end end end + def min_workers=(count) @min_workers = count warm_up_workers end + private def warm_up_workers @mutex.synchronize do - while(@workers.length < @min_workers) do + while workers.length < min_workers do spin_worker end end end - def can_spin_extra_workers? - @workers.length < @max_workers && span_chance - end - def span_chance - [true, false].sample + def finalize + workers.each do |worker| + workers.delete worker + worker.thread.kill + end end - def more_workers_than_needed? - @workers.length > @min_workers - end + + # The below methods should only be called with @mutex already acquired. + # These are only extracted for readability purposes. + def spin_worker worker = Worker.new(self) - @workers << worker + workers << worker worker.run + Futuroscope.info " spun up worker with thread #{worker.thread.__id__}" end - def finalize - @workers.each(&:stop) + + def increment_priority(future, increment) + return nil if NilClass === future + Futuroscope.info " incrementing priority for future #{future.__id__}" + @priorities[future.__id__] += increment + increment_priority(@dependencies[future.worker_thread], increment) + end + + + def current_thread_future_id + @priorities.keys.find { |id| @futures[id].worker_thread == Thread.current } + end + + + def await_future(timeout) + until @priorities.any? { |future_id, priority| @futures[future_id].worker_thread.nil? } + Futuroscope.info "POP: thread #{Thread.current.__id__} going to sleep until there's something to do#{timeout && " or #{timeout} seconds"}..." + @future_needs_worker.wait(@mutex, timeout) + Futuroscope.info "POP: ... thread #{Thread.current.__id__} woke up" + Futuroscope.debug " current priorities: #{@priorities.map { |k, v| ["future #{k}", v] }.to_h}" + Futuroscope.debug " current future workers: #{@priorities.map { |k, v| ["future #{k}", (thread = @futures[k].worker_thread; thread.nil? ? nil : "thread #{thread.__id__}")] }.to_h}" + if more_workers_than_needed? && !@priorities.any? { |future_id, priority| @futures[future_id].worker_thread.nil? } + Futuroscope.info " thread #{Thread.current.__id__} is dying because there's nothing to do and there are more threads than the minimum" + workers.delete_if { |worker| worker.thread == Thread.current } + return nil + end + timeout = nil + end + future_id = @priorities.select { |future_id, priority| @futures[future_id].worker_thread.nil? }.max_by { |future_id, priority| priority }.first + Futuroscope.info "POP: thread #{Thread.current.__id__} will start working on future #{future_id}" + future = @futures[future_id] + future.worker_thread = Thread.current + future + end + + + def handle_deadlocks + Thread.handle_interrupt(DeadlockError => :immediate) do + Thread.handle_interrupt(DeadlockError => :never) do + if !(cycle = find_cycle).nil? + Futuroscope.error " deadlock! cyclical dependency, sending interrupt to all threads involved" + cycle.each { |thread| thread.raise DeadlockError, "Cyclical dependency detected, the future was aborted." } + elsif cycleless_deadlock? + thread_to_interrupt = least_priority_independent_thread + Futuroscope.error " deadlock! ran out of workers, sending interrupt to thread #{thread_to_interrupt.__id__}" + thread_to_interrupt.raise DeadlockError, "Pool size is too low, the future was aborted." + end + end + end + end + + + def find_cycle + chain = [Thread.current] + loop do + last_thread = chain.last + return nil unless @dependencies.has_key?(last_thread) + next_future = @dependencies[last_thread] + next_thread = next_future.worker_thread + return nil if next_thread.nil? + return chain if next_thread == chain.first + chain << next_thread + end + end + + + def cycleless_deadlock? + workers.all? { |worker| @dependencies.has_key?(worker.thread) } && workers.count == max_workers + end + + + def least_priority_independent_thread + @priorities.sort_by(&:last).map(&:first).each do |future_id| + its_thread = @futures[future_id].worker_thread + return its_thread if !its_thread.nil? && @dependencies[its_thread].worker_thread.nil? + end + end + + + def need_extra_worker? + workers.count < max_workers && futures_needing_worker.count > workers.count(&:free) + end + + + def more_workers_than_needed? + workers.count > min_workers && futures_needing_worker.count < workers.count(&:free) end + + + def futures_needing_worker + @futures.values.select { |future| future.worker_thread.nil? } + end + end end diff --git a/lib/futuroscope/worker.rb b/lib/futuroscope/worker.rb index b0eb2f5..65cf32c 100644 --- a/lib/futuroscope/worker.rb +++ b/lib/futuroscope/worker.rb @@ -2,11 +2,14 @@ module Futuroscope # A futuroscope worker takes care of resolving a future's value. It works # together with a Pool. class Worker + attr_reader :thread, :free + # Public: Initializes a new Worker. # # pool - The worker Pool it belongs to. def initialize(pool) @pool = pool + @free = true end # Runs the worker. It keeps asking the Pool for a new job. If the pool @@ -16,23 +19,14 @@ def initialize(pool) # def run @thread = Thread.new do - while(future = @pool.pop) do - future.run_future + Thread.handle_interrupt(DeadlockError => :never) do + while future = @pool.pop do + @free = false + future.resolve! + @free = true + end end - die end end - - # Public: Stops this worker. - def stop - @thread.kill - die - end - - private - - def die - @pool.worker_died(self) - end end end diff --git a/spec/futuroscope/convenience_spec.rb b/spec/futuroscope/convenience_spec.rb index 276f0be..36b034e 100644 --- a/spec/futuroscope/convenience_spec.rb +++ b/spec/futuroscope/convenience_spec.rb @@ -1,4 +1,3 @@ -require 'spec_helper' require 'futuroscope/convenience' require 'timeout' diff --git a/spec/futuroscope/future_spec.rb b/spec/futuroscope/future_spec.rb index b2c8f93..7c15c1b 100644 --- a/spec/futuroscope/future_spec.rb +++ b/spec/futuroscope/future_spec.rb @@ -1,5 +1,3 @@ -require 'spec_helper' -require 'futuroscope/future' require 'timeout' module Futuroscope diff --git a/spec/futuroscope/map_spec.rb b/spec/futuroscope/map_spec.rb index 3ba0104..434ed78 100644 --- a/spec/futuroscope/map_spec.rb +++ b/spec/futuroscope/map_spec.rb @@ -1,6 +1,3 @@ -require 'spec_helper' -require 'futuroscope/map' - module Futuroscope describe Map do it "behaves like a normal map" do @@ -9,7 +6,7 @@ module Futuroscope sleep(item) "Item #{item}" end - + Timeout::timeout(4) do expect(result.first).to eq("Item 1") expect(result[1]).to eq("Item 2") diff --git a/spec/futuroscope/pool_spec.rb b/spec/futuroscope/pool_spec.rb index 7d13f16..c0d24f4 100644 --- a/spec/futuroscope/pool_spec.rb +++ b/spec/futuroscope/pool_spec.rb @@ -1,6 +1,3 @@ -require 'spec_helper' -require 'futuroscope/pool' - module Futuroscope describe Pool do it "spins up a number of workers" do @@ -11,29 +8,46 @@ module Futuroscope expect(pool.workers).to have(3).workers end - describe "queue" do + describe "push" do it "enqueues a job and runs it" do pool = Pool.new - future = double(:future) + future = Struct.new(:worker_thread).new(nil) - expect(future).to receive :run_future - pool.queue future + expect(future).to receive :resolve! + pool.push future sleep(0.1) end end + describe "depend" do + it "detects cyclical dependencies" do + pool = Pool.new(2..2) + f2 = nil + f1 = Future.new(pool) { f2 = Future.new(pool) { f1.future_value }; f2.future_value } + + expect { f1.future_value }.to raise_error Futuroscope::DeadlockError, /Cyclical dependency detected/ + expect { f2.future_value }.to raise_error Futuroscope::DeadlockError, /Cyclical dependency detected/ + end + + it "detects non-cyclical deadlocks (when the pool is full and all futures are waiting for another future)" do + pool = Pool.new(1..1) + f = Future.new(pool) { Future.new(pool) { 1 } + 1 } + + expect { f.future_value }.to raise_error Futuroscope::DeadlockError, /Pool size is too low/ + end + end + describe "worker control" do it "adds more workers when needed and returns to the default amount" do pool = Pool.new(2..8) - allow(pool).to receive(:span_chance).and_return true - 10.times do |future| + 10.times do Future.new(pool){ sleep(1) } end sleep(0.5) expect(pool.workers).to have(8).workers - sleep(1.5) + sleep(3) expect(pool.workers).to have(2).workers end @@ -66,14 +80,5 @@ module Futuroscope expect(pool.workers).to have(0).workers end end - - describe "#span_chance" do - it "returns true or false randomly" do - pool = Pool.new - chance = pool.send(:span_chance) - - expect([true, false]).to include(chance) - end - end end end diff --git a/spec/futuroscope/worker_spec.rb b/spec/futuroscope/worker_spec.rb index cade905..5235594 100644 --- a/spec/futuroscope/worker_spec.rb +++ b/spec/futuroscope/worker_spec.rb @@ -1,24 +1,11 @@ -require 'spec_helper' -require 'futuroscope/worker' -require 'futuroscope/pool' - module Futuroscope describe Worker do it "asks the pool for a new job and runs the future" do future = double(:future) pool = [future] - expect(future).to receive :run_future - - Worker.new(pool).run - sleep(1) - end - - it "notifies the pool when the worker died because there's no job" do - pool = [] - worker = Worker.new(pool) + expect(future).to receive :resolve! - expect(pool).to receive(:worker_died).with(worker) - worker.run + described_class.new(pool).run sleep(1) end end diff --git a/spec/futuroscope_spec.rb b/spec/futuroscope_spec.rb index 2e3da06..62016a0 100644 --- a/spec/futuroscope_spec.rb +++ b/spec/futuroscope_spec.rb @@ -1,6 +1,3 @@ -require 'spec_helper' -require 'futuroscope' - describe Futuroscope do describe "default_pool" do it "returns a pool by default" do @@ -15,4 +12,21 @@ expect(Futuroscope.default_pool).to equal(pool) end end + + describe "logging" do + it "logs messages to all the given loggers" do + logger1 = double "Logger 1" + logger2 = double "Logger 2" + Futuroscope.loggers << logger1 << logger2 + + expect(logger1).to receive(:info).at_least(10).times + expect(logger2).to receive(:info).at_least(10).times + + expect(logger1).to receive(:debug).at_least(5).times + expect(logger2).to receive(:debug).at_least(5).times + + Futuroscope::Future.new { Futuroscope::Future.new { 1 } + 1 } + sleep(0.1) + end + end end