class Dynflow::Executors::Parallel::Pool
Public Class Methods
new(world, core, name, pool_size, transaction_adapter)
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 53
#
# Builds the pool: stores its collaborators, creates an empty job storage and
# spawns +pool_size+ workers, all of which start out in the free list.
#
# @param world [Object] stored as @world; its #id is read by telemetry_options
# @param core [Object] stored as @executor_core; receives `tell` notifications
# @param name [Object] name of this pool; stored as @name
# @param pool_size [Integer] number of workers to spawn
# @param transaction_adapter [Object] passed through unchanged to Worker.spawn
def initialize(world, core, name, pool_size, transaction_adapter)
  @world = world
  @name = name
  @executor_core = core
  @pool_size = pool_size
  @jobs = JobStorage.new
  # @world and @name have to be assigned before the workers are spawned,
  # because telemetry_options reads both of them.
  @free_workers = Array.new(pool_size) do |index|
    # Use a dedicated local: assigning to `name` inside the block would
    # overwrite the `name` method parameter, since blocks share the locals
    # of the enclosing method.
    worker_name = "worker-#{index}"
    Worker.spawn(worker_name, reference, transaction_adapter,
                 telemetry_options.merge(:worker => worker_name))
  end
end
Public Instance Methods
execution_status(execution_plan_id = nil)
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 89
#
# Reports the current state of the pool.
#
# @param execution_plan_id [Object, nil] forwarded as-is to
#   @jobs.execution_status; defaults to nil
# @return [Hash] with the keys :pool_size (configured size), :free_workers
#   (number of entries currently in the free list) and :execution_status
#   (whatever @jobs.execution_status returns for the given id)
def execution_status(execution_plan_id = nil)
  idle_count = @free_workers.count
  job_status = @jobs.execution_status(execution_plan_id)

  {
    pool_size: @pool_size,
    free_workers: idle_count,
    execution_status: job_status
  }
end
handle_persistence_error(worker, error, work = nil)
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 78
#
# Called when a worker hit a persistence error. Forwards the error to the
# executor core, puts the worker back into the free list and tries to hand
# out any waiting jobs.
#
# @param worker [Object] the worker that reported the error
# @param error [Object] the error, forwarded to the executor core
# @param work [Object, nil] the work item involved, if any; forwarded as-is
def handle_persistence_error(worker, error, work = nil)
  @executor_core.tell([:handle_persistence_error, error, work])
  @free_workers << worker
  # distribute_jobs bumps :dynflow_active_workers every time it takes a worker
  # out of the free list, so returning a worker has to undo that here exactly
  # as worker_done does; otherwise the gauge stays too high after every
  # persistence error.
  Dynflow::Telemetry.with_instance { |t| t.set_gauge(:dynflow_active_workers, -1, telemetry_options) }
  distribute_jobs
end
schedule_work(work)
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 65
#
# Queues a work item and immediately tries to hand queued jobs to free
# workers.
#
# @param work [Object] the work item to add to the job storage
# @return whatever update_telemetry returns
def schedule_work(work)
  @jobs.add(work)
  distribute_jobs
  # distribute_jobs only refreshes the queue-size gauge when it actually hands
  # a job out, so refresh it here as well to cover the "no free worker" case.
  update_telemetry
end
start_termination(*args)
click to toggle source
Calls superclass method
Dynflow::Actor#start_termination
# File lib/dynflow/executors/parallel/pool.rb, line 84
#
# Begins termination of the pool. Delegates to the superclass implementation
# (Dynflow::Actor#start_termination) with the same arguments, then checks
# right away whether the pool can finish terminating.
#
# @param args [Array] forwarded unchanged to the superclass method
# @return whatever try_to_terminate returns
def start_termination(*args)
  # Bare `super` forwards the received arguments as they are.
  super
  try_to_terminate
end
worker_done(worker, work)
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 71
#
# Called when a worker has finished a work item. Tells the executor core the
# work is finished, returns the worker to the free list, updates the
# active-workers gauge and tries to hand out the next waiting job.
#
# @param worker [Object] the worker that became free
# @param work [Object] the finished work item, forwarded to the executor core
def worker_done(worker, work)
  @executor_core.tell([:work_finished, work])
  @free_workers.push(worker)

  # NOTE(review): the value here is the Integer -1 while distribute_jobs sends
  # the String '+1' for the same gauge; presumably both are meant as relative
  # changes - confirm against the telemetry adapter in use.
  Dynflow::Telemetry.with_instance do |telemetry|
    telemetry.set_gauge(:dynflow_active_workers, -1, telemetry_options)
  end

  distribute_jobs
end
Private Instance Methods
distribute_jobs()
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 105
#
# Hands waiting jobs to free workers until either the free list or the job
# storage is empty. Before doing so it gives the pool a chance to finish a
# pending termination.
#
# @return [nil]
def distribute_jobs
  try_to_terminate

  while !@free_workers.empty? && !@jobs.empty?
    Dynflow::Telemetry.with_instance do |telemetry|
      telemetry.set_gauge(:dynflow_active_workers, '+1', telemetry_options)
    end

    # Take the worker first, then the job, and pass the job to the worker
    # with `<<`.
    idle_worker = @free_workers.pop
    idle_worker << @jobs.pop

    # Refresh the queue-size gauge after every hand-off.
    update_telemetry
  end
end
telemetry_options()
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 114
#
# Builds the tag hash attached to every telemetry call made by this pool.
#
# @return [Hash] :queue is the pool name converted with #to_s, :world is the
#   id of the world this pool was created with
def telemetry_options
  queue_name = @name.to_s
  world_id = @world.id

  { queue: queue_name, world: world_id }
end
try_to_terminate()
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 97
#
# Finishes termination of the pool, but only when termination has been
# requested and every worker is back in the free list. In that case each
# worker is asked to terminate, all of those requests are waited for, the
# executor core is told that this pool finished, and the pool finishes its
# own termination.
#
# @return [nil] when the pool is not terminating or some worker is still busy;
#   otherwise whatever finish_termination returns
def try_to_terminate
  return unless terminating? && @free_workers.size == @pool_size

  # Send all terminate requests first, then wait, so the workers are not
  # shut down strictly one after another.
  pending = @free_workers.map { |worker| worker.ask(:terminate!) }
  pending.each(&:wait)

  @executor_core.tell([:finish_termination, @name])
  finish_termination
end
update_telemetry()
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 118
#
# Publishes the current size of the job queue as the :dynflow_queue_size
# gauge, tagged with this pool's telemetry options.
#
# @return whatever Dynflow::Telemetry.with_instance returns
def update_telemetry
  Dynflow::Telemetry.with_instance do |telemetry|
    telemetry.set_gauge(:dynflow_queue_size, @jobs.queue_size, telemetry_options)
  end
end