class Dynflow::ExecutionPlan
rubocop:disable Metrics/ClassLength TODO extract planning logic to an extra class ExecutionPlanner
Attributes
Public Class Methods
# File lib/dynflow/execution_plan.rb, line 426 def self.load_flow(flow_hash) if flow_hash.is_a? Hash Flows::Abstract.from_hash(flow_hash) else Flows::Abstract.decode(flow_hash) end end
all params with default values are part of private api
# File lib/dynflow/execution_plan.rb, line 72 def initialize(world, id = nil, label = nil, state = :pending, root_plan_step = nil, run_flow = Flows::Concurrence.new([]), finalize_flow = Flows::Sequence.new([]), steps = {}, started_at = nil, ended_at = nil, execution_time = nil, real_time = 0.0, execution_history = ExecutionHistory.new) id ||= SecureRandom.uuid @id = Type! id, String @world = Type! world, World @label = Type! label, String, NilClass self.state = state @run_flow = Type! run_flow, Flows::Abstract @finalize_flow = Type! finalize_flow, Flows::Abstract @root_plan_step = root_plan_step @started_at = Type! started_at, Time, NilClass @ended_at = Type! ended_at, Time, NilClass @execution_time = Type! execution_time, Numeric, NilClass @real_time = Type! real_time, Numeric @execution_history = Type! execution_history, ExecutionHistory steps.all? do |k, v| Type! k, Integer Type! v, Steps::Abstract end @steps = steps end
# File lib/dynflow/execution_plan.rb, line 455 def self.new_from_hash(hash, world) check_class_matching hash execution_plan_id = hash[:id] steps = steps_from_hash(hash[:step_ids], execution_plan_id, world) self.new(world, execution_plan_id, hash[:label], hash[:state], steps[hash[:root_plan_step_id]], load_flow(hash[:run_flow]), load_flow(hash[:finalize_flow]), steps, string_to_time(hash[:started_at]), string_to_time(hash[:ended_at]), hash[:execution_time].to_f, hash[:real_time].to_f, ExecutionHistory.new_from_hash(hash[:execution_history])) rescue => plan_exception begin world.logger.error("Could not load execution plan #{execution_plan_id}") world.logger.error(plan_exception) InvalidPlan.new(plan_exception, execution_plan_id, hash[:label], hash[:state], string_to_time(hash[:started_at]), string_to_time(hash[:ended_at]), hash[:execution_time].to_f, hash[:real_time].to_f, ExecutionHistory.new_from_hash(hash[:execution_history])) rescue => invalid_plan_exception world.logger.error("Could not even load a fallback execution plan for #{execution_plan_id}") world.logger.error(invalid_plan_exception) InvalidPlan.new(invalid_plan_exception, execution_plan_id, hash[:label], hash[:state]) end end
# File lib/dynflow/execution_plan.rb, line 57 def self.results @results ||= [:pending, :success, :warning, :error, :cancelled] end
# File lib/dynflow/execution_plan.rb, line 61 def self.state_transitions @state_transitions ||= { pending: [:stopped, :scheduled, :planning], scheduled: [:planning, :stopped], planning: [:planned, :stopped], planned: [:running, :stopped], running: [:paused, :stopped], paused: [:running, :stopped], stopped: [] } end
# File lib/dynflow/execution_plan.rb, line 51 def self.states @states ||= [:pending, :scheduled, :planning, :planned, :running, :paused, :stopped] end
Private Class Methods
# File lib/dynflow/execution_plan.rb, line 545 def self.steps_from_hash(step_ids, execution_plan_id, world) steps = world.persistence.load_steps(execution_plan_id, world) ids_to_steps = steps.inject({}) do |hash, step| hash[step.id.to_i] = step hash end # to make sure to we preserve the order of the steps step_ids.inject({}) do |hash, step_id| step = ids_to_steps[step_id.to_i] if step.nil? raise Errors::DataConsistencyError, "Could not load steps for execution plan #{execution_plan_id}" else hash[step_id.to_i] = step end hash end end
Public Instance Methods
@return [Array<Action>] actions in Present phase
# File lib/dynflow/execution_plan.rb, line 517 def actions @actions ||= begin [entry_action] + entry_action.all_planned_actions end end
# File lib/dynflow/execution_plan.rb, line 419 def add_finalize_step(action) add_step(Steps::FinalizeStep, action.class, action.id).tap do |step| step.update_from_action(action) finalize_flow << Flows::Atom.new(step.id) end end
# File lib/dynflow/execution_plan.rb, line 401 def add_plan_step(action_class, caller_action = nil) add_step(Steps::PlanStep, action_class, generate_action_id).tap do |step| # TODO: to be removed and preferred by the caller_action if caller_action && caller_action.execution_plan_id == self.id @steps[caller_action.plan_step_id].children << step.id end step.initialize_action(caller_action) end end
# File lib/dynflow/execution_plan.rb, line 411 def add_run_step(action) add_step(Steps::RunStep, action.class, action.id).tap do |step| step.update_from_action(action) @dependency_graph.add_dependencies(step, action) current_run_flow.add_and_resolve(@dependency_graph, Flows::Atom.new(step.id)) end end
# File lib/dynflow/execution_plan.rb, line 395 def add_scheduling_step(action_class, caller_action = nil) add_step(Steps::PlanStep, action_class, generate_action_id, :scheduling).tap do |step| step.initialize_action(caller_action) end end
# File lib/dynflow/execution_plan.rb, line 523 def caller_execution_plan_id entry_action.caller_execution_plan_id end
sends the cancel event to all currently running and cancellable steps. if the plan is just scheduled, it cancels it (and returns an one-item array with the future value of the cancel result)
# File lib/dynflow/execution_plan.rb, line 311 def cancel(force = false) if state == :scheduled [Concurrent::Promises.resolvable_future.tap { |f| f.fulfill delay_record.cancel }] else event = force ? ::Dynflow::Action::Cancellable::Abort : ::Dynflow::Action::Cancellable::Cancel steps_to_cancel.map do |step| world.event(id, step.id, event) end end end
# File lib/dynflow/execution_plan.rb, line 322 def cancellable? return true if state == :scheduled return false unless state == :running steps_to_cancel.any? end
# File lib/dynflow/execution_plan.rb, line 493 def compute_execution_time self.steps.values.reduce(0) do |execution_time, step| execution_time + (step.execution_time || 0) end end
# File lib/dynflow/execution_plan.rb, line 371 def current_run_flow @run_flow_stack.last end
# File lib/dynflow/execution_plan.rb, line 256 def delay(caller_action, action_class, delay_options, *args) save @root_plan_step = add_scheduling_step(action_class, caller_action) run_hooks(:pending) serializer = root_plan_step.delay(delay_options, args) delayed_plan = DelayedPlan.new(@world, id, delay_options[:start_at], delay_options.fetch(:start_before, nil), serializer, delay_options[:frozen] || false) persistence.save_delayed_plan(delayed_plan) ensure update_state(error? ? :stopped : :scheduled) end
# File lib/dynflow/execution_plan.rb, line 272 def delay_record @delay_record ||= persistence.load_delayed_plan(id) end
# File lib/dynflow/execution_plan.rb, line 512 def entry_action @entry_action ||= root_plan_step.action(self) end
# File lib/dynflow/execution_plan.rb, line 183 def error? result == :error end
# File lib/dynflow/execution_plan.rb, line 191 def error_in_plan? steps_in_state(:error).any? { |step| step.is_a? Steps::PlanStep } end
# File lib/dynflow/execution_plan.rb, line 195 def errors steps.values.map(&:error).compact end
# File lib/dynflow/execution_plan.rb, line 238 def failed_steps steps_in_state(:error) end
# File lib/dynflow/execution_plan.rb, line 187 def failure? [:error, :warning, :cancelled].include?(result) end
# File lib/dynflow/execution_plan.rb, line 234 def finalize_steps steps_of_type(Dynflow::ExecutionPlan::Steps::FinalizeStep) end
# File lib/dynflow/execution_plan.rb, line 246 def generate_action_id @last_action_id ||= 0 @last_action_id += 1 end
# File lib/dynflow/execution_plan.rb, line 251 def generate_step_id @last_step_id ||= 0 @last_step_id += 1 end
# File lib/dynflow/execution_plan.rb, line 110 def logger @world.logger end
# File lib/dynflow/execution_plan.rb, line 287 def plan(*args) update_state(:planning) world.middleware.execute(:plan_phase, root_plan_step.action_class, self) do with_planning_scope do root_action = root_plan_step.execute(self, nil, false, *args) @label = root_action.label if @dependency_graph.unresolved? raise "Some dependencies were not resolved: #{@dependency_graph.inspect}" end end end if @run_flow.size == 1 @run_flow = @run_flow.sub_flows.first end steps.values.each(&:save) update_state(error? ? :stopped : :planned) end
# File lib/dynflow/execution_plan.rb, line 226 def plan_steps steps_of_type(Dynflow::ExecutionPlan::Steps::PlanStep) end
# File lib/dynflow/execution_plan.rb, line 276 def prepare(action_class, options = {}) options = options.dup caller_action = Type! options.delete(:caller_action), Dynflow::Action, NilClass raise "Unexpected options #{options.keys.inspect}" unless options.empty? save @root_plan_step = add_plan_step(action_class, caller_action) step = @root_plan_step.save run_hooks(:pending) step end
# File lib/dynflow/execution_plan.rb, line 212 def prepare_for_rescue case rescue_strategy when Action::Rescue::Pause :paused when Action::Rescue::Fail :stopped when Action::Rescue::Skip failed_steps.each { |step| self.skip(step) } :running else :paused end end
@return [0..1] the percentage of the progress. See Action::Progress
for more info
# File lib/dynflow/execution_plan.rb, line 501 def progress return 0 if [:pending, :planning, :scheduled].include?(state) flow_step_ids = run_flow.all_step_ids + finalize_flow.all_step_ids plan_done, plan_total = flow_step_ids.reduce([0.0, 0]) do |(done, total), step_id| step = self.steps[step_id] [done + (step.progress_done * step.progress_weight), total + step.progress_weight] end plan_total > 0 ? (plan_done / plan_total) : 1 end
# File lib/dynflow/execution_plan.rb, line 199 def rescue_strategy rescue_strategy = entry_action.rescue_strategy || Action::Rescue::Skip Type! rescue_strategy, Action::Rescue::Strategy end
# File lib/dynflow/execution_plan.rb, line 168 def result all_steps = steps.values if all_steps.any? { |step| step.state == :cancelled } return :cancelled elsif all_steps.any? { |step| step.state == :error } return :error elsif all_steps.any? { |step| [:skipping, :skipped].include?(step.state) } return :warning elsif all_steps.all? { |step| step.state == :success } return :success else return :pending end end
# File lib/dynflow/execution_plan.rb, line 153 def run_hooks(state) records = persistence.load_actions_attributes(@id, [:id, :class]).select do |action| Utils.constantize(action[:class]) .execution_plan_hooks .on(state).any? end action_ids = records.compact.map { |record| record[:id] } return if action_ids.empty? persistence.load_actions(self, action_ids).each do |action| world.middleware.execute(:hook, action, self) do action.class.execution_plan_hooks.run(self, action, state) end end end
# File lib/dynflow/execution_plan.rb, line 230 def run_steps steps_of_type(Dynflow::ExecutionPlan::Steps::RunStep) end
# File lib/dynflow/execution_plan.rb, line 451 def save persistence.save_execution_plan(self) end
# File lib/dynflow/execution_plan.rb, line 334 def skip(step) steps_to_skip = steps_to_skip(step).each(&:mark_to_skip) self.save return steps_to_skip end
# File lib/dynflow/execution_plan.rb, line 242 def steps_in_state(*states) self.steps.values.find_all { |step| states.include?(step.state) } end
@api private
# File lib/dynflow/execution_plan.rb, line 367 def steps_of_type(type) steps.values.find_all { |step| step.is_a?(type) } end
# File lib/dynflow/execution_plan.rb, line 328 def steps_to_cancel steps_in_state(:running, :suspended).find_all do |step| step.action(self).is_a?(::Dynflow::Action::Cancellable) end end
All the steps that need to get skipped when wanting to skip the step includes the step itself, all steps dependent on it (even transitively) FIND maybe move to persistence to let adapter to do it effectively? @return [Array<Steps::Abstract>]
# File lib/dynflow/execution_plan.rb, line 344 def steps_to_skip(step) dependent_steps = steps.values.find_all do |s| next if s.is_a? Steps::PlanStep action = persistence.load_action(s) action.required_step_ids.include?(step.id) end steps_to_skip = dependent_steps.map do |dependent_step| steps_to_skip(dependent_step) end.flatten steps_to_skip << step if step.is_a? Steps::RunStep finalize_step_id = persistence.load_action(step).finalize_step_id steps_to_skip << steps[finalize_step_id] if finalize_step_id end return steps_to_skip.uniq end
# File lib/dynflow/execution_plan.rb, line 204 def sub_plans persistence.find_execution_plans(filters: { 'caller_execution_plan_id' => self.id }) end
# File lib/dynflow/execution_plan.rb, line 208 def sub_plans_count persistence.find_execution_plan_counts(filters: { 'caller_execution_plan_id' => self.id }) end
@api private Switches the flow type (Sequence, Concurrence) to be used within the block.
# File lib/dynflow/execution_plan.rb, line 387 def switch_flow(new_flow, &block) @run_flow_stack << new_flow return block.call ensure @run_flow_stack.pop current_run_flow.add_and_resolve(@dependency_graph, new_flow) if current_run_flow end
# File lib/dynflow/execution_plan.rb, line 434 def to_hash recursive_to_hash id: id, class: self.class.to_s, label: label, state: state, result: result, root_plan_step_id: root_plan_step && root_plan_step.id, run_flow: run_flow.encode, finalize_flow: finalize_flow.encode, step_ids: steps.map { |id, _| id }, started_at: time_to_str(started_at), ended_at: time_to_str(ended_at), execution_time: execution_time, real_time: real_time, execution_history: execution_history.to_hash end
@param state [Symbol] representing the new state @param history_notice [Symbol|string|false] should a note to execution_history
be added as well?
Possible values: - :auto (default) - the history notice will be added based on the new state - string - custom history notice is added - false - don't add any notice
# File lib/dynflow/execution_plan.rb, line 120 def update_state(state, history_notice: :auto) hooks_to_run = [state] original = self.state case self.state = state when :planning @started_at = Time.now.utc when :stopped @ended_at = Time.now.utc @real_time = @ended_at - @started_at unless @started_at.nil? @execution_time = compute_execution_time key = failure? ? :failure : :success Dynflow::Telemetry.with_instance do |t| t.increment_counter(:dynflow_finished_execution_plans, 1, telemetry_common_options.merge(:result => key.to_s)) end hooks_to_run << key world.persistence.delete_delayed_plans(:execution_plan_uuid => id) if delay_record && original == :scheduled unlock_all_singleton_locks! unlock_execution_inhibition_lock! when :paused unlock_all_singleton_locks! else # ignore end logger.debug format('%13s %s %9s >> %9s', 'ExecutionPlan', id, original, state) add_history_notice(history_notice) self.save toggle_telemetry_state original == :pending ? nil : original.to_s, self.state == :stopped ? nil : self.state.to_s hooks_to_run.each { |kind| run_hooks kind } end
# File lib/dynflow/execution_plan.rb, line 106 def valid? true end
@api private
# File lib/dynflow/execution_plan.rb, line 376 def with_planning_scope(&block) @run_flow_stack = [] @dependency_graph = DependencyGraph.new switch_flow(run_flow, &block) ensure @run_flow_stack = nil @dependency_graph = nil end
Private Instance Methods
# File lib/dynflow/execution_plan.rb, line 594 def add_history_notice(history_notice) if history_notice == :auto history_notice = case state when :running 'start execution' when :paused 'pause execution' when :stopped 'finish execution' when :scheduled 'delay' end end execution_history.add(history_notice, @world.id) if history_notice end
# File lib/dynflow/execution_plan.rb, line 533 def add_step(step_class, action_class, action_id, state = :pending) step_class.new(self.id, self.generate_step_id, state, action_class, action_id, nil, world).tap do |new_step| @steps[new_step.id] = new_step end end
# File lib/dynflow/execution_plan.rb, line 529 def persistence world.persistence end
# File lib/dynflow/execution_plan.rb, line 590 def telemetry_common_options { :world => @world.id, :action => @label } end
# File lib/dynflow/execution_plan.rb, line 579 def toggle_telemetry_state(original, new) return if original == new @label = root_plan_step.action_class if @label.nil? Dynflow::Telemetry.with_instance do |t| t.set_gauge(:dynflow_active_execution_plans, '-1', telemetry_common_options.merge(:state => original)) unless original.nil? t.set_gauge(:dynflow_active_execution_plans, '+1', telemetry_common_options.merge(:state => new)) unless new.nil? end end
# File lib/dynflow/execution_plan.rb, line 563 def unlock_all_singleton_locks! filter = { :owner_id => 'execution-plan:' + self.id, :class => Dynflow::Coordinator::SingletonActionLock.to_s } world.coordinator.find_locks(filter).each do |lock| world.coordinator.release(lock) end end
# File lib/dynflow/execution_plan.rb, line 571 def unlock_execution_inhibition_lock! filter = { :owner_id => 'execution-plan:' + self.id, :class => Dynflow::Coordinator::ExecutionInhibitionLock.to_s } world.coordinator.find_locks(filter).each do |lock| world.coordinator.release(lock) end end