Director is responsible for telling what to do next when:
* new execution starts * an event accurs * some work is finished
It's public methods (except terminate) return work items that the executor should understand
# File lib/dynflow/director.rb, line 79 def initialize(world) @world = world @logger = world.logger @execution_plan_managers = {} @plan_ids_in_rescue = Set.new end
# File lib/dynflow/director.rb, line 92 def handle_event(event) Type! event, Event execution_plan_manager = @execution_plan_managers[event.execution_plan_id] if execution_plan_manager execution_plan_manager.event(event) else raise Dynflow::Error, "no manager for #{event.inspect}" end rescue Dynflow::Error => e event.result.fail e.message raise e end
# File lib/dynflow/director.rb, line 86 def start_execution(execution_plan_id, finished) manager = track_execution_plan(execution_plan_id, finished) return [] unless manager unless_done(manager, manager.start) end
# File lib/dynflow/director.rb, line 110 def terminate unless @execution_plan_managers.empty? logger.error "... cleaning #{@execution_plan_managers.size} execution plans ..." begin @execution_plan_managers.values.each do |manager| manager.terminate end rescue Errors::PersistenceError logger.error "could not to clean the data properly" end @execution_plan_managers.values.each do |manager| finish_manager(manager) end end end
# File lib/dynflow/director.rb, line 105 def work_finished(work) manager = @execution_plan_managers[work.execution_plan_id] unless_done(manager, manager.what_is_next(work)) end
# File lib/dynflow/director.rb, line 138 def finish_manager(manager) @execution_plan_managers.delete(manager.execution_plan.id) if rescue?(manager) rescue!(manager) else set_future(manager) end end
# File lib/dynflow/director.rb, line 153 def rescue!(manager) # TODO: after moving to concurrent-ruby actors, there should be better place # to put this logic of making sure we don't run rescues in endless loop @plan_ids_in_rescue << manager.execution_plan.id rescue_plan_id = manager.execution_plan.rescue_plan_id if rescue_plan_id @world.executor.execute(rescue_plan_id, manager.future, false) else set_future(manager) end end
# File lib/dynflow/director.rb, line 147 def rescue?(manager) return false if @world.terminating? @world.auto_rescue && manager.execution_plan.state == :paused && !@plan_ids_in_rescue.include?(manager.execution_plan.id) end
# File lib/dynflow/director.rb, line 185 def set_future(manager) @plan_ids_in_rescue.delete(manager.execution_plan.id) manager.future.success manager.execution_plan end
# File lib/dynflow/director.rb, line 165 def track_execution_plan(execution_plan_id, finished) execution_plan = @world.persistence.load_execution_plan(execution_plan_id) if @execution_plan_managers[execution_plan_id] raise Dynflow::Error, "cannot execute execution_plan_id:#{execution_plan_id} it's already running" end if execution_plan.state == :stopped raise Dynflow::Error, "cannot execute execution_plan_id:#{execution_plan_id} it's stopped" end @execution_plan_managers[execution_plan_id] = ExecutionPlanManager.new(@world, execution_plan, finished) rescue Dynflow::Error => e finished.fail e nil end
# File lib/dynflow/director.rb, line 128 def unless_done(manager, work_items) return [] unless manager if manager.done? finish_manager(manager) return [] else return work_items end end