class Dynflow::Executors::Abstract::Core
Attributes
logger[R]
Public Class Methods
new(world, heartbeat_interval, queues_options)
click to toggle source
# File lib/dynflow/executors/abstract/core.rb, line 8 def initialize(world, heartbeat_interval, queues_options) @logger = world.logger @world = Type! world, World @pools = {} @terminated = nil @director = Director.new(@world) @heartbeat_interval = heartbeat_interval @queues_options = queues_options schedule_heartbeat end
Public Instance Methods
dead_letter_routing()
click to toggle source
# File lib/dynflow/executors/abstract/core.rb, line 70 def dead_letter_routing @world.dead_letter_handler end
execution_status(execution_plan_id = nil)
click to toggle source
# File lib/dynflow/executors/abstract/core.rb, line 74 def execution_status(execution_plan_id = nil) {} end
finish_termination()
click to toggle source
Calls superclass method
Dynflow::Actor#finish_termination
# File lib/dynflow/executors/abstract/core.rb, line 64 def finish_termination @director.terminate logger.info '... Dynflow core terminated.' super() end
handle_event(event)
click to toggle source
# File lib/dynflow/executors/abstract/core.rb, line 29 def handle_event(event) Type! event, Director::Event if terminating? raise Dynflow::Error, "cannot accept event: #{event} core is terminating" end handle_work(@director.handle_event(event)) end
handle_execution(execution_plan_id, finished)
click to toggle source
# File lib/dynflow/executors/abstract/core.rb, line 20 def handle_execution(execution_plan_id, finished) if terminating? raise Dynflow::Error, "cannot accept execution_plan_id:#{execution_plan_id} core is terminating" end handle_work(@director.start_execution(execution_plan_id, finished)) end
handle_persistence_error(error, work = nil)
click to toggle source
# File lib/dynflow/executors/abstract/core.rb, line 49 def handle_persistence_error(error, work = nil) logger.error "PersistenceError in executor" logger.error error @director.work_failed(work) if work if error.is_a? Errors::FatalPersistenceError logger.fatal "Terminating" @world.terminate end end
heartbeat()
click to toggle source
# File lib/dynflow/executors/abstract/core.rb, line 78 def heartbeat @logger.debug('Executor heartbeat') record = @world.coordinator.find_records(:id => @world.id, :class => ['Dynflow::Coordinator::ExecutorWorld', 'Dynflow::Coordinator::ClientWorld']).first unless record logger.error(%{Executor's world record for #{@world.id} missing: terminating}) @world.terminate return end record.data[:meta].update(:last_seen => Dynflow::Dispatcher::ClientDispatcher::PingCache.format_time) @world.coordinator.update_record(record) schedule_heartbeat end
plan_events(delayed_events)
click to toggle source
# File lib/dynflow/executors/abstract/core.rb, line 38 def plan_events(delayed_events) delayed_events.each do |event| @world.plan_event(event.execution_plan_id, event.step_id, event.event, event.time, optional: event.optional) end end
start_termination(*args)
click to toggle source
Calls superclass method
Dynflow::Actor#start_termination
# File lib/dynflow/executors/abstract/core.rb, line 59 def start_termination(*args) logger.info 'shutting down Core ...' super end
work_finished(work, delayed_events = nil)
click to toggle source
# File lib/dynflow/executors/abstract/core.rb, line 44 def work_finished(work, delayed_events = nil) handle_work(@director.work_finished(work)) plan_events(delayed_events) if delayed_events end
Private Instance Methods
fallback_queue()
click to toggle source
# File lib/dynflow/executors/abstract/core.rb, line 104 def fallback_queue :default end
feed_pool(work_items)
click to toggle source
# File lib/dynflow/executors/abstract/core.rb, line 126 def feed_pool(work_items) raise NotImplementedError end
handle_work(work_items)
click to toggle source
# File lib/dynflow/executors/abstract/core.rb, line 118 def handle_work(work_items) return if terminating? return if work_items.nil? work_items = [work_items] if work_items.is_a? Director::WorkItem work_items.all? { |i| Type! i, Director::WorkItem } feed_pool(work_items) end
on_message(message)
click to toggle source
Calls superclass method
Dynflow::MethodicActor#on_message
# File lib/dynflow/executors/abstract/core.rb, line 112 def on_message(message) super rescue Errors::PersistenceError => e handle_persistence_error(e) end
schedule_heartbeat()
click to toggle source
# File lib/dynflow/executors/abstract/core.rb, line 108 def schedule_heartbeat @world.clock.ping(self, @heartbeat_interval, :heartbeat) end
suggest_queue(work_item)
click to toggle source
# File lib/dynflow/executors/abstract/core.rb, line 95 def suggest_queue(work_item) queue = work_item.queue unless @queues_options.key?(queue) logger.debug("Pool is not available for queue #{queue}, falling back to #{fallback_queue}") queue = fallback_queue end queue end