class Dynflow::DelayedExecutors::AbstractCore
Attributes
logger[R]
world[R]
Public Class Methods
new(world, options = {})
click to toggle source
# File lib/dynflow/delayed_executors/abstract_core.rb, line 9 def initialize(world, options = {}) @world = Type! world, World @logger = world.logger configure(options) end
Public Instance Methods
check_delayed_plans()
click to toggle source
# File lib/dynflow/delayed_executors/abstract_core.rb, line 23 def check_delayed_plans raise NotImplementedError end
configure(options)
click to toggle source
# File lib/dynflow/delayed_executors/abstract_core.rb, line 19 def configure(options) @time_source = options.fetch(:time_source, -> { Time.now.utc }) end
start()
click to toggle source
# File lib/dynflow/delayed_executors/abstract_core.rb, line 15 def start raise NotImplementedError end
Private Instance Methods
delayed_execution_plans(time)
click to toggle source
# File lib/dynflow/delayed_executors/abstract_core.rb, line 33 def delayed_execution_plans(time) with_error_handling([]) do world.persistence.find_past_delayed_plans(time) end end
fix_plan_state(plan)
click to toggle source
handle the case, where the process was termintated while planning was in progress before
# File lib/dynflow/delayed_executors/abstract_core.rb, line 75 def fix_plan_state(plan) if plan.execution_plan.state == :planning @logger.info("Execution plan #{plan.execution_plan_uuid} is expected to be in 'scheduled' state, was '#{plan.execution_plan.state}', auto-fixing") plan.execution_plan.set_state(:scheduled, true) plan.execution_plan.save end end
process(delayed_plans, check_time)
click to toggle source
# File lib/dynflow/delayed_executors/abstract_core.rb, line 47 def process(delayed_plans, check_time) processed_plan_uuids = [] delayed_plans.each do |plan| next if plan.frozen fix_plan_state(plan) with_error_handling do if plan.execution_plan.state != :scheduled # in case the previous process was terminated after running the plan, but before deleting the delayed plan record. @logger.info("Execution plan #{plan.execution_plan_uuid} is expected to be in 'scheduled' state, was '#{plan.execution_plan.state}', skipping") elsif !plan.start_before.nil? && plan.start_before < check_time @logger.debug "Failing plan #{plan.execution_plan_uuid}" plan.timeout else @logger.debug "Executing plan #{plan.execution_plan_uuid}" Executors.run_user_code do plan.plan plan.execute end end processed_plan_uuids << plan.execution_plan_uuid end end world.persistence.delete_delayed_plans(:execution_plan_uuid => processed_plan_uuids) unless processed_plan_uuids.empty? end
time()
click to toggle source
# File lib/dynflow/delayed_executors/abstract_core.rb, line 29 def time @time_source.call() end
with_error_handling(error_retval = nil, &block)
click to toggle source
# File lib/dynflow/delayed_executors/abstract_core.rb, line 39 def with_error_handling(error_retval = nil, &block) block.call rescue Exception => e @logger.warn e.message @logger.debug e.backtrace.join("\n") error_retval end