class Dynflow::Action
rubocop:disable Metrics/ClassLength
Constants
- DelayedEvent
- ERROR
- OutputReference
- Phase
- SUSPEND
- Skip
Attributes
Public Class Methods
# File lib/dynflow/action.rb, line 31
# Lists every registered descendant of this action class: the direct
# children first, followed by each child's own `all_children`.
#
# @return [Array] the values of `children` plus those of their descendants
def self.all_children
  direct = children.values
  direct + direct.flat_map(&:all_children)
end
# File lib/dynflow/action.rb, line 43
# Lazily created registry of direct subclasses.
#
# @return [Hash] memoized hash stored in `@children`
def self.children
  return @children if @children

  @children = {}
end
# File lib/dynflow/action.rb, line 101
# Resolves an action class by name via the inherited `constantize`.
# When that lookup raises NameError, a stand-in is generated with
# `Action::Missing.generate` instead.
#
# @param action_name name of the action class to resolve
def self.constantize(action_name)
  begin
    super(action_name)
  rescue NameError
    Action::Missing.generate(action_name)
  end
end
# File lib/dynflow/action.rb, line 51
# Hook register of this action class, created on first access.
#
# @return [ExecutionPlan::Hooks::Register] memoized in `@execution_plan_hooks`
def self.execution_plan_hooks
  return @execution_plan_hooks if @execution_plan_hooks

  @execution_plan_hooks = ExecutionPlan::Hooks::Register.new
end
# File lib/dynflow/action.rb, line 55
# Replaces this class's hook register with the given one.
#
# @param hooks the register to store in `@execution_plan_hooks`
# @return the stored `hooks`
def self.inherit_execution_plan_hooks(hooks)
  @execution_plan_hooks = hooks
end
# File lib/dynflow/action.rb, line 37
# Subclass hook: registers the child under its name in `children` and
# hands it a duplicate of this class's execution plan hooks.
#
# @param child [Class] the newly defined subclass
def self.inherited(child)
  children[child.name] = child
  hooks_copy = execution_plan_hooks.dup
  child.inherit_execution_plan_hooks(hooks_copy)
  super(child)
end
# File lib/dynflow/action.rb, line 47
# Middleware register of this action class, created on first access.
#
# @return [Middleware::Register] memoized in `@middleware`
def self.middleware
  return @middleware if @middleware

  @middleware = Middleware::Register.new
end
# File lib/dynflow/action.rb, line 114
# Builds an action from its attribute hash, type-checking every value.
#
# @param attributes [Hash] must contain :phase, :execution_plan_id, :id,
#   :plan_step_id, :run_step_id and :finalize_step_id; :execution_plan is
#   fetched in the Present phase and :input is required in the Run,
#   Finalize and Present phases; :step, :caller_execution_plan_id,
#   :caller_action_id and :output are optional
# @param world [World]
# @raise [ArgumentError] when the phase is Executable and no step is given
def initialize(attributes, world)
  Type! attributes, Hash

  @phase = Type!(attributes.fetch(:phase), Phase)
  @world = Type!(world, World)
  @step  = Type!(attributes.fetch(:step, nil), ExecutionPlan::Steps::Abstract, NilClass)
  if phase?(Executable) && @step.nil?
    raise ArgumentError, 'Step reference missing'
  end

  @execution_plan_id = Type!(attributes.fetch(:execution_plan_id), String)
  @id                = Type!(attributes.fetch(:id), Integer)
  @plan_step_id      = Type!(attributes.fetch(:plan_step_id), Integer)
  @run_step_id       = Type!(attributes.fetch(:run_step_id), Integer, NilClass)
  @finalize_step_id  = Type!(attributes.fetch(:finalize_step_id), Integer, NilClass)

  if phase?(Present)
    @execution_plan = Type!(attributes.fetch(:execution_plan), ExecutionPlan)
  end

  @caller_execution_plan_id = Type!(attributes.fetch(:caller_execution_plan_id, nil), String, NilClass)
  @caller_action_id         = Type!(attributes.fetch(:caller_action_id, nil), Integer, NilClass)

  # A missing key falls back to an empty hash unless it is marked required.
  read_io = lambda do |key, required|
    if required
      attributes.fetch(key)
    else
      attributes.fetch(key, {})
    end
  end

  past_planning = phase?(Run, Finalize, Present)
  @input  = OutputReference.deserialize(read_io.call(:input, past_planning))
  @output = OutputReference.deserialize(read_io.call(:output, false)) if past_planning
  @pending_output_chunks = [] if phase?(Run, Finalize)
end
FIND define subscriptions in the world independently of the actions' classes,
limited only by input/output formats
@return [nil, Class] a child of Action
# File lib/dynflow/action.rb, line 62
# Default subscription: none.
#
# @return [nil, Class] nil here; the surrounding docs allow a child of Action
def self.subscribe
  nil
end
Protected Class Methods
# File lib/dynflow/action.rb, line 421
# Instantiates an action from a hash, normalizing it first.
#
# The hash is modified in place: a nil :output entry is removed, and a
# non-nil :execution_plan_uuid is copied to :execution_plan_id.
#
# @param hash [Hash] action attributes
# @param world the world passed on to `new`
def self.new_from_hash(hash, world)
  hash.delete(:output) if hash[:output].nil?

  plan_uuid = hash[:execution_plan_uuid]
  hash[:execution_plan_id] = plan_uuid unless plan_uuid.nil?

  new(hash, world)
end
Private Class Methods
An action must be a singleton and have a singleton lock
# File lib/dynflow/action.rb, line 638
# Tells whether a singleton lock exists for this action class.
#
# Classes that do not include `::Dynflow::Action::Singleton` are never
# considered locked.
#
# @param world provides the coordinator that is queried for locks
# @return [Boolean]
def self.singleton_locked?(world)
  return false unless ancestors.include?(::Dynflow::Action::Singleton)

  lock_filter = ::Dynflow::Coordinator::SingletonActionLock.unique_filter(name)
  world.coordinator.find_locks(lock_filter).any?
end
Public Instance Methods
# File lib/dynflow/action.rb, line 225
# @return the action logger provided by the world
def action_logger
  world.action_logger
end
@param [Class] filter_class return only actions which are kind of `filter_class` @return [Array<Action>] of all (including indirectly) planned actions by this action, returned actions are in Present phase
# File lib/dynflow/action.rb, line 248
# Collects the directly planned actions followed by everything they planned
# in turn, then keeps only those that are a kind of `filter_class`.
#
# @param filter_class [Class] class the returned actions must be a kind of
# @return [Array<Action>] directly and indirectly planned actions
# @raise [TypeError] when not in the Present phase
def all_planned_actions(filter_class = Action)
  phase! Present
  direct = planned_actions
  nested = direct.flat_map(&:all_planned_actions)
  (direct + nested).select { |action| action.is_a?(filter_class) }
end
# File lib/dynflow/action.rb, line 189
# Loads the action that called this one, for presentation.
#
# The caller is looked up in this action's own execution plan unless a
# caller execution plan id is set, in which case that plan is loaded first.
# The result is memoized in `@caller_action`.
#
# @return [nil, Object] nil when no caller action id is recorded, otherwise
#   whatever `world.persistence.load_action_for_presentation` returns
# @raise [TypeError] when not in the Present phase
def caller_action
  phase! Present
  # Without a caller id there is nothing to load. (The guard used to be
  # inverted, returning nil exactly when a caller was recorded.)
  return nil if @caller_action_id.nil?
  return @caller_action if @caller_action

  caller_execution_plan = if @caller_execution_plan_id.nil?
                            execution_plan
                          else
                            world.persistence.load_execution_plan(@caller_execution_plan_id)
                          end
  @caller_action = world.persistence.load_action_for_presentation(caller_execution_plan, @caller_action_id)
end
# File lib/dynflow/action.rb, line 358
# Events queued by `plan_event`, created empty on first access.
#
# @return [Array] memoized in `@delayed_events`
def delayed_events
  return @delayed_events if @delayed_events

  @delayed_events = []
end
# File lib/dynflow/action.rb, line 183
# Clears the pending and the cached output chunks, then asks persistence to
# delete the chunks for this execution plan id and action id.
def drop_output_chunks!
  @pending_output_chunks, @output_chunks = [], []
  world.persistence.delete_output_chunks(@execution_plan_id, @id)
end
# File lib/dynflow/action.rb, line 296
# @return the error recorded on this action's step
# @raise [RuntimeError] when the action has no step
def error
  if @step.nil?
    raise "error data not available"
  end

  @step.error
end
# File lib/dynflow/action.rb, line 301
# Dispatches to the method named by the current phase's
# `execute_method_name`, forwarding all arguments.
#
# @raise [TypeError] when not in an Executable phase
def execute(*args)
  phase! Executable
  handler = phase.execute_method_name
  send(handler, *args)
end
# File lib/dynflow/action.rb, line 326
# Runs the :delay middleware stack around `delay`, performs the
# serialization on the returned serializer and stores it in `@serializer`.
# Errors are propagated (`with_error_handling(true)`).
#
# @param delay_options passed through the middleware to `delay`
# @param args passed through the middleware to `delay`
def execute_delay(delay_options, *args)
  with_error_handling(true) do
    world.middleware.execute(:delay, self, delay_options, *args) do |*new_args|
      delayed = delay(*new_args)
      delayed.perform_serialization!
      @serializer = delayed
    end
  end
end
# File lib/dynflow/action.rb, line 220
# @return the execution plan held in `@execution_plan`
# @raise [TypeError] when in neither the Plan nor the Present phase
def execution_plan
  phase!(Plan, Present)
  @execution_plan
end
# File lib/dynflow/action.rb, line 260
# @return the finalize step fetched from the execution plan, or nil when
#   the action has no finalize step id
# @raise [TypeError] when not in the Present phase
def finalize_step
  phase! Present
  step_id = finalize_step_id
  return nil unless step_id

  execution_plan.steps.fetch(step_id)
end
# File lib/dynflow/action.rb, line 215
# @return the `from_subscription` flag stored by `set_plan_context`
# @raise [TypeError] when not in the Plan phase
def from_subscription?
  phase!(Plan)
  @from_subscription
end
# File lib/dynflow/action.rb, line 341
# @return [Boolean] always false in this base implementation
def holds_singleton_lock?
  false
end
@override to define more descriptive state information for the action; used in the Dynflow console
# File lib/dynflow/action.rb, line 292
# @return [String] the step state converted with `to_s`
def humanized_state
  current = state
  current.to_s
end
# File lib/dynflow/action.rb, line 154
# Replaces the action input with an indifferent-access copy of `hash`.
#
# @param hash [Hash]
# @raise [TypeError] when not in the Plan phase
def input=(hash)
  Type!(hash, Hash)
  phase!(Plan)
  @input = Utils.indifferent_hash(hash)
end
# File lib/dynflow/action.rb, line 150
# @return [String] the name of the action's class
def label
  klass = self.class
  klass.name
end
# File lib/dynflow/action.rb, line 166
# In the Plan phase returns the output reference created by `plan_self`;
# in every other phase returns the stored output.
#
# @raise [RuntimeError] in the Plan phase when no output reference exists yet
def output
  return @output unless phase?(Plan)

  @output_reference || raise('plan_self has to be invoked before being able to reference the output')
end
# File lib/dynflow/action.rb, line 160
# Replaces the action output with an indifferent-access copy of `hash`.
#
# @param hash [Hash]
# @raise [TypeError] when not in the Run phase
def output=(hash)
  Type!(hash, Hash)
  phase!(Run)
  @output = Utils.indifferent_hash(hash)
end
# File lib/dynflow/action.rb, line 175
# Appends a chunk record to the pending output chunks.
#
# @param chunk the chunk payload
# @param kind optional kind stored with the chunk
# @param timestamp [Time] defaults to the time of the call
# @return [Array] the pending output chunks
def output_chunk(chunk, kind: nil, timestamp: Time.now)
  record = { chunk: chunk, kind: kind, timestamp: timestamp }
  @pending_output_chunks << record
end
# File lib/dynflow/action.rb, line 145
# Asserts that the action is in one of the given phases.
#
# @return the truthy result of `phase?`
# @raise [TypeError] when the current phase matches none of `phases`
def phase!(*phases)
  phase?(*phases) || raise(TypeError, "Wrong phase #{phase}, required #{phases}")
end
# File lib/dynflow/action.rb, line 141
# Checks the current phase against the given phases using `Match?`.
def phase?(*phases)
  Match?(phase, *phases)
end
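Plan an event to be sent to the step identified by `execution_plan_id` and
`step_id`, which default to this action's own execution plan and run step.
If `time` is not passed, the event is sent as soon as possible; a numeric
`time` is added to the world clock's current time.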
# File lib/dynflow/action.rb, line 353
# Queues a `DelayedEvent` in `delayed_events`.
#
# @param event the event to deliver
# @param time when to deliver it; a Numeric is added to the world clock's
#   current time, any other value (including nil) is stored unchanged
# @param execution_plan_id defaults to this action's execution plan id
# @param step_id defaults to this action's run step id
# @param optional stored on the delayed event
# @return [Array] the queued delayed events
def plan_event(event, time = nil,
               execution_plan_id: self.execution_plan_id,
               step_id: self.run_step_id,
               optional: false)
  deliver_at = time.is_a?(Numeric) ? @world.clock.current_time + time : time
  delayed_events << DelayedEvent[execution_plan_id, step_id, event, deliver_at, optional]
end
# File lib/dynflow/action.rb, line 229
# @return the plan step fetched from the execution plan by `plan_step_id`
# @raise [TypeError] when not in the Present phase
def plan_step
  phase! Present
  plan_steps = execution_plan.steps
  plan_steps.fetch(plan_step_id)
end
@param [Class] filter return only actions which are a kind of `filter` @return [Array<Action>] the actions planned directly by this action; the returned actions are in the Present phase
# File lib/dynflow/action.rb, line 237
# Maps the steps planned under this action's plan step to their actions
# and keeps those that are a kind of `filter`.
#
# @param filter [Class] class the returned actions must be a kind of
# @return [Array<Action>] directly planned actions
# @raise [TypeError] when not in the Present phase
def planned_actions(filter = Action)
  phase! Present
  children_steps = plan_step.planned_steps(execution_plan)
  children_actions = children_steps.map { |step| step.action(execution_plan) }
  children_actions.select { |action| action.is_a?(filter) }
end
@override to define which pool the action should be run in. The queue defined here will also be used as the default queue for all the steps planned under this action, unless overridden by a sub-action
# File lib/dynflow/action.rb, line 348
# @return [nil] the base implementation names no queue
def queue
  nil
end
@api private @return [Array<Integer>] - ids of steps referenced from action
# File lib/dynflow/action.rb, line 308
# Walks `input` depth-first through nested hashes (values only) and arrays,
# collecting the `step_id` of every `ExecutionPlan::OutputReference` found.
#
# @param input the structure to scan; defaults to the action's input
# @return [Array] collected step ids, in traversal order
def required_step_ids(input = self.input)
  step_ids = []
  collect = lambda do |value|
    case value
    when Hash
      value.each_value { |nested| collect.call(nested) }
    when Array
      value.each { |nested| collect.call(nested) }
    when ExecutionPlan::OutputReference
      step_ids << value.step_id
    end
    # anything else hides no reference
  end
  collect.call(input)
  step_ids
end
# File lib/dynflow/action.rb, line 255
# @return the run step fetched from the execution plan, or nil when the
#   action has no run step id
# @raise [TypeError] when not in the Present phase
def run_step
  phase! Present
  step_id = run_step_id
  return nil unless step_id

  execution_plan.steps.fetch(step_id)
end
# File lib/dynflow/action.rb, line 336
# @return the serializer stored by `execute_delay`
# @raise [RuntimeError] when no serializer has been stored
def serializer
  if @serializer.nil?
    raise "The action must be delayed in order to access the serializer"
  end

  @serializer
end
# File lib/dynflow/action.rb, line 202
# Stores the planning context on the action, type-checking each value.
#
# @param execution_plan [ExecutionPlan]
# @param triggering_action [Action, nil]
# @param from_subscription [Boolean]
# @raise [TypeError] when not in the Plan phase
def set_plan_context(execution_plan, triggering_action, from_subscription)
  phase!(Plan)
  @execution_plan    = Type!(execution_plan, ExecutionPlan)
  @triggering_action = Type!(triggering_action, Action, NilClass)
  @from_subscription = Type!(from_subscription, TrueClass, FalseClass)
end
# File lib/dynflow/action.rb, line 285
# @return the state of this action's step
# @raise [RuntimeError] when the action has no step
def state
  if @step.nil?
    raise "state data not available"
  end

  @step.state
end
# File lib/dynflow/action.rb, line 265
# @return [Array] the plan, run and finalize steps, in that order; the run
#   and finalize entries are nil when the action has no such step id
def steps
  planning = plan_step
  running = run_step
  finalizing = finalize_step
  [planning, running, finalizing]
end
# File lib/dynflow/action.rb, line 179
# Output chunks loaded from persistence for this execution plan id and
# action id; the result is memoized in `@output_chunks`.
def stored_output_chunks
  return @output_chunks if @output_chunks

  @output_chunks = world.persistence.load_output_chunks(@execution_plan_id, @id)
end
# File lib/dynflow/action.rb, line 269
# Serializes the action through `recursive_to_hash`.
#
# The output is included only in the Run, Finalize and Present phases; in
# other phases nil is passed in its place.
def to_hash
  attributes = {
    class: self.class.name,
    execution_plan_id: execution_plan_id,
    id: id,
    plan_step_id: plan_step_id,
    run_step_id: run_step_id,
    finalize_step_id: finalize_step_id,
    caller_execution_plan_id: caller_execution_plan_id,
    caller_action_id: caller_action_id,
    input: input
  }
  output_part = phase?(Run, Finalize, Present) ? { output: output } : nil
  recursive_to_hash(attributes, output_part)
end
action that caused this action to be planned. Available only in planning phase
# File lib/dynflow/action.rb, line 210
# @return [Action, nil] the triggering action stored by `set_plan_context`
# @raise [TypeError] when not in the Plan phase
def triggering_action
  phase!(Plan)
  @triggering_action
end
Protected Instance Methods
# File lib/dynflow/action.rb, line 381
# Default delay behaviour: wraps the arguments in a no-op serializer.
#
# @param delay_options not used by this implementation
# @param args arguments handed to the serializer
# @return [Serializers::Noop]
def delay(delay_options, *args)
  Serializers::Noop.new(args)
end
Add this method to implement the action's *Finalize phase* behaviour. It can use DB in this phase.
# File lib/dynflow/action.rb, line 412
# Placeholder for the Finalize phase behaviour; exists for rdoc only.
#
# @return [nil]
def finalize
  # just a rdoc placeholder
  nil
end
@override to implement the action's *Plan phase* behaviour. By default it plans itself and expects an input hash. Use the plan_self and plan_action methods to plan actions. It can use DB in this phase.
# File lib/dynflow/action.rb, line 389
# Default Plan phase behaviour: plans the action itself.
#
# @param args forwarded to `plan_self` unless planned from a subscription
# @return [self]
def plan(*args)
  if from_subscription?
    # if the action is triggered by subscription, by default use the
    # input of parent action.
    # should be replaced by referencing the input from input format
    inherited_input = input.merge(triggering_action.input)
    plan_self(inherited_input)
  else
    # in this case, the action was triggered by plan_action. Use
    # the argument specified there.
    plan_self(*args)
  end
  self
end
Add this method to implement the action's *Run phase* behaviour. It should not use DB in this phase.
# File lib/dynflow/action.rb, line 405
# Placeholder for the Run phase behaviour; exists for rdoc only.
#
# @param event optional event; ignored here
# @return [nil]
def run(event = nil)
  # just a rdoc placeholder
  nil
end
# File lib/dynflow/action.rb, line 417
# @return [Boolean] true unless `run` has an arity of exactly zero
def run_accepts_events?
  !method(:run).arity.zero?
end
If this save returns an integer, it means it was an update. The number represents the number of updated records. If it is 0, then the step was in an unexpected state and couldn't be updated
# File lib/dynflow/action.rb, line 376
# Saves the step, passing `conditions` through to `@step.save`.
#
# @param conditions [Hash] conditions handed to the step's `save`
# @return the result of `@step.save`
# @raise [TypeError] when not in an Executable phase
def save_state(conditions = {})
  phase!(Executable)
  @step.save(conditions)
end
# File lib/dynflow/action.rb, line 364
# Logs the state transition at debug level and sets the new step state.
#
# @param state the new state
# @raise [TypeError] when not in an Executable phase
def state=(state)
  phase! Executable
  logger = @world.logger
  transition = format('%13s %s:%2d %9s >> %9s in phase %8s %s',
                      'Step', execution_plan_id, @step.id,
                      self.state, state,
                      phase.to_s_humanized, self.class)
  logger.debug(transition)
  @step.state = state
end
Private Instance Methods
# File lib/dynflow/action.rb, line 616
# Verifies that the action's input or output can be serialized.
#
# When serialization fails, the offending value is replaced in place with
# `{ not_serializable: true }` and the original error is re-raised.
#
# @param what [Symbol] :input or :output
# @raise whatever `Match!`, the reader or `recursive_to_hash` raised
def check_serializable(what)
  Match! what, :input, :output
  value = send what
  recursive_to_hash value # it raises when not serializable
rescue => e
  # `value` is still nil when the failure happened before it was read
  # (for example `what` did not match). Skip the replacement then, so the
  # original error is not masked by a NoMethodError on nil.
  value&.replace(not_serializable: true)
  raise e
end
DSL for plan phase
# File lib/dynflow/action.rb, line 433
# Plan-phase DSL: runs the block with the execution plan switched to a
# new, empty concurrence flow.
#
# @raise [TypeError] when not in the Plan phase
def concurrence(&block)
  phase! Plan
  flow = Flows::Concurrence.new([])
  @execution_plan.switch_flow(flow, &block)
end
DSL to terminate action execution and set it to error
# File lib/dynflow/action.rb, line 480
# Records the error on the action and aborts execution by throwing ERROR.
#
# @param error the error passed to `set_error`
# @raise [TypeError] when not in an Executable phase
def error!(error)
  phase!(Executable)
  set_error(error)
  throw(ERROR)
end
rubocop:enable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
# File lib/dynflow/action.rb, line 604
# Finalize phase entry point: dereferences the input, marks the step as
# running, saves it, and runs `finalize` inside the :finalize middleware
# stack with error handling.
#
# @raise [TypeError] when not in the Finalize phase
def execute_finalize
  phase! Finalize
  @input = OutputReference.dereference(@input, world.persistence)
  self.state = :running
  save_state
  with_error_handling do
    world.middleware.execute(:finalize, self) { finalize }
  end
end
# File lib/dynflow/action.rb, line 522
# Plan phase entry point: marks the step as running, saves it, and runs
# `plan` inside a concurrence flow and the :plan middleware stack. Actions
# subscribed to this class are then planned alongside it, and finally the
# input is checked for serializability.
#
# Errors are propagated unless this is the root action.
#
# @param args planning arguments passed through the middleware to `plan`
#   and handed to every subscribed action's plan step
# @raise [TypeError] when not in the Plan phase
def execute_plan(*args)
  phase! Plan
  self.state = :running
  save_state

  # when the error occurred inside the planning, catch that
  # before getting out of the planning phase
  with_error_handling(!root_action?) do
    concurrence do
      world.middleware.execute(:plan, self, *args) { |*new_args| plan(*new_args) }
    end

    subscribers = world.subscribed_actions(self.class)
    if subscribers.any?
      # we encapsulate the flow for this action into a concurrence and
      # add the subscribed flows to it as well.
      trigger_flow = @execution_plan.current_run_flow.sub_flows.pop
      combined_flow = Flows::Concurrence.new([trigger_flow].compact)
      @execution_plan.switch_flow(combined_flow) do
        subscribers.each do |action_class|
          subscriber_step = @execution_plan.add_plan_step(action_class, self)
          subscriber_step.execute(@execution_plan, self, true, *args)
        end
      end
    end

    check_serializable :input
  end
end
TODO: This is getting out of hand, refactoring needed rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
# File lib/dynflow/action.rb, line 555
# Run phase entry point.
#
# Accepts steps in the pending, error, skipping or suspended state. The
# step is moved to running (unless it is skipping) and saved with a
# condition on its previous state. The input is then dereferenced and `run`
# is called inside the :run middleware stack. A step that throws SUSPEND
# ends up suspended. Afterwards the output is checked for serializability.
#
# @param event optional event; only allowed when the step is suspended
# @raise [NotImplementedError] when the step is already running
# @raise [RuntimeError] on an event for a non-suspended step, or when the
#   step is in any other state
# @raise [TypeError] when not in the Run phase
def execute_run(event)
  phase! Run
  @world.logger.debug format('%13s %s:%2d got event %s',
                             'Step', execution_plan_id, @step.id, event) if event

  if state == :running
    raise NotImplementedError, 'recovery after restart is not implemented'
  end

  unless [:pending, :error, :skipping, :suspended].include?(state)
    raise "wrong state #{state} when event:#{event}"
  end

  if event && state != :suspended
    raise 'event can be processed only when in suspended state'
  end

  old_state = self.state
  self.state = :running unless self.state == :skipping
  saved = save_state(state: %w[pending error skipping suspended])
  if saved.is_a?(Integer) && !saved.positive?
    # The step was already in a state we're trying to transition to, most
    # likely we were about to execute it for the second time after first
    # execution was forcefully interrupted.
    # Set error and return to prevent the step from being executed twice
    set_error "Could not transition step from #{old_state} to #{self.state}, step already in #{self.state}."
    return
  end

  @input = OutputReference.dereference(@input, world.persistence)
  with_error_handling do
    event = Skip if state == :skipping

    # we run the Skip event only when the run accepts events
    if event != Skip || run_accepts_events?
      result = catch(SUSPEND) do
        world.middleware.execute(:run, self, *[event].compact) do |*args|
          run(*args)
        end
      end

      self.state = :suspended if result == SUSPEND
    end

    check_serializable :output
  end
end
# File lib/dynflow/action.rb, line 461
# Plan-phase DSL: adds a plan step for `action_class` under this action and
# executes it with the given arguments (not as a subscription).
#
# @param action_class the class of the action to plan
# @param args arguments forwarded to the new plan step's `execute`
# @raise [TypeError] when not in the Plan phase
def plan_action(action_class, *args)
  phase! Plan
  new_step = @execution_plan.add_plan_step(action_class, self)
  new_step.execute(@execution_plan, self, false, *args)
end
# File lib/dynflow/action.rb, line 443
# Plan-phase DSL: merges `input` into the action's input and registers the
# steps of this action in the execution plan.
#
# A run step (and the output reference pointing at it) is added when the
# action responds to `run`; a finalize step is added when it responds to
# `finalize`.
#
# @param input [Hash] values merged into the action's input
# @return [self] to stay consistent with plan_action
# @raise [TypeError] when not in the Plan phase
def plan_self(input = {})
  phase! Plan
  self.input.update(input)

  if respond_to?(:run)
    run_step = @execution_plan.add_run_step(self)
    @run_step_id = run_step.id
    @output_reference = OutputReference.new(@execution_plan.id, run_step.id, id)
  end

  if respond_to?(:finalize)
    @finalize_step_id = @execution_plan.add_finalize_step(self).id
  end

  self
end
# File lib/dynflow/action.rb, line 625
# Tells whether this is the root action, i.e. the main action used to
# create the execution plan.
#
# @return a truthy value for the root action, a falsy one otherwise
def root_action?
  # in planning phase, the @triggering_action can be used to check whether
  # the action is root.
  # For post-planning phases, the action is root when either:
  # - the @caller_action_id is not set OR
  # - the @caller_action_id is set but the @caller_execution_plan_id is set
  #   as well, which means the @caller_action_id is actually referencing a
  #   different execution plan and this action is creating a new execution
  #   plan, that's tracked as sub-plan for the @caller_execution_plan_id
  without_local_caller = @caller_action_id.nil? || @caller_execution_plan_id
  @triggering_action.nil? && without_local_caller
end
# File lib/dynflow/action.rb, line 438
# Plan-phase DSL: runs the block with the execution plan switched to a
# new, empty sequence flow.
#
# @raise [TypeError] when not in the Plan phase
def sequence(&block)
  phase! Plan
  flow = Flows::Sequence.new([])
  @execution_plan.switch_flow(flow, &block)
end
# File lib/dynflow/action.rb, line 514
# Logs the error, moves the step to the :error state and stores the error
# on the step wrapped in `ExecutionPlan::Steps::Error`.
#
# @param error [Exception, String]
# @raise [TypeError] when not in an Executable phase
def set_error(error)
  phase!(Executable)
  Type!(error, Exception, String)
  action_logger.error(error)
  self.state = :error
  @step.error = ExecutionPlan::Steps::Error.new(error)
end
# File lib/dynflow/action.rb, line 473
# Run-phase DSL: suspends the action by throwing SUSPEND. When a block is
# given it is first called with the suspended action.
#
# @raise [TypeError] when not in the Run phase
def suspend(&block)
  phase! Run
  block.call(suspended_action) unless block.nil?
  throw(SUSPEND, SUSPEND)
end
DSL for run phase
# File lib/dynflow/action.rb, line 468
# @return [Action::Suspended] wrapper around this action, memoized in
#   `@suspended_action`
# @raise [TypeError] when not in the Run phase
def suspended_action
  phase! Run
  return @suspended_action if @suspended_action

  @suspended_action = Action::Suspended.new(self)
end
# File lib/dynflow/action.rb, line 486
# Runs the block, recording any raised exception on the action, and then
# moves the step to the state that follows its current one
# (scheduling -> pending, running -> success, skipping -> skipped;
# suspended and error are left as they are).
#
# @param propagate_error when truthy and the step ended in :error, the
#   step's error is raised after the state handling
# @raise [RuntimeError] when the step is not scheduling, skipping or
#   running on entry, or is in an unexpected state after the block
def with_error_handling(propagate_error = nil, &block)
  unless [:scheduling, :skipping, :running].include?(self.state)
    raise "wrong state #{self.state}"
  end

  begin
    catch(ERROR) { block.call }
  rescue Exception => error
    # Exception is rescued on purpose so that every failure is recorded.
    set_error(error)
    # reraise low-level exceptions
    raise error unless Type? error, StandardError, ScriptError
  end

  case self.state
  when :scheduling then self.state = :pending
  when :running    then self.state = :success
  when :skipping   then self.state = :skipped
  when :suspended, :error
    # nothing to change
  else
    raise "wrong state #{self.state}"
  end

  raise(@step.error.exception) if propagate_error && self.state == :error
end