class Dynflow::DelayedExecutors::AbstractCore

Attributes

logger[R]
world[R]

Public Class Methods

new(world, options = {}) click to toggle source
# File lib/dynflow/delayed_executors/abstract_core.rb, line 8
def initialize(world, options = {})
  @world = Type! world, World
  @logger = world.logger
  configure(options)
end

Public Instance Methods

check_delayed_plans() click to toggle source
# File lib/dynflow/delayed_executors/abstract_core.rb, line 22
def check_delayed_plans
  raise NotImplementedError
end
configure(options) click to toggle source
# File lib/dynflow/delayed_executors/abstract_core.rb, line 18
def configure(options)
  @time_source = options.fetch(:time_source, -> { Time.now.utc })
end
start() click to toggle source
# File lib/dynflow/delayed_executors/abstract_core.rb, line 14
def start
  raise NotImplementedError
end

Private Instance Methods

delayed_execution_plans(time) click to toggle source
# File lib/dynflow/delayed_executors/abstract_core.rb, line 32
def delayed_execution_plans(time)
  with_error_handling([]) do
    world.persistence.find_past_delayed_plans(time)
  end
end
fix_plan_state(plan) click to toggle source

handle the case, where the process was termintated while planning was in progress before

# File lib/dynflow/delayed_executors/abstract_core.rb, line 74
def fix_plan_state(plan)
  if plan.execution_plan.state == :planning
    @logger.info("Execution plan #{plan.execution_plan_uuid} is expected to be in 'scheduled' state, was '#{plan.execution_plan.state}', auto-fixing")
    plan.execution_plan.set_state(:scheduled, true)
    plan.execution_plan.save
  end
end
process(delayed_plans, check_time) click to toggle source
# File lib/dynflow/delayed_executors/abstract_core.rb, line 46
def process(delayed_plans, check_time)
  processed_plan_uuids = []
  delayed_plans.each do |plan|
    next if plan.frozen
    fix_plan_state(plan)
    with_error_handling do
      if plan.execution_plan.state != :scheduled
        # in case the previous process was terminated after running the plan, but before deleting the delayed plan record.
        @logger.info("Execution plan #{plan.execution_plan_uuid} is expected to be in 'scheduled' state, was '#{plan.execution_plan.state}', skipping")
      elsif !plan.start_before.nil? && plan.start_before < check_time
        @logger.debug "Failing plan #{plan.execution_plan_uuid}"
        plan.timeout
      else
        @logger.debug "Executing plan #{plan.execution_plan_uuid}"
        Executors.run_user_code do
          plan.plan
          plan.execute
        end
      end
      processed_plan_uuids << plan.execution_plan_uuid
    end
  end
  world.persistence.delete_delayed_plans(:execution_plan_uuid => processed_plan_uuids) unless processed_plan_uuids.empty?
end
time() click to toggle source
# File lib/dynflow/delayed_executors/abstract_core.rb, line 28
def time
  @time_source.call()
end
with_error_handling(error_retval = nil, &block) click to toggle source
# File lib/dynflow/delayed_executors/abstract_core.rb, line 38
def with_error_handling(error_retval = nil, &block)
  block.call
rescue Exception => e
  @logger.warn e.message
  @logger.debug e.backtrace.join("\n")
  error_retval
end