-
Notifications
You must be signed in to change notification settings - Fork 13
Huge feature pack: Deadlock detection, prioritized futures, smart up/down spinning of workers, logging #23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
7d44694
e022587
2f27666
fc77ddc
072de6f
9a61cef
161eaa2
df61e68
4c08c51
22fca86
a16a000
fa997fb
ccc1940
3aafd5e
62fb3f1
20cf980
e323b22
3326db3
4683a54
c804098
f78ab42
ed62b96
fdd46fd
a262ba7
0b5eb1f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,2 @@ | ||
--color --format documentation | ||
--color --format documentation | ||
--require spec_helper |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,10 @@ | ||
require "futuroscope/version" | ||
require "futuroscope/pool" | ||
require "futuroscope/future" | ||
require "futuroscope/map" | ||
|
||
module Futuroscope | ||
require_relative "futuroscope/deadlock_error" | ||
require_relative "futuroscope/future" | ||
require_relative "futuroscope/map" | ||
require_relative "futuroscope/pool" | ||
require_relative "futuroscope/version" | ||
|
||
# Gets the default futuroscope's pool. | ||
# | ||
# Returns a Pool | ||
|
@@ -20,4 +21,17 @@ def self.default_pool | |
def self.default_pool=(pool) | ||
@default_pool = pool | ||
end | ||
|
||
# Gets the current loggers. Add objects to it that have the below methods defined to log on them. | ||
# For example, instances of Ruby's core Logger will work. | ||
def self.loggers | ||
@loggers ||= [] | ||
end | ||
|
||
# Inward facing methods, called whenever a component wants to log something to the loggers. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [93/80] |
||
[:debug, :info, :warn, :error, :fatal].each do |log_method| | ||
define_singleton_method(log_method) do |message| | ||
loggers.each { |logger| logger.send(log_method, message) } | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Futuroscope::DeadlockError = Class.new StandardError |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,9 +8,11 @@ module Futuroscope | |
# the future. That is, will block when the result is not ready until it is, | ||
# and will return it instantly if the thread's execution already finished. | ||
# | ||
class Future < Delegator | ||
class Future < ::Delegator | ||
extend ::Forwardable | ||
|
||
attr_accessor :worker_thread | ||
|
||
# Initializes a future with a block and starts its execution. | ||
# | ||
# Examples: | ||
|
@@ -27,21 +29,32 @@ class Future < Delegator | |
# | ||
# Returns a Future | ||
def initialize(pool = ::Futuroscope.default_pool, &block) | ||
@queue = ::SizedQueue.new(1) | ||
@worker_finished = ConditionVariable.new | ||
@pool = pool | ||
@block = block | ||
@mutex = Mutex.new | ||
@pool.queue self | ||
@mutex = ::Mutex.new | ||
@worker_thread = nil | ||
@pool.push self | ||
end | ||
|
||
# Semipublic: Forces this future to be run. | ||
def run_future | ||
@queue.push(value: @block.call) | ||
rescue ::Exception => e | ||
@queue.push(exception: e) | ||
def resolve! | ||
@mutex.synchronize do | ||
begin | ||
Thread.handle_interrupt(DeadlockError => :immediate) do | ||
@resolved_future = { value: @block.call } | ||
end | ||
rescue ::Exception => e | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid rescuing the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid rescuing the |
||
@resolved_future = { exception: e } | ||
ensure | ||
@pool.done_with self | ||
@worker_thread = nil | ||
@worker_finished.broadcast | ||
end | ||
end | ||
end | ||
|
||
# Semipublic: Returns the future's value. Will wait for the future to be | ||
# Semipublic: Returns the future's value. Will wait for the future to be | ||
# completed or return its value otherwise. Can be called multiple times. | ||
# | ||
# Returns the Future's block execution result. | ||
|
@@ -61,23 +74,30 @@ def marshal_load value | |
@resolved_future = value | ||
end | ||
|
||
def resolved? | ||
instance_variable_defined? :@resolved_future | ||
end | ||
|
||
def_delegators :__getobj__, :class, :kind_of?, :is_a?, :nil? | ||
|
||
alias_method :future_value, :__getobj__ | ||
|
||
private | ||
|
||
def resolved_future_value_or_raise | ||
resolved = resolved_future_value | ||
|
||
Kernel.raise resolved[:exception] if resolved[:exception] | ||
resolved | ||
resolved_future.tap do |resolved| | ||
::Kernel.raise resolved[:exception] if resolved.has_key?(:exception) | ||
end | ||
end | ||
|
||
def resolved_future_value | ||
@resolved_future || @mutex.synchronize do | ||
@resolved_future ||= @queue.pop | ||
end | ||
def resolved_future | ||
@mutex.synchronize do | ||
unless resolved? | ||
@pool.depend self | ||
@worker_finished.wait(@mutex) | ||
end | ||
end unless resolved? | ||
@resolved_future | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,93 +1,220 @@ | ||
require 'set' | ||
require 'thread' | ||
require 'futuroscope/worker' | ||
|
||
module Futuroscope | ||
# Futuroscope's pool is design to control concurency and keep it between some | ||
# certain benefits. Moreover, we warm up the threads beforehand so we don't | ||
# have to spin them up each time a future is created. | ||
class Pool | ||
attr_reader :workers | ||
attr_accessor :min_workers, :max_workers | ||
attr_reader :workers, :min_workers | ||
attr_accessor :max_workers | ||
|
||
# Public: Initializes a new Pool. | ||
# | ||
# thread_count - The number of workers that this pool is gonna have | ||
def initialize(range = 8..16) | ||
@min_workers = range.min | ||
@max_workers = range.max | ||
@queue = Queue.new | ||
@workers = Set.new | ||
@mutex = Mutex.new | ||
@dependencies = {} | ||
@priorities = {} | ||
@future_needs_worker = ConditionVariable.new | ||
@workers = ::Set.new | ||
@mutex = ::Mutex.new | ||
warm_up_workers | ||
|
||
# We need to keep references to the futures to prevent them from being GC'd. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [82/80] |
||
# However, they can't be the keys of @priorities, because Hash will call #hash on them, which is forwarded to the | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [119/80] |
||
# wrapped object, causing a deadlock. Not forwarding is not an option, because then to the outside world | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [110/80] |
||
# futures won't be transparent: hsh[:key] will not be the same as hsh[future { :key }]. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [93/80] |
||
@futures = {} | ||
end | ||
|
||
# Public: Enqueues a new Future into the pool. | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. |
||
# Public: Pushes a Future into the worklist with low priority. | ||
# | ||
# future - The Future to enqueue. | ||
def queue(future) | ||
# future - The Future to push. | ||
def push(future) | ||
@mutex.synchronize do | ||
spin_worker if can_spin_extra_workers? | ||
@queue.push future | ||
Futuroscope.info "PUSH: added future #{future.__id__}" | ||
@priorities[future.__id__] = 0 | ||
@futures[future.__id__] = future | ||
spin_worker if need_extra_worker? | ||
Futuroscope.info " sending signal to wake up a thread" | ||
Futuroscope.debug " current priorities: #{@priorities.map { |k, v| ["future #{k}", v] }.to_h}" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [109/80] |
||
@future_needs_worker.signal | ||
end | ||
end | ||
|
||
# Public: Pops a new job from the pool. It will return nil if there's | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. |
||
# Public: Pops a new job from the pool. It will return nil if there's | ||
# enough workers in the pool to take care of it. | ||
# | ||
# Returns a Future | ||
def pop | ||
@mutex.synchronize { await_future(more_workers_than_needed? ? 2 : nil) } | ||
end | ||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. |
||
# Public: Indicates that the current thread is waiting for a Future. | ||
# | ||
# future - The Future being waited for. | ||
def depend(future) | ||
@mutex.synchronize do | ||
return nil if @queue.empty? && more_workers_than_needed? | ||
Futuroscope.info "DEPEND: thread #{Thread.current.__id__} depends on future #{future.__id__}" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [101/80] |
||
@dependencies[Thread.current] = future | ||
Futuroscope.debug " current dependencies: #{@dependencies.map { |k, v| ["thread #{k.__id__}", "future #{v.__id__}"] }.to_h}" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [139/80] |
||
handle_deadlocks | ||
dependent_future_id = current_thread_future_id | ||
incr = 1 + (dependent_future_id.nil? ? 0 : @priorities[dependent_future_id]) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [84/80] |
||
increment_priority(future, incr) | ||
end | ||
return @queue.pop | ||
end | ||
|
||
# Private: Notifies that a worker just died so it can be removed from the | ||
# pool. | ||
# | ||
# worker - A Worker | ||
def worker_died(worker) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. |
||
# Semipublic: Called by a worker to indicate that it finished resolving a future. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [85/80] |
||
def done_with(future) | ||
@mutex.synchronize do | ||
@workers.delete(worker) | ||
Futuroscope.info "DONE: thread #{Thread.current.__id__} is done with future #{future.__id__}" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [103/80] |
||
Futuroscope.info " deleting future #{future.__id__} from the task list" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [86/80] |
||
@futures.delete future.__id__ | ||
@priorities.delete future.__id__ | ||
dependencies_to_delete = @dependencies.select { |dependent, dependee| dependee.__id__ == future.__id__ } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unused block argument - |
||
dependencies_to_delete.each do |dependent, dependee| | ||
Futuroscope.info " deleting dependency from thread #{dependent.__id__} to future #{dependee.__id__}" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [117/80] |
||
@dependencies.delete dependent | ||
end | ||
end | ||
end | ||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. |
||
def min_workers=(count) | ||
@min_workers = count | ||
warm_up_workers | ||
end | ||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. |
||
private | ||
|
||
def warm_up_workers | ||
@mutex.synchronize do | ||
while(@workers.length < @min_workers) do | ||
while workers.length < min_workers do | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Never use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Never use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Never use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Never use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Never use |
||
spin_worker | ||
end | ||
end | ||
end | ||
|
||
def can_spin_extra_workers? | ||
@workers.length < @max_workers && span_chance | ||
end | ||
|
||
def span_chance | ||
[true, false].sample | ||
def finalize | ||
workers.each do |worker| | ||
workers.delete worker | ||
worker.thread.kill | ||
end | ||
end | ||
|
||
def more_workers_than_needed? | ||
@workers.length > @min_workers | ||
end | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. |
||
# The below methods should only be called with @mutex already acquired. | ||
# These are only extracted for readability purposes. | ||
|
||
|
||
def spin_worker | ||
worker = Worker.new(self) | ||
@workers << worker | ||
workers << worker | ||
worker.run | ||
Futuroscope.info " spun up worker with thread #{worker.thread.__id__}" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [83/80] |
||
end | ||
|
||
def finalize | ||
@workers.each(&:stop) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. |
||
def increment_priority(future, increment) | ||
return nil if NilClass === future | ||
Futuroscope.info " incrementing priority for future #{future.__id__}" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [82/80] |
||
@priorities[future.__id__] += increment | ||
increment_priority(@dependencies[future.worker_thread], increment) | ||
end | ||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. |
||
def current_thread_future_id | ||
@priorities.keys.find { |id| @futures[id].worker_thread == Thread.current } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [81/80] |
||
end | ||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. |
||
def await_future(timeout) | ||
until @priorities.any? { |future_id, priority| @futures[future_id].worker_thread.nil? } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unused block argument - |
||
Futuroscope.info "POP: thread #{Thread.current.__id__} going to sleep until there's something to do#{timeout && " or #{timeout} seconds"}..." | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [152/80] |
||
@future_needs_worker.wait(@mutex, timeout) | ||
Futuroscope.info "POP: ... thread #{Thread.current.__id__} woke up" | ||
Futuroscope.debug " current priorities: #{@priorities.map { |k, v| ["future #{k}", v] }.to_h}" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [109/80] |
||
Futuroscope.debug " current future workers: #{@priorities.map { |k, v| ["future #{k}", (thread = @futures[k].worker_thread; thread.nil? ? nil : "thread #{thread.__id__}")] }.to_h}" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unused block argument - |
||
if more_workers_than_needed? && [email protected]? { |future_id, priority| @futures[future_id].worker_thread.nil? } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unused block argument - |
||
Futuroscope.info " thread #{Thread.current.__id__} is dying because there's nothing to do and there are more threads than the minimum" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [151/80] |
||
workers.delete_if { |worker| worker.thread == Thread.current } | ||
return nil | ||
end | ||
timeout = nil | ||
end | ||
future_id = @priorities.select { |future_id, priority| @futures[future_id].worker_thread.nil? }.max_by { |future_id, priority| priority }.first | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shadowing outer local variable - |
||
Futuroscope.info "POP: thread #{Thread.current.__id__} will start working on future #{future_id}" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [106/80] |
||
future = @futures[future_id] | ||
future.worker_thread = Thread.current | ||
future | ||
end | ||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. |
||
def handle_deadlocks | ||
Thread.handle_interrupt(DeadlockError => :immediate) do | ||
Thread.handle_interrupt(DeadlockError => :never) do | ||
if !(cycle = find_cycle).nil? | ||
Futuroscope.error " deadlock! cyclical dependency, sending interrupt to all threads involved" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [112/80] |
||
cycle.each { |thread| thread.raise DeadlockError, "Cyclical dependency detected, the future was aborted." } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [119/80] |
||
elsif cycleless_deadlock? | ||
thread_to_interrupt = least_priority_independent_thread | ||
Futuroscope.error " deadlock! ran out of workers, sending interrupt to thread #{thread_to_interrupt.__id__}" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [127/80] |
||
thread_to_interrupt.raise DeadlockError, "Pool size is too low, the future was aborted." | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [100/80] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [100/80] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [100/80] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [100/80] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [100/80] |
||
end | ||
end | ||
end | ||
end | ||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. |
||
def find_cycle | ||
chain = [Thread.current] | ||
loop do | ||
last_thread = chain.last | ||
return nil unless @dependencies.has_key?(last_thread) | ||
next_future = @dependencies[last_thread] | ||
next_thread = next_future.worker_thread | ||
return nil if next_thread.nil? | ||
return chain if next_thread == chain.first | ||
chain << next_thread | ||
end | ||
end | ||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. |
||
def cycleless_deadlock? | ||
workers.all? { |worker| @dependencies.has_key?(worker.thread) } && workers.count == max_workers | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [101/80] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [101/80] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [101/80] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [101/80] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [101/80] |
||
end | ||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. |
||
def least_priority_independent_thread | ||
@priorities.sort_by(&:last).map(&:first).each do |future_id| | ||
its_thread = @futures[future_id].worker_thread | ||
return its_thread if !its_thread.nil? && @dependencies[its_thread].worker_thread.nil? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [93/80] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [93/80] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [93/80] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [93/80] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [93/80] |
||
end | ||
end | ||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. |
||
def need_extra_worker? | ||
workers.count < max_workers && futures_needing_worker.count > workers.count(&:free) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [89/80] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [89/80] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [89/80] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [89/80] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [89/80] |
||
end | ||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. |
||
def more_workers_than_needed? | ||
workers.count > min_workers && futures_needing_worker.count < workers.count(&:free) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [89/80] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [89/80] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [89/80] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [89/80] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line is too long. [89/80] |
||
end | ||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra blank line detected. |
||
def futures_needing_worker | ||
@futures.values.select { |future| future.worker_thread.nil? } | ||
end | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra empty line detected at body end. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra empty line detected at body end. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra empty line detected at body end. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra empty line detected at body end. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra empty line detected at body end. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra empty line detected at body end. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra empty line detected at body end. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra empty line detected at body end. |
||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line is too long. [99/80]