rubocop:disable Metrics/ClassLength
# File lib/dynflow/action.rb, line 29 def self.all_children children.values.inject(children.values) do |children, child| children + child.all_children end end
# File lib/dynflow/action.rb, line 40 def self.children @children ||= {} end
# File lib/dynflow/action.rb, line 82 def self.constantize(action_name) super action_name rescue NameError Action::Missing.generate(action_name) end
# File lib/dynflow/action.rb, line 35 def self.inherited(child) children[child.name] = child super child end
# File lib/dynflow/action.rb, line 44 def self.middleware @middleware ||= Middleware::Register.new end
# File lib/dynflow/action.rb, line 94 def initialize(attributes, world) Type! attributes, Hash @phase = Type! attributes.fetch(:phase), Phase @world = Type! world, World @step = Type! attributes.fetch(:step, nil), ExecutionPlan::Steps::Abstract, NilClass raise ArgumentError, 'Step reference missing' if phase?(Executable) && @step.nil? @execution_plan_id = Type! attributes.fetch(:execution_plan_id), String @id = Type! attributes.fetch(:id), Integer @plan_step_id = Type! attributes.fetch(:plan_step_id), Integer @run_step_id = Type! attributes.fetch(:run_step_id), Integer, NilClass @finalize_step_id = Type! attributes.fetch(:finalize_step_id), Integer, NilClass @execution_plan = Type!(attributes.fetch(:execution_plan), ExecutionPlan) if phase? Present @caller_execution_plan_id = Type!(attributes.fetch(:caller_execution_plan_id, nil), String, NilClass) @caller_action_id = Type!(attributes.fetch(:caller_action_id, nil), Integer, NilClass) getter =-> key, required do required ? attributes.fetch(key) : attributes.fetch(key, {}) end @input = OutputReference.deserialize getter.(:input, phase?(Run, Finalize, Present)) @output = OutputReference.deserialize getter.(:output, false) if phase? Run, Finalize, Present end
FIND define subscriptions in world independent on action's classes,
limited only by in/output formats
@return [nil, Class] a child of Action
# File lib/dynflow/action.rb, line 51 def self.subscribe nil end
# File lib/dynflow/action.rb, line 366 def self.new_from_hash(hash, world) new(hash, world) end
An action must be a singleton and have a singleton lock
# File lib/dynflow/action.rb, line 559 def self.singleton_locked?(world) if self.ancestors.include? ::Dynflow::Action::Singleton lock_class = ::Dynflow::Coordinator::SingletonActionLock world.coordinator.find_locks(lock_class.unique_filter(self.name)).any? else false end end
# File lib/dynflow/action.rb, line 189 def action_logger phase! Executable world.action_logger end
@param [Class] filter_class return only actions which are kind of `filter_class` @return [Array<Action>] of all (including indirectly) planned actions by this action, returned actions are in Present phase
# File lib/dynflow/action.rb, line 213 def all_planned_actions(filter_class = Action) phase! Present mine = planned_actions (mine + mine.reduce([]) { |arr, action| arr + action.all_planned_actions }). select { |a| a.is_a?(filter_class) } end
# File lib/dynflow/action.rb, line 154 def caller_action plase! Present return nil if @caller_action_id return @caller_action if @caller_action caller_execution_plan = if @caller_execution_plan_id == execution_plan.id execution_plan else world.persistence.load_execution_plan(@caller_execution_plan_id) end @caller_action = world.persistence.load_action_for_presentation(caller_execution_plan, @caller_action_id) end
# File lib/dynflow/action.rb, line 261 def error raise "error data not available" if @step.nil? @step.error end
# File lib/dynflow/action.rb, line 266 def execute(*args) phase! Executable self.send phase.execute_method_name, *args end
# File lib/dynflow/action.rb, line 291 def execute_delay(delay_options, *args) with_error_handling(true) do world.middleware.execute(:delay, self, delay_options, *args) do |*new_args| @serializer = delay(*new_args).tap do |serializer| serializer.perform_serialization! end end end end
# File lib/dynflow/action.rb, line 184 def execution_plan phase! Plan, Present @execution_plan end
# File lib/dynflow/action.rb, line 225 def finalize_step phase! Present execution_plan.steps.fetch(finalize_step_id) if finalize_step_id end
# File lib/dynflow/action.rb, line 179 def from_subscription? phase! Plan @from_subscription end
# File lib/dynflow/action.rb, line 306 def holds_singleton_lock? false end
@override to define more descriptive state information for the action: used in Dynflow console
# File lib/dynflow/action.rb, line 257 def humanized_state state.to_s end
# File lib/dynflow/action.rb, line 133 def input=(hash) Type! hash, Hash phase! Plan @input = Utils.indifferent_hash(hash) end
# File lib/dynflow/action.rb, line 129 def label self.class.name end
# File lib/dynflow/action.rb, line 145 def output if phase? Plan @output_reference or raise 'plan_self has to be invoked before being able to reference the output' else @output end end
# File lib/dynflow/action.rb, line 139 def output=(hash) Type! hash, Hash phase! Run @output = Utils.indifferent_hash(hash) end
# File lib/dynflow/action.rb, line 124 def phase!(*phases) phase?(*phases) or raise TypeError, "Wrong phase #{phase}, required #{phases}" end
# File lib/dynflow/action.rb, line 120 def phase?(*phases) Match? phase, *phases end
# File lib/dynflow/action.rb, line 194 def plan_step phase! Present execution_plan.steps.fetch(plan_step_id) end
@param [Class] filter_class return only actions which are kind of `filter_class` @return [Array<Action>] of directly planned actions by this action, returned actions are in Present phase
# File lib/dynflow/action.rb, line 202 def planned_actions(filter = Action) phase! Present plan_step. planned_steps(execution_plan). map { |s| s.action(execution_plan) }. select { |a| a.is_a?(filter) } end
@api private @return [Array<Integer>] - ids of steps referenced from action
# File lib/dynflow/action.rb, line 273 def required_step_ids(input = self.input) results = [] recursion =-> value do case value when Hash value.values.each { |v| recursion.(v) } when Array value.each { |v| recursion.(v) } when ExecutionPlan::OutputReference results << value.step_id else # no reference hidden in this arg end results end recursion.(input) end
# File lib/dynflow/action.rb, line 220 def run_step phase! Present execution_plan.steps.fetch(run_step_id) if run_step_id end
# File lib/dynflow/action.rb, line 301 def serializer raise "The action must be delayed in order to access the serializer" if @serializer.nil? @serializer end
# File lib/dynflow/action.rb, line 167 def set_plan_context(execution_plan, triggering_action, from_subscription) phase! Plan @execution_plan = Type! execution_plan, ExecutionPlan @triggering_action = Type! triggering_action, Action, NilClass @from_subscription = Type! from_subscription, TrueClass, FalseClass end
# File lib/dynflow/action.rb, line 250 def state raise "state data not available" if @step.nil? @step.state end
# File lib/dynflow/action.rb, line 230 def steps [plan_step, run_step, finalize_step] end
# File lib/dynflow/action.rb, line 234 def to_hash recursive_to_hash( { class: self.class.name, execution_plan_id: execution_plan_id, id: id, plan_step_id: plan_step_id, run_step_id: run_step_id, finalize_step_id: finalize_step_id, caller_execution_plan_id: caller_execution_plan_id, caller_action_id: caller_action_id, input: input }, if phase? Run, Finalize, Present { output: output } end) end
# File lib/dynflow/action.rb, line 174 def triggering_action phase! Plan @triggering_action end
# File lib/dynflow/action.rb, line 326 def delay(delay_options, *args) Serializers::Noop.new(args) end
Add this method to implement the action's *Finalize phase* behaviour. It can use DB in this phase.
# File lib/dynflow/action.rb, line 357 def finalize # just a rdoc placeholder end
@override to implement the action's *Plan phase* behaviour. By default it plans itself and expects input-hash. Use plan_self and plan_action methods to plan actions. It can use DB in this phase.
# File lib/dynflow/action.rb, line 334 def plan(*args) if from_subscription? # if the action is triggered by subscription, by default use the # input of parent action. # should be replaced by referencing the input from input format plan_self(input.merge(triggering_action.input)) else # in this case, the action was triggered by plan_action. Use # the argument specified there. plan_self(*args) end self end
Add this method to implement the action's *Run phase* behaviour. It should not use DB in this phase.
# File lib/dynflow/action.rb, line 350 def run(event = nil) # just a rdoc placeholder end
# File lib/dynflow/action.rb, line 362 def run_accepts_events? method(:run).arity != 0 end
# File lib/dynflow/action.rb, line 321 def save_state phase! Executable @step.save end
# File lib/dynflow/action.rb, line 312 def state=(state) phase! Executable @world.logger.debug format('%13s %s:%2d %9s >> %9s in phase %8s %s', 'Step', execution_plan_id, @step.id, self.state, state, phase.to_s_humanized, self.class) @step.state = state end
# File lib/dynflow/action.rb, line 545 def check_serializable(what) Match! what, :input, :output value = send what recursive_to_hash value # it raises when not serializable rescue => e value.replace not_serializable: true raise e end
DSL for plan phase
# File lib/dynflow/action.rb, line 374 def concurrence(&block) phase! Plan @execution_plan.switch_flow(Flows::Concurrence.new([]), &block) end
DSL to terminate action execution and set it to error
# File lib/dynflow/action.rb, line 421 def error!(error) phase! Executable set_error(error) throw ERROR end
# File lib/dynflow/action.rb, line 533 def execute_finalize phase! Finalize @input = OutputReference.dereference @input, world.persistence self.state = :running save_state with_error_handling do world.middleware.execute(:finalize, self) do finalize end end end
# File lib/dynflow/action.rb, line 463 def execute_plan(*args) phase! Plan self.state = :running save_state # when the error occurred inside the planning, catch that # before getting out of the planning phase with_error_handling(!root_action?) do concurrence do world.middleware.execute(:plan, self, *args) do |*new_args| plan(*new_args) end end subscribed_actions = world.subscribed_actions(self.class) if subscribed_actions.any? # we encapsulate the flow for this action into a concurrence and # add the subscribed flows to it as well. trigger_flow = @execution_plan.current_run_flow.sub_flows.pop @execution_plan.switch_flow(Flows::Concurrence.new([trigger_flow].compact)) do subscribed_actions.each do |action_class| new_plan_step = @execution_plan.add_plan_step(action_class, self) new_plan_step.execute(@execution_plan, self, true, *args) end end end check_serializable :input end end
# File lib/dynflow/action.rb, line 494 def execute_run(event) phase! Run @world.logger.debug format('%13s %s:%2d got event %s', 'Step', execution_plan_id, @step.id, event) if event @input = OutputReference.dereference @input, world.persistence case when state == :running raise NotImplementedError, 'recovery after restart is not implemented' when [:pending, :error, :skipping, :suspended].include?(state) if event && state != :suspended raise 'event can be processed only when in suspended state' end self.state = :running unless self.state == :skipping save_state with_error_handling do event = Skip if state == :skipping # we run the Skip event only when the run accepts events if event != Skip || run_accepts_events? result = catch(SUSPEND) do world.middleware.execute(:run, self, *[event].compact) do |*args| run(*args) end end self.state = :suspended if result == SUSPEND end check_serializable :output end else raise "wrong state #{state} when event:#{event}" end end
# File lib/dynflow/action.rb, line 402 def plan_action(action_class, *args) phase! Plan @execution_plan.add_plan_step(action_class, self).execute(@execution_plan, self, false, *args) end
# File lib/dynflow/action.rb, line 384 def plan_self(input = {}) phase! Plan self.input.update input if self.respond_to?(:run) run_step = @execution_plan.add_run_step(self) @run_step_id = run_step.id @output_reference = OutputReference.new(@execution_plan.id, run_step.id, id) end if self.respond_to?(:finalize) finalize_step = @execution_plan.add_finalize_step(self) @finalize_step_id = finalize_step.id end return self # to stay consistent with plan_action end
# File lib/dynflow/action.rb, line 554 def root_action? @triggering_action.nil? end
# File lib/dynflow/action.rb, line 379 def sequence(&block) phase! Plan @execution_plan.switch_flow(Flows::Sequence.new([]), &block) end
# File lib/dynflow/action.rb, line 455 def set_error(error) phase! Executable Type! error, Exception, String action_logger.error error self.state = :error @step.error = ExecutionPlan::Steps::Error.new(error) end
# File lib/dynflow/action.rb, line 414 def suspend(&block) phase! Run block.call suspended_action if block throw SUSPEND, SUSPEND end
DSL for run phase
# File lib/dynflow/action.rb, line 409 def suspended_action phase! Run @suspended_action ||= Action::Suspended.new(self) end
# File lib/dynflow/action.rb, line 427 def with_error_handling(propagate_error = nil, &block) raise "wrong state #{self.state}" unless [:scheduling, :skipping, :running].include?(self.state) begin catch(ERROR) { block.call } rescue Exception => error set_error(error) # reraise low-level exceptions raise error unless Type? error, StandardError, ScriptError end case self.state when :scheduling self.state = :pending when :running self.state = :success when :skipping self.state = :skipped when :suspended, :error else raise "wrong state #{self.state}" end if propagate_error && self.state == :error raise(@step.error.exception) end end