class Dynflow::Executors::Parallel::Pool

Public Class Methods

new(world, core, name, pool_size, transaction_adapter) click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 37
# Stores the pool's collaborators and spawns one worker per pool slot.
#
# @param world [Object] kept as @world; telemetry_options reads its +id+
# @param core [Object] executor core, kept as @executor_core
# @param name [Object] pool name, kept as @name
# @param pool_size [Integer] number of workers to spawn
# @param transaction_adapter [Object] handed unchanged to every spawned worker
def initialize(world, core, name, pool_size, transaction_adapter)
  @world         = world
  @executor_core = core
  @name          = name
  @pool_size     = pool_size
  @jobs          = JobStorage.new
  # @world and @name have to be assigned before this point because
  # telemetry_options reads both of them while the workers are created.
  @free_workers  = Array.new(pool_size) do |index|
    worker_name      = "worker-#{index}"
    worker_telemetry = telemetry_options.merge(:worker => worker_name)
    Worker.spawn(worker_name, reference, transaction_adapter, worker_telemetry)
  end
end

Public Instance Methods

execution_status(execution_plan_id = nil) click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 73
# Reports a snapshot of the pool as a hash.
#
# @param execution_plan_id [Object, nil] forwarded as-is to the job storage's
#   +queue_size+
# @return [Hash] with the keys :pool_size, :free_workers and :queue_size
def execution_status(execution_plan_id = nil)
  status = { :pool_size => @pool_size }
  status[:free_workers] = @free_workers.count
  status[:queue_size]   = @jobs.queue_size(execution_plan_id)
  status
end
handle_persistence_error(worker, error, work = nil) click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 62
# Handles a persistence error reported by a worker: forwards the error to the
# executor core, puts the worker back among the free workers and hands out
# any queued jobs.
#
# The active-workers gauge is raised in distribute_jobs whenever a job is
# handed to a worker, so it is lowered here as well (the same way worker_done
# does it); otherwise the gauge would drift upwards after every persistence
# error.
#
# @param worker [Object] the worker that reported the error
# @param error [Object] the error, passed on to the executor core
# @param work [Object, nil] the work item involved, if any
def handle_persistence_error(worker, error, work = nil)
  @executor_core.tell([:handle_persistence_error, error, work])
  @free_workers << worker
  Dynflow::Telemetry.with_instance { |t| t.set_gauge(:dynflow_active_workers, -1, telemetry_options) }
  distribute_jobs
end
schedule_work(work) click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 49
# Queues a work item, immediately tries to hand queued jobs to free workers,
# and then refreshes the queue-size telemetry.
#
# @param work [Object] the work item added to the job storage
def schedule_work(work)
  @jobs.add(work)
  distribute_jobs
  update_telemetry
end
start_termination(*args) click to toggle source
Calls superclass method Dynflow::Actor#start_termination
# File lib/dynflow/executors/parallel/pool.rb, line 68
# Starts shutting the pool down: runs the inherited start_termination
# (Dynflow::Actor#start_termination) and then makes an immediate attempt to
# terminate via try_to_terminate.
#
# @param args [Array] passed through unchanged to the superclass method
def start_termination(*args)
  # Bare `super` forwards the arguments this method received.
  super
  try_to_terminate
end
worker_done(worker, work) click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 55
# Called when a worker has finished a work item: reports the finished work to
# the executor core, returns the worker to the free list, lowers the
# active-workers gauge and hands out any queued jobs.
#
# @param worker [Object] the worker that finished
# @param work [Object] the finished work item
def worker_done(worker, work)
  @executor_core.tell([:work_finished, work])
  @free_workers.push(worker)
  # NOTE(review): distribute_jobs passes the string '+1' for the increment
  # while this passes the integer -1 -- confirm the telemetry adapter treats
  # both as relative changes.
  Dynflow::Telemetry.with_instance do |telemetry|
    telemetry.set_gauge(:dynflow_active_workers, -1, telemetry_options)
  end
  distribute_jobs
end

Private Instance Methods

distribute_jobs() click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 93
# Hands queued jobs to free workers until either runs out. A termination
# attempt is made first on every call.
#
# For each dispatched job the active-workers gauge is raised before the
# worker and the job are taken, and the queue-size telemetry is refreshed
# afterwards.
def distribute_jobs
  try_to_terminate
  while !@free_workers.empty? && !@jobs.empty?
    Dynflow::Telemetry.with_instance do |telemetry|
      telemetry.set_gauge(:dynflow_active_workers, '+1', telemetry_options)
    end
    # The worker is taken before the job, matching the evaluation order of
    # `worker << job`.
    worker = @free_workers.pop
    worker << @jobs.pop
    update_telemetry
  end
end
telemetry_options() click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 102
# Builds the tag hash attached to this pool's telemetry calls.
#
# @return [Hash] :queue is the pool name as a String, :world is the world's id
def telemetry_options
  queue_label = @name.to_s
  { :queue => queue_label, :world => @world.id }
end
try_to_terminate() click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 81
# While the pool is terminating, shuts down every currently free worker and
# finishes the pool's own termination once no workers are left.
#
# Does nothing unless terminating? is true. Workers that are not in the free
# list stay counted in @pool_size.
def try_to_terminate
  return unless terminating?

  # Ask every free worker to terminate first, then wait for all of the
  # replies, so that the requests are all sent before any waiting starts.
  pending = @free_workers.map { |worker| worker.ask(:terminate!) }
  pending.each(&:wait)
  @pool_size -= @free_workers.count
  @free_workers = []
  return unless @pool_size.zero?

  @executor_core.tell([:finish_termination, @name])
  finish_termination
end
update_telemetry() click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 106
# Publishes the current overall queue size (queue_size called without an
# execution plan id) as the :dynflow_queue_size gauge.
def update_telemetry
  Dynflow::Telemetry.with_instance do |telemetry|
    telemetry.set_gauge(:dynflow_queue_size, @jobs.queue_size, telemetry_options)
  end
end