class Dynflow::Executors::Parallel::Pool
Public Class Methods
new(core, pool_size, transaction_adapter)
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 49
#
# Builds the pool state: remembers the executor core and the pool size,
# spawns +pool_size+ workers (all initially free) and creates an empty
# job storage.
#
# @param core the executor core; later notified through +tell+
# @param pool_size [Integer] number of workers to spawn
# @param transaction_adapter passed through unchanged to every spawned worker
def initialize(core, pool_size, transaction_adapter)
  @executor_core = core
  @pool_size = pool_size
  # Workers are named "worker-0" .. "worker-(pool_size - 1)" and are each
  # given +reference+ (presumably this pool's own actor reference -- confirm
  # against Dynflow::Actor).
  @free_workers = Array.new(pool_size) do |index|
    Worker.spawn("worker-#{index}", reference, transaction_adapter)
  end
  @jobs = JobStorage.new
end
Public Instance Methods
execution_status(execution_plan_id = nil)
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 76
#
# Reports a snapshot of the pool's state.
#
# @param execution_plan_id forwarded as-is to the job storage's
#   +execution_status+; defaults to +nil+
# @return [Hash] with the keys +:pool_size+ (configured size),
#   +:free_workers+ (number of currently free workers) and
#   +:execution_status+ (whatever the job storage reports)
def execution_status(execution_plan_id = nil)
  idle_count = @free_workers.count
  jobs_status = @jobs.execution_status(execution_plan_id)

  {
    pool_size: @pool_size,
    free_workers: idle_count,
    execution_status: jobs_status
  }
end
handle_persistence_error(error)
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 67
#
# Forwards a persistence error to the executor core as a
# +[:handle_persistence_error, error]+ message.
#
# @param error the error to forward
# @return whatever the core's +tell+ returns
def handle_persistence_error(error)
  message = [:handle_persistence_error, error]
  @executor_core.tell(message)
end
schedule_work(work)
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 56
#
# Queues a piece of work and immediately tries to hand pending jobs to
# free workers.
#
# @param work the work item to add to the job storage
# @return the result of +distribute_jobs+
def schedule_work(work)
  @jobs.add(work)
  distribute_jobs
end
start_termination(*args)
click to toggle source
Calls superclass method
Dynflow::Actor#start_termination
# File lib/dynflow/executors/parallel/pool.rb, line 71
#
# Begins termination of the pool: first runs the superclass behaviour
# (Dynflow::Actor#start_termination) with the same arguments, then checks
# whether the pool can finish terminating right away.
#
# @param args forwarded untouched to the superclass implementation
# @return the result of +try_to_terminate+
def start_termination(*args)
  # Bare +super+ re-sends exactly the arguments this method received.
  super
  try_to_terminate
end
worker_done(worker, work)
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 61
#
# Called when a worker has finished a piece of work: reports the finished
# work to the executor core, returns the worker to the free list and tries
# to distribute any pending jobs.
#
# @param worker the worker that became free
# @param work the work item that was finished
# @return the result of +distribute_jobs+
def worker_done(worker, work)
  finished_message = [:work_finished, work]
  @executor_core.tell(finished_message)
  @free_workers.push(worker)
  distribute_jobs
end
Private Instance Methods
distribute_jobs()
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 92
#
# Runs the termination check, then hands out pending jobs: as long as there
# is both a free worker and a pending job, one worker is taken off the free
# list and sent one job taken from the job storage.
#
# @return [nil]
def distribute_jobs
  try_to_terminate

  until @free_workers.empty? || @jobs.empty?
    idle_worker = @free_workers.pop
    # +<<+ on a worker presumably delivers the job as a message -- confirm
    # against the worker/actor reference API.
    idle_worker << @jobs.pop
  end
end
try_to_terminate()
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 84
#
# Finishes termination when it is safe to do so, i.e. when the pool is
# terminating and every worker is back on the free list. In that case each
# worker is asked to +terminate!+, all of those requests are waited on, the
# executor core is told +:finish_termination+, and the pool finishes its own
# termination.
#
# @return [nil] when the conditions are not met, otherwise the result of
#   +finish_termination+
def try_to_terminate
  all_workers_idle = terminating? && @free_workers.size == @pool_size
  return unless all_workers_idle

  # Issue every terminate request first, then wait, so the workers are not
  # shut down one after another.
  termination_requests = @free_workers.map { |worker| worker.ask(:terminate!) }
  termination_requests.each(&:wait)

  @executor_core.tell(:finish_termination)
  finish_termination
end