class Dynflow::Director

Director is responsible for telling what to do next when:

* new execution starts
* an event occurs
* some work is finished

Its public methods (except terminate) return work items that the executor should understand.

Constants

Event
UnprocessableEvent

Attributes

logger[R]

Public Class Methods

new(world) click to toggle source
# File lib/dynflow/director.rb, line 80
# Sets up a director bound to the given world.
#
# Starts with no tracked execution plan managers and no record of
# rescued steps; the logger is taken from the world.
#
# @param world the world this director works for; must respond to +logger+
def initialize(world)
  @execution_plan_managers = {}
  @rescued_steps = {}
  @world = world
  @logger = @world.logger
end

Public Instance Methods

handle_event(event) click to toggle source
# File lib/dynflow/director.rb, line 93
# Routes an event to the manager of the execution plan it belongs to.
#
# @param event [Event] must respond to +execution_plan_id+ and +result+
# @return whatever the manager's +event+ method returns
# @raise [Dynflow::Error] when no manager is tracked for the event's plan
#   (or the manager itself raises one); the event's result is failed with
#   the error message before the error is re-raised
def handle_event(event)
  Type! event, Event
  plan_manager = @execution_plan_managers[event.execution_plan_id]
  raise Dynflow::Error, "no manager for #{event.inspect}" unless plan_manager

  plan_manager.event(event)
rescue Dynflow::Error => error
  # let whoever waits on the event know it failed, then propagate
  event.result.fail error.message
  raise error
end
start_execution(execution_plan_id, finished) click to toggle source
# File lib/dynflow/director.rb, line 87
# Begins tracking the given execution plan and asks its manager to start.
#
# @param execution_plan_id id of the plan to execute
# @param finished object handed on to +track_execution_plan+
# @return [Array] the manager's initial work, filtered through
#   +unless_done+; empty when the plan could not be tracked
def start_execution(execution_plan_id, finished)
  plan_manager = track_execution_plan(execution_plan_id, finished)
  if plan_manager
    unless_done(plan_manager, plan_manager.start)
  else
    []
  end
end
terminate() click to toggle source
# File lib/dynflow/director.rb, line 111
# Shuts down every execution plan manager that is still tracked.
#
# Does nothing when no managers are tracked. Otherwise logs the number of
# plans being cleaned, calls +terminate+ on each manager (a
# +PersistenceError+ stops that pass and is only logged), and then
# finishes every manager that is still tracked.
def terminate
  return if @execution_plan_managers.empty?

  logger.error "... cleaning #{@execution_plan_managers.size} execution plans ..."
  begin
    @execution_plan_managers.values.each(&:terminate)
  rescue Errors::PersistenceError
    logger.error "could not to clean the data properly"
  end
  # iterate over a snapshot: finish_manager removes entries from the hash
  @execution_plan_managers.values.each { |plan_manager| finish_manager(plan_manager) }
end
work_finished(work) click to toggle source
# File lib/dynflow/director.rb, line 106
# Reports a finished piece of work and asks the owning manager what to
# do next.
#
# @param work finished work item; must respond to +execution_plan_id+
# @return [Array] follow-up work from the manager, filtered through
#   +unless_done+; empty when the plan is not tracked
def work_finished(work)
  manager = @execution_plan_managers[work.execution_plan_id]
  # Work may arrive for a plan that is not tracked here (for example one
  # already removed by finish_manager). Bail out before calling
  # what_is_next on nil; the nil guard inside unless_done would come too
  # late because its arguments are evaluated first.
  return [] unless manager

  unless_done(manager, manager.what_is_next(work))
end

Private Instance Methods

finish_manager(manager) click to toggle source
# File lib/dynflow/director.rb, line 139
# Stops tracking the manager's execution plan, then either starts a
# rescue (when +rescue?+ allows it) or resolves the manager's future.
#
# @param manager manager of the plan that has finished
def finish_manager(manager)
  @execution_plan_managers.delete(manager.execution_plan.id)
  return rescue!(manager) if rescue?(manager)

  set_future(manager)
end
rescue!(manager) click to toggle source
# File lib/dynflow/director.rb, line 162
# Records the plan's currently failed steps as rescued and hands the
# plan's rescue plan (if any) to the executor; without a rescue plan the
# manager's future is resolved instead.
#
# @param manager manager whose execution plan should be rescued
def rescue!(manager)
  plan = manager.execution_plan
  # TODO: after moving to concurrent-ruby actors, there should be better place
  # to put this logic of making sure we don't run rescues in endless loop
  already_rescued = (@rescued_steps[plan.id] ||= Set.new)
  already_rescued.merge(plan.failed_steps.map(&:id))

  rescue_plan_id = plan.rescue_plan_id
  return set_future(manager) unless rescue_plan_id

  @world.executor.execute(rescue_plan_id, manager.future, false)
end
rescue?(manager) click to toggle source
# File lib/dynflow/director.rb, line 148
# Decides whether a rescue should be attempted for the manager's plan.
#
# @param manager manager whose execution plan is being considered
# @return [Boolean] false while the world is terminating, when auto
#   rescue is off, or when the plan is not paused; true for a plan that
#   has not been rescued yet; otherwise true only if some failed step
#   has not been recorded as rescued already
def rescue?(manager)
  return false if @world.terminating?

  plan = manager.execution_plan
  return false unless @world.auto_rescue && plan.state == :paused
  # we have not rescued this plan yet
  return true unless @rescued_steps.key?(plan.id)

  # we have rescued this plan already, but a different step has failed now
  # we do this check to prevent endless loop, if we always failed on the same steps
  newly_failed = plan.failed_steps.map(&:id).to_set - @rescued_steps[plan.id]
  newly_failed.any?
end
set_future(manager) click to toggle source
# File lib/dynflow/director.rb, line 195
# Forgets the rescued-step record of the manager's plan and resolves
# the manager's future with that plan.
#
# @param manager manager whose future should be resolved
# @return whatever the future's +success+ call returns
def set_future(manager)
  plan = manager.execution_plan
  @rescued_steps.delete(plan.id)
  manager.future.success(plan)
end
track_execution_plan(execution_plan_id, finished) click to toggle source
# File lib/dynflow/director.rb, line 175
# Loads the execution plan and registers a new manager for it.
#
# A plan that already has a manager, or whose state is +:stopped+, is
# refused: +finished+ is failed with the error and nil is returned.
#
# @param execution_plan_id id of the plan to load from persistence
# @param finished object passed to the new manager; receives +fail+ on refusal
# @return the newly registered ExecutionPlanManager, or nil on refusal
def track_execution_plan(execution_plan_id, finished)
  plan = @world.persistence.load_execution_plan(execution_plan_id)

  already_tracked = @execution_plan_managers[execution_plan_id]
  raise Dynflow::Error, "cannot execute execution_plan_id:#{execution_plan_id} it's already running" if already_tracked
  raise Dynflow::Error, "cannot execute execution_plan_id:#{execution_plan_id} it's stopped" if plan.state == :stopped

  @execution_plan_managers[execution_plan_id] = ExecutionPlanManager.new(@world, plan, finished)
rescue Dynflow::Error => error
  finished.fail error
  nil
end
unless_done(manager, work_items) click to toggle source
# File lib/dynflow/director.rb, line 129
# Passes work items through unless the manager has nothing left to do.
#
# @param manager manager the work belongs to; may be nil
# @param work_items work to return when the manager is not done
# @return [Array] empty when there is no manager or it reports +done?+
#   (in which case it is finished first); otherwise +work_items+
def unless_done(manager, work_items)
  return [] unless manager
  return work_items unless manager.done?

  finish_manager(manager)
  []
end