class Dynflow::Executors::Parallel::Core

Attributes

logger[R]

Public Class Methods

new(world, heartbeat_interval, queues_options) click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 7
# Sets up the executor core for the given world: stores its collaborators,
# spawns the worker pools and schedules the first heartbeat.
#
# @param world [World] checked with `Type!` before anything is read from it
# @param heartbeat_interval interval handed to the world's clock by
#   #schedule_heartbeat
# @param queues_options [Hash] per-queue options consumed by #initialize_queues
def initialize(world, heartbeat_interval, queues_options)
  # Validate first, so an invalid world is rejected by the type check instead
  # of failing earlier on `world.logger`.
  @world          = Type! world, World
  @logger         = @world.logger
  @queues_options = queues_options
  @pools          = {}
  @terminated     = nil
  @director       = Director.new(@world)
  @heartbeat_interval = heartbeat_interval

  initialize_queues
  schedule_heartbeat
end

Public Instance Methods

dead_letter_routing() click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 73
# Returns the dead letter handler of the world this core belongs to.
def dead_letter_routing
  handler = @world.dead_letter_handler
  handler
end
execution_status(execution_plan_id = nil) click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 77
# Asks every pool for its execution status and collects the answers.
#
# @param execution_plan_id forwarded to each pool as part of the
#   [:execution_status, execution_plan_id] message (nil by default)
# @return [Hash] the answer of each pool keyed by the pool's name
def execution_status(execution_plan_id = nil)
  statuses = {}
  @pools.each do |name, worker_pool|
    statuses[name] = worker_pool.ask!([:execution_status, execution_plan_id])
  end
  statuses
end
finish_termination(pool_name) click to toggle source
Calls superclass method Dynflow::Actor#finish_termination
# File lib/dynflow/executors/parallel/core.rb, line 64
# Handles the termination notice of one worker pool and, once the last pool
# has reported, terminates the director and finishes the core's own
# termination through the superclass.
#
# @param pool_name key under which the finished pool is stored in @pools
def finish_termination(pool_name)
  @pools.delete(pool_name)
  # we expect this message from all worker pools
  return unless @pools.empty?
  @director.terminate
  # A completed shutdown is the expected outcome, so it is reported at the
  # same level as the 'shutting down Core ...' message of #start_termination.
  logger.info '... core terminated.'
  super()
end
handle_event(event) click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 39
# Passes an event to the director and schedules whatever work it produces.
#
# @param event [Director::Event] checked with `Type!`
# @raise [Dynflow::Error] when the core is already terminating
def handle_event(event)
  Type! event, Director::Event
  raise Dynflow::Error, "cannot accept event: #{event} core is terminating" if terminating?

  produced = @director.handle_event(event)
  feed_pool(produced)
end
handle_execution(execution_plan_id, finished) click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 30
# Starts the execution of a plan through the director and schedules the work
# it returns.
#
# @param execution_plan_id id of the plan to start
# @param finished passed through to the director's start_execution
# @raise [Dynflow::Error] when the core is already terminating
def handle_execution(execution_plan_id, finished)
  raise Dynflow::Error, "cannot accept execution_plan_id:#{execution_plan_id} core is terminating" if terminating?

  initial_work = @director.start_execution(execution_plan_id, finished)
  feed_pool(initial_work)
end
handle_persistence_error(error) click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 52
# Logs a persistence failure at fatal level and terminates the world.
#
# @param error the error to log after the fixed announcement line
def handle_persistence_error(error)
  ["PersistenceError in executor: terminating", error].each do |entry|
    logger.fatal entry
  end
  @world.terminate
end
heartbeat() click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 83
# Refreshes the :last_seen entry of this world's coordinator record and
# schedules the next heartbeat. When no record is found for the world, the
# problem is logged and the world is terminated instead.
def heartbeat
  @logger.debug('Executor heartbeat')
  world_classes = ['Dynflow::Coordinator::ExecutorWorld', 'Dynflow::Coordinator::ClientWorld']
  matches = @world.coordinator.find_records(:id => @world.id, :class => world_classes)
  record = matches.first

  if record
    record.data[:meta].update(:last_seen => Dynflow::Dispatcher::ClientDispatcher::PingCache.format_time)
    @world.coordinator.update_record(record)
    schedule_heartbeat
  else
    # No record means no further heartbeat is scheduled.
    logger.error("Executor's world record for #{@world.id} missing: terminating")
    @world.terminate
    nil
  end
end
initialize_queues() click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 20
# Spawns one pool per entry of @queues_options and stores it in @pools under
# the queue's name. A queue without its own :pool_size uses the :pool_size of
# the :default queue.
def initialize_queues
  fallback_size = @queues_options[:default][:pool_size]
  @queues_options.each do |name, options|
    size = options.fetch(:pool_size) { fallback_size }
    @pools[name] = Pool.spawn("pool #{name}", reference, name, size, @world.transaction_adapter)
  end
end
start_termination(*args) click to toggle source
Calls superclass method Dynflow::Actor#start_termination
# File lib/dynflow/executors/parallel/core.rb, line 58
# Begins the shutdown: runs the superclass behaviour with the same arguments,
# then sends every pool a :start_termination message carrying its own
# `Concurrent.future`.
def start_termination(*args)
  super
  logger.info 'shutting down Core ...'
  worker_pools = @pools.values
  worker_pools.each do |worker_pool|
    worker_pool.tell([:start_termination, Concurrent.future])
  end
end
work_finished(work) click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 48
# Reports finished work to the director and schedules whatever it returns.
#
# @param work the finished work, passed to the director unchanged
def work_finished(work)
  follow_up = @director.work_finished(work)
  feed_pool(follow_up)
end

Private Instance Methods

fallback_queue() click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 125
# Name of the queue whose pool #feed_pool uses when a work item's own queue
# has no pool.
def fallback_queue
  :default
end
feed_pool(work_items) click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 110
# Hands work items over to the pools of their queues.
#
# Does nothing while the core is terminating or when there is nothing to
# schedule. An item whose queue has no pool is sent to the pool of
# #fallback_queue and the substitution is logged.
#
# @param work_items [Director::WorkItem, Enumerable, nil] a single item, a
#   collection of items, or nil
def feed_pool(work_items)
  return if terminating?
  return if work_items.nil?
  work_items = [work_items] if work_items.is_a? Director::WorkItem
  # Check the whole batch before scheduling anything. The result of Type! is
  # not needed here (it is expected to raise on a mismatch), so a plain
  # iteration is used instead of a predicate whose answer was thrown away.
  work_items.each { |item| Type! item, Director::WorkItem }
  work_items.each do |new_work|
    pool = @pools[new_work.queue]
    unless pool
      logger.error("Pool is not available for queue #{new_work.queue}, falling back to #{fallback_queue}")
      pool = @pools[fallback_queue]
    end
    pool.tell([:schedule_work, new_work])
  end
end
on_message(message) click to toggle source
Calls superclass method Dynflow::MethodicActor#on_message
# File lib/dynflow/executors/parallel/core.rb, line 104
# Processes a message through the superclass. A persistence error raised
# while doing so is not propagated; it is sent back to this actor as a
# :handle_persistence_error message instead.
def on_message(message)
  begin
    super
  rescue Errors::PersistenceError => persistence_error
    self.tell([:handle_persistence_error, persistence_error])
  end
end
schedule_heartbeat() click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 100
# Asks the world's clock to ping this object with :heartbeat, passing
# @heartbeat_interval as the second argument.
def schedule_heartbeat
  clock = @world.clock
  clock.ping(self, @heartbeat_interval, :heartbeat)
end