class Dynflow::Executors::Parallel::Pool

Public Class Methods

new(world, core, name, pool_size, transaction_adapter) click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 53
def initialize(world, core, name, pool_size, transaction_adapter)
  @world = world
  @name = name
  @executor_core = core
  @pool_size     = pool_size
  @jobs          = JobStorage.new
  @free_workers  = Array.new(pool_size) do |i|
    name = "worker-#{i}"
    Worker.spawn(name, reference, transaction_adapter, telemetry_options.merge(:worker => name))
  end
end

Public Instance Methods

execution_status(execution_plan_id = nil) click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 89
def execution_status(execution_plan_id = nil)
  { :pool_size => @pool_size,
    :free_workers => @free_workers.count,
    :execution_status => @jobs.execution_status(execution_plan_id) }
end
handle_persistence_error(worker, error, work = nil) click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 78
def handle_persistence_error(worker, error, work = nil)
  @executor_core.tell([:handle_persistence_error, error, work])
  @free_workers << worker
  distribute_jobs
end
schedule_work(work) click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 65
def schedule_work(work)
  @jobs.add work
  distribute_jobs
  update_telemetry
end
start_termination(*args) click to toggle source
Calls superclass method Dynflow::Actor#start_termination
# File lib/dynflow/executors/parallel/pool.rb, line 84
def start_termination(*args)
  super
  try_to_terminate
end
worker_done(worker, work) click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 71
def worker_done(worker, work)
  @executor_core.tell([:work_finished, work])
  @free_workers << worker
  Dynflow::Telemetry.with_instance { |t| t.set_gauge(:dynflow_active_workers, -1, telemetry_options) }
  distribute_jobs
end

Private Instance Methods

distribute_jobs() click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 105
def distribute_jobs
  try_to_terminate
  until @free_workers.empty? || @jobs.empty?
    Dynflow::Telemetry.with_instance { |t| t.set_gauge(:dynflow_active_workers, '+1', telemetry_options) }
    @free_workers.pop << @jobs.pop
    update_telemetry
  end
end
telemetry_options() click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 114
def telemetry_options
  { :queue => @name.to_s, :world => @world.id }
end
try_to_terminate() click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 97
def try_to_terminate
  if terminating? && @free_workers.size == @pool_size
    @free_workers.map { |worker| worker.ask(:terminate!) }.map(&:wait)
    @executor_core.tell([:finish_termination, @name])
    finish_termination
  end
end
update_telemetry() click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 118
def update_telemetry
  Dynflow::Telemetry.with_instance { |t| t.set_gauge(:dynflow_queue_size, @jobs.queue_size, telemetry_options) }
end