class Dynflow::Action
rubocop:disable Metrics/ClassLength
Constants
- ERROR
- OutputReference
- Phase
- SUSPEND
- Skip
Attributes
Public Class Methods
# File lib/dynflow/action.rb, line 29 def self.all_children children.values.inject(children.values) do |children, child| children + child.all_children end end
# File lib/dynflow/action.rb, line 41 def self.children @children ||= {} end
# File lib/dynflow/action.rb, line 91 def self.constantize(action_name) super action_name rescue NameError Action::Missing.generate(action_name) end
# File lib/dynflow/action.rb, line 49 def self.execution_plan_hooks @execution_plan_hooks ||= ExecutionPlan::Hooks::Register.new end
# File lib/dynflow/action.rb, line 53 def self.inherit_execution_plan_hooks(hooks) @execution_plan_hooks = hooks end
# File lib/dynflow/action.rb, line 35 def self.inherited(child) children[child.name] = child child.inherit_execution_plan_hooks(execution_plan_hooks.dup) super child end
# File lib/dynflow/action.rb, line 45 def self.middleware @middleware ||= Middleware::Register.new end
# File lib/dynflow/action.rb, line 103 def initialize(attributes, world) Type! attributes, Hash @phase = Type! attributes.fetch(:phase), Phase @world = Type! world, World @step = Type! attributes.fetch(:step, nil), ExecutionPlan::Steps::Abstract, NilClass raise ArgumentError, 'Step reference missing' if phase?(Executable) && @step.nil? @execution_plan_id = Type! attributes.fetch(:execution_plan_id), String @id = Type! attributes.fetch(:id), Integer @plan_step_id = Type! attributes.fetch(:plan_step_id), Integer @run_step_id = Type! attributes.fetch(:run_step_id), Integer, NilClass @finalize_step_id = Type! attributes.fetch(:finalize_step_id), Integer, NilClass @execution_plan = Type!(attributes.fetch(:execution_plan), ExecutionPlan) if phase? Present @caller_execution_plan_id = Type!(attributes.fetch(:caller_execution_plan_id, nil), String, NilClass) @caller_action_id = Type!(attributes.fetch(:caller_action_id, nil), Integer, NilClass) getter =-> key, required do required ? attributes.fetch(key) : attributes.fetch(key, {}) end @input = OutputReference.deserialize getter.(:input, phase?(Run, Finalize, Present)) @output = OutputReference.deserialize getter.(:output, false) if phase? Run, Finalize, Present end
FIND define subscriptions in world independent on action's classes,
limited only by in/output formats
@return [nil, Class] a child of Action
# File lib/dynflow/action.rb, line 60 def self.subscribe nil end
Protected Class Methods
# File lib/dynflow/action.rb, line 381 def self.new_from_hash(hash, world) hash.delete(:output) if hash[:output].nil? unless hash[:execution_plan_uuid].nil? hash[:execution_plan_id] = hash[:execution_plan_uuid] end new(hash, world) end
Private Class Methods
An action must be a singleton and have a singleton lock
# File lib/dynflow/action.rb, line 586 def self.singleton_locked?(world) if self.ancestors.include? ::Dynflow::Action::Singleton lock_class = ::Dynflow::Coordinator::SingletonActionLock world.coordinator.find_locks(lock_class.unique_filter(self.name)).any? else false end end
Public Instance Methods
# File lib/dynflow/action.rb, line 199 def action_logger world.action_logger end
@param [Class] filter_class return only actions which are kind of `filter_class` @return [Array<Action>] of all (including indirectly) planned actions by this action, returned actions are in Present phase
# File lib/dynflow/action.rb, line 222 def all_planned_actions(filter_class = Action) phase! Present mine = planned_actions (mine + mine.reduce([]) { |arr, action| arr + action.all_planned_actions }). select { |a| a.is_a?(filter_class) } end
# File lib/dynflow/action.rb, line 163 def caller_action phase! Present return nil if @caller_action_id return @caller_action if @caller_action caller_execution_plan = if @caller_execution_plan_id.nil? execution_plan else world.persistence.load_execution_plan(@caller_execution_plan_id) end @caller_action = world.persistence.load_action_for_presentation(caller_execution_plan, @caller_action_id) end
# File lib/dynflow/action.rb, line 270 def error raise "error data not available" if @step.nil? @step.error end
# File lib/dynflow/action.rb, line 275 def execute(*args) phase! Executable self.send phase.execute_method_name, *args end
# File lib/dynflow/action.rb, line 300 def execute_delay(delay_options, *args) with_error_handling(true) do world.middleware.execute(:delay, self, delay_options, *args) do |*new_args| @serializer = delay(*new_args).tap do |serializer| serializer.perform_serialization! end end end end
# File lib/dynflow/action.rb, line 194 def execution_plan phase! Plan, Present @execution_plan end
# File lib/dynflow/action.rb, line 234 def finalize_step phase! Present execution_plan.steps.fetch(finalize_step_id) if finalize_step_id end
# File lib/dynflow/action.rb, line 189 def from_subscription? phase! Plan @from_subscription end
# File lib/dynflow/action.rb, line 315 def holds_singleton_lock? false end
@override to define more descriptive state information for the action: used in Dynflow console
# File lib/dynflow/action.rb, line 266 def humanized_state state.to_s end
# File lib/dynflow/action.rb, line 142 def input=(hash) Type! hash, Hash phase! Plan @input = Utils.indifferent_hash(hash) end
# File lib/dynflow/action.rb, line 138 def label self.class.name end
# File lib/dynflow/action.rb, line 154 def output if phase? Plan @output_reference or raise 'plan_self has to be invoked before being able to reference the output' else @output end end
# File lib/dynflow/action.rb, line 148 def output=(hash) Type! hash, Hash phase! Run @output = Utils.indifferent_hash(hash) end
# File lib/dynflow/action.rb, line 133 def phase!(*phases) phase?(*phases) or raise TypeError, "Wrong phase #{phase}, required #{phases}" end
# File lib/dynflow/action.rb, line 129 def phase?(*phases) Match? phase, *phases end
# File lib/dynflow/action.rb, line 203 def plan_step phase! Present execution_plan.steps.fetch(plan_step_id) end
@param [Class] filter_class return only actions which are kind of `filter_class` @return [Array<Action>] of directly planned actions by this action, returned actions are in Present phase
# File lib/dynflow/action.rb, line 211 def planned_actions(filter = Action) phase! Present plan_step. planned_steps(execution_plan). map { |s| s.action(execution_plan) }. select { |a| a.is_a?(filter) } end
@override define what pool should the action be run in. The queue defined here will also be used as the default queue for all the steps planned under this action, unless overrided by sub-action
# File lib/dynflow/action.rb, line 322 def queue end
@api private @return [Array<Integer>] - ids of steps referenced from action
# File lib/dynflow/action.rb, line 282 def required_step_ids(input = self.input) results = [] recursion =-> value do case value when Hash value.values.each { |v| recursion.(v) } when Array value.each { |v| recursion.(v) } when ExecutionPlan::OutputReference results << value.step_id else # no reference hidden in this arg end results end recursion.(input) end
# File lib/dynflow/action.rb, line 229 def run_step phase! Present execution_plan.steps.fetch(run_step_id) if run_step_id end
# File lib/dynflow/action.rb, line 310 def serializer raise "The action must be delayed in order to access the serializer" if @serializer.nil? @serializer end
# File lib/dynflow/action.rb, line 176 def set_plan_context(execution_plan, triggering_action, from_subscription) phase! Plan @execution_plan = Type! execution_plan, ExecutionPlan @triggering_action = Type! triggering_action, Action, NilClass @from_subscription = Type! from_subscription, TrueClass, FalseClass end
# File lib/dynflow/action.rb, line 259 def state raise "state data not available" if @step.nil? @step.state end
# File lib/dynflow/action.rb, line 239 def steps [plan_step, run_step, finalize_step] end
# File lib/dynflow/action.rb, line 243 def to_hash recursive_to_hash( { class: self.class.name, execution_plan_id: execution_plan_id, id: id, plan_step_id: plan_step_id, run_step_id: run_step_id, finalize_step_id: finalize_step_id, caller_execution_plan_id: caller_execution_plan_id, caller_action_id: caller_action_id, input: input }, if phase? Run, Finalize, Present { output: output } end) end
action that caused this action to be planned. Available only in planning phase
# File lib/dynflow/action.rb, line 184 def triggering_action phase! Plan @triggering_action end
Protected Instance Methods
# File lib/dynflow/action.rb, line 341 def delay(delay_options, *args) Serializers::Noop.new(args) end
Add this method to implement the action's *Finalize phase* behaviour. It can use DB in this phase.
# File lib/dynflow/action.rb, line 372 def finalize # just a rdoc placeholder end
@override to implement the action's *Plan phase* behaviour. By default it plans itself and expects input-hash. Use plan_self and plan_action methods to plan actions. It can use DB in this phase.
# File lib/dynflow/action.rb, line 349 def plan(*args) if from_subscription? # if the action is triggered by subscription, by default use the # input of parent action. # should be replaced by referencing the input from input format plan_self(input.merge(triggering_action.input)) else # in this case, the action was triggered by plan_action. Use # the argument specified there. plan_self(*args) end self end
Add this method to implement the action's *Run phase* behaviour. It should not use DB in this phase.
# File lib/dynflow/action.rb, line 365 def run(event = nil) # just a rdoc placeholder end
# File lib/dynflow/action.rb, line 377 def run_accepts_events? method(:run).arity != 0 end
# File lib/dynflow/action.rb, line 336 def save_state phase! Executable @step.save end
# File lib/dynflow/action.rb, line 327 def state=(state) phase! Executable @world.logger.debug format('%13s %s:%2d %9s >> %9s in phase %8s %s', 'Step', execution_plan_id, @step.id, self.state, state, phase.to_s_humanized, self.class) @step.state = state end
Private Instance Methods
# File lib/dynflow/action.rb, line 564 def check_serializable(what) Match! what, :input, :output value = send what recursive_to_hash value # it raises when not serializable rescue => e value.replace not_serializable: true raise e end
DSL for plan phase
# File lib/dynflow/action.rb, line 393 def concurrence(&block) phase! Plan @execution_plan.switch_flow(Flows::Concurrence.new([]), &block) end
DSL to terminate action execution and set it to error
# File lib/dynflow/action.rb, line 440 def error!(error) phase! Executable set_error(error) throw ERROR end
# File lib/dynflow/action.rb, line 552 def execute_finalize phase! Finalize @input = OutputReference.dereference @input, world.persistence self.state = :running save_state with_error_handling do world.middleware.execute(:finalize, self) do finalize end end end
# File lib/dynflow/action.rb, line 482 def execute_plan(*args) phase! Plan self.state = :running save_state # when the error occurred inside the planning, catch that # before getting out of the planning phase with_error_handling(!root_action?) do concurrence do world.middleware.execute(:plan, self, *args) do |*new_args| plan(*new_args) end end subscribed_actions = world.subscribed_actions(self.class) if subscribed_actions.any? # we encapsulate the flow for this action into a concurrence and # add the subscribed flows to it as well. trigger_flow = @execution_plan.current_run_flow.sub_flows.pop @execution_plan.switch_flow(Flows::Concurrence.new([trigger_flow].compact)) do subscribed_actions.each do |action_class| new_plan_step = @execution_plan.add_plan_step(action_class, self) new_plan_step.execute(@execution_plan, self, true, *args) end end end check_serializable :input end end
TODO: This is getting out of hand, refactoring needed
# File lib/dynflow/action.rb, line 514 def execute_run(event) phase! Run @world.logger.debug format('%13s %s:%2d got event %s', 'Step', execution_plan_id, @step.id, event) if event @input = OutputReference.dereference @input, world.persistence case when state == :running raise NotImplementedError, 'recovery after restart is not implemented' when [:pending, :error, :skipping, :suspended].include?(state) if event && state != :suspended raise 'event can be processed only when in suspended state' end self.state = :running unless self.state == :skipping save_state with_error_handling do event = Skip if state == :skipping # we run the Skip event only when the run accepts events if event != Skip || run_accepts_events? result = catch(SUSPEND) do world.middleware.execute(:run, self, *[event].compact) do |*args| run(*args) end end self.state = :suspended if result == SUSPEND end check_serializable :output end else raise "wrong state #{state} when event:#{event}" end end
# File lib/dynflow/action.rb, line 421 def plan_action(action_class, *args) phase! Plan @execution_plan.add_plan_step(action_class, self).execute(@execution_plan, self, false, *args) end
# File lib/dynflow/action.rb, line 403 def plan_self(input = {}) phase! Plan self.input.update input if self.respond_to?(:run) run_step = @execution_plan.add_run_step(self) @run_step_id = run_step.id @output_reference = OutputReference.new(@execution_plan.id, run_step.id, id) end if self.respond_to?(:finalize) finalize_step = @execution_plan.add_finalize_step(self) @finalize_step_id = finalize_step.id end return self # to stay consistent with plan_action end
# File lib/dynflow/action.rb, line 573 def root_action? # in planning phase, the @triggered_action can be used to check whether the is root (the main action used # to create the execution plan). # For post-planning phases, the action is in root when either: # - the @caller_action_id is not set OR # - the @caller_action_id is set but the @caller_execution_plan_id is set as well # which means, the @caller_action_id is actually referencing different execution plan # and this action is creating a new execution plan, that's tracked as sub-plan # for the @caller_execution_plan_id @triggering_action.nil? && (@caller_action_id.nil? || @caller_execution_plan_id) end
# File lib/dynflow/action.rb, line 398 def sequence(&block) phase! Plan @execution_plan.switch_flow(Flows::Sequence.new([]), &block) end
# File lib/dynflow/action.rb, line 474 def set_error(error) phase! Executable Type! error, Exception, String action_logger.error error self.state = :error @step.error = ExecutionPlan::Steps::Error.new(error) end
# File lib/dynflow/action.rb, line 433 def suspend(&block) phase! Run block.call suspended_action if block throw SUSPEND, SUSPEND end
DSL for run phase
# File lib/dynflow/action.rb, line 428 def suspended_action phase! Run @suspended_action ||= Action::Suspended.new(self) end
# File lib/dynflow/action.rb, line 446 def with_error_handling(propagate_error = nil, &block) raise "wrong state #{self.state}" unless [:scheduling, :skipping, :running].include?(self.state) begin catch(ERROR) { block.call } rescue Exception => error set_error(error) # reraise low-level exceptions raise error unless Type? error, StandardError, ScriptError end case self.state when :scheduling self.state = :pending when :running self.state = :success when :skipping self.state = :skipped when :suspended, :error else raise "wrong state #{self.state}" end if propagate_error && self.state == :error raise(@step.error.exception) end end