class Dynflow::DelayedExecutors::AbstractCore
Attributes
logger[R]
world[R]
Public Class Methods
new(world, options = {})
click to toggle source
# File lib/dynflow/delayed_executors/abstract_core.rb, line 9 def initialize(world, options = {}) @world = Type! world, World @logger = world.logger configure(options) end
Public Instance Methods
check_delayed_plans()
click to toggle source
# File lib/dynflow/delayed_executors/abstract_core.rb, line 23 def check_delayed_plans raise NotImplementedError end
configure(options)
click to toggle source
# File lib/dynflow/delayed_executors/abstract_core.rb, line 19 def configure(options) @time_source = options.fetch(:time_source, -> { Time.now.utc }) end
start()
click to toggle source
# File lib/dynflow/delayed_executors/abstract_core.rb, line 15 def start raise NotImplementedError end
Private Instance Methods
delayed_execution_plans(time)
click to toggle source
# File lib/dynflow/delayed_executors/abstract_core.rb, line 33 def delayed_execution_plans(time) with_error_handling([]) do world.persistence.find_past_delayed_plans(time) end end
fix_plan_state(plan)
click to toggle source
handle the case, where the process was termintated while planning was in progress before TODO: Doing execution plan updates in orchestrator is bad
# File lib/dynflow/delayed_executors/abstract_core.rb, line 73 def fix_plan_state(plan) if plan.execution_plan.state == :planning @logger.info("Execution plan #{plan.execution_plan_uuid} is expected to be in 'scheduled' state, was '#{plan.execution_plan.state}', auto-fixing") plan.execution_plan.set_state(:scheduled, true) plan.execution_plan.save end end
locked_for_planning?(planning_locks, plan)
click to toggle source
# File lib/dynflow/delayed_executors/abstract_core.rb, line 81 def locked_for_planning?(planning_locks, plan) planning_locks.any? { |lock| lock.execution_plan_id == plan.execution_plan_uuid } end
process(delayed_plans, check_time)
click to toggle source
# File lib/dynflow/delayed_executors/abstract_core.rb, line 47 def process(delayed_plans, check_time) processed_plan_uuids = [] dispatched_plan_uuids = [] planning_locks = world.coordinator.find_records(class: Coordinator::PlanningLock.name) delayed_plans.each do |plan| next if plan.frozen || locked_for_planning?(planning_locks, plan) fix_plan_state(plan) with_error_handling do if plan.execution_plan.state != :scheduled # in case the previous process was terminated after running the plan, but before deleting the delayed plan record. @logger.info("Execution plan #{plan.execution_plan_uuid} is expected to be in 'scheduled' state, was '#{plan.execution_plan.state}', skipping") processed_plan_uuids << plan.execution_plan_uuid else @logger.debug "Executing plan #{plan.execution_plan_uuid}" world.plan_request(plan.execution_plan_uuid) dispatched_plan_uuids << plan.execution_plan_uuid end end end world.persistence.delete_delayed_plans(:execution_plan_uuid => processed_plan_uuids) unless processed_plan_uuids.empty? end
time()
click to toggle source
# File lib/dynflow/delayed_executors/abstract_core.rb, line 29 def time @time_source.call() end
with_error_handling(error_retval = nil, &block)
click to toggle source
# File lib/dynflow/delayed_executors/abstract_core.rb, line 39 def with_error_handling(error_retval = nil, &block) block.call rescue Exception => e @logger.warn e.message @logger.debug e.backtrace.join("\n") error_retval end