class Dynflow::Action

rubocop:disable Metrics/ClassLength

Constants

DelayedEvent
ERROR
OutputReference
Phase
SUSPEND
Skip

Attributes

caller_action_id[R]
caller_execution_plan_id[R]
execution_plan_id[R]
finalize_step_id[R]
id[R]
input[R]
phase[R]
plan_step_id[R]
run_step_id[R]
world[R]

Public Class Methods

all_children() click to toggle source
# File lib/dynflow/action.rb, line 30
# Collects every known descendant of this class: the directly registered
# children first, followed by each child's own descendants.
#
# @return [Array] the values of `children` plus, recursively, their `all_children`
def self.all_children
  direct = children.values
  direct + direct.flat_map { |child| child.all_children }
end
children() click to toggle source
# File lib/dynflow/action.rb, line 42
# Registry of subclasses, created lazily on first access.
#
# @return [Hash] the same Hash instance on every call
def self.children
  @children || (@children = {})
end
constantize(action_name) click to toggle source
Calls superclass method
# File lib/dynflow/action.rb, line 99
# Resolves `action_name` through the superclass implementation. When that
# raises NameError, a stand-in is produced by Action::Missing.generate.
#
# @param action_name name handed to `super` and, on failure, to Action::Missing
# @return the result of `super`, or of `Action::Missing.generate(action_name)`
def self.constantize(action_name)
  begin
    super(action_name)
  rescue NameError
    Action::Missing.generate(action_name)
  end
end
execution_plan_hooks() click to toggle source
# File lib/dynflow/action.rb, line 50
# Hook register of this class, built on first access and memoized afterwards.
#
# @return [ExecutionPlan::Hooks::Register]
def self.execution_plan_hooks
  @execution_plan_hooks || (@execution_plan_hooks = ExecutionPlan::Hooks::Register.new)
end
inherit_execution_plan_hooks(hooks) click to toggle source
# File lib/dynflow/action.rb, line 54
# Replaces this class's hook register with `hooks`.
# `inherited` calls this on a new child with a copy of the parent's register.
#
# @param hooks the register to use from now on
# @return the assigned `hooks`
def self.inherit_execution_plan_hooks(hooks)
  @execution_plan_hooks = hooks
end
inherited(child) click to toggle source
Calls superclass method
# File lib/dynflow/action.rb, line 36
# Subclass hook: registers `child` in `children` under its class name and
# hands it a duplicate of this class's execution plan hooks, then delegates
# to the superclass hook.
#
# @param child the newly created subclass
def self.inherited(child)
  children.store(child.name, child)
  copied_hooks = execution_plan_hooks.dup
  child.inherit_execution_plan_hooks(copied_hooks)
  super(child)
end
middleware() click to toggle source
# File lib/dynflow/action.rb, line 46
def self.middleware
  @middleware ||= Middleware::Register.new
end
new(attributes, world) click to toggle source
# File lib/dynflow/action.rb, line 111
# Builds an action from its attribute hash.
#
# @param attributes [Hash] must contain :phase, :execution_plan_id, :id,
#   :plan_step_id, :run_step_id and :finalize_step_id. :step,
#   :caller_execution_plan_id, :caller_action_id and :output are optional.
#   :execution_plan is required in the Present phase, and :input is required
#   in the Run, Finalize and Present phases.
# @param world [World]
# @raise [ArgumentError] when the phase is Executable and no :step is given
def initialize(attributes, world)
  Type!(attributes, Hash)

  # @phase must be assigned first: the phase? checks below read it.
  @phase = Type!(attributes.fetch(:phase), Phase)
  @world = Type!(world, World)
  @step  = Type!(attributes.fetch(:step, nil), ExecutionPlan::Steps::Abstract, NilClass)
  if phase?(Executable) && @step.nil?
    raise ArgumentError, 'Step reference missing'
  end

  @execution_plan_id = Type!(attributes.fetch(:execution_plan_id), String)
  @id                = Type!(attributes.fetch(:id), Integer)
  @plan_step_id      = Type!(attributes.fetch(:plan_step_id), Integer)
  @run_step_id       = Type!(attributes.fetch(:run_step_id), Integer, NilClass)
  @finalize_step_id  = Type!(attributes.fetch(:finalize_step_id), Integer, NilClass)

  if phase?(Present)
    @execution_plan = Type!(attributes.fetch(:execution_plan), ExecutionPlan)
  end

  @caller_execution_plan_id = Type!(attributes.fetch(:caller_execution_plan_id, nil), String, NilClass)
  @caller_action_id         = Type!(attributes.fetch(:caller_action_id, nil), Integer, NilClass)

  # After planning the input must be present; before that it defaults to {}.
  past_planning = phase?(Run, Finalize, Present)
  raw_input     = past_planning ? attributes.fetch(:input) : attributes.fetch(:input, {})
  @input        = OutputReference.deserialize(raw_input)
  @output       = OutputReference.deserialize(attributes.fetch(:output, {})) if past_planning
end
subscribe() click to toggle source

FIND: define subscriptions in world independent of action classes,

limited only by in/output formats

@return [nil, Class] a child of Action

# File lib/dynflow/action.rb, line 61
# Base implementation: subscribes to nothing and always returns nil.
#
# @return [nil]
def self.subscribe
  nil
end

Protected Class Methods

new_from_hash(hash, world) click to toggle source
# File lib/dynflow/action.rb, line 403
# Instantiates an action from a hash, normalizing it first:
# a nil :output entry is dropped and a non-nil :execution_plan_uuid is copied
# to :execution_plan_id.
#
# The normalization is done on a shallow copy, so the caller's hash is left
# untouched (the previous version deleted and added keys on the argument).
#
# @param hash [Hash] action attributes
# @param world passed through to `new`
# @return the object built by `new`
def self.new_from_hash(hash, world)
  attributes = hash.dup
  attributes.delete(:output) if attributes[:output].nil?

  plan_uuid = attributes[:execution_plan_uuid]
  attributes[:execution_plan_id] = plan_uuid unless plan_uuid.nil?

  new(attributes, world)
end

Private Class Methods

singleton_locked?(world) click to toggle source

An action must be a singleton and have a singleton lock

# File lib/dynflow/action.rb, line 620
# Tells whether a singleton lock for this action class exists in the world's
# coordinator. Classes without ::Dynflow::Action::Singleton among their
# ancestors are never considered locked.
#
# @param world object whose `coordinator` is queried via `find_locks`
# @return [Boolean]
def self.singleton_locked?(world)
  return false unless ancestors.include?(::Dynflow::Action::Singleton)

  filter = ::Dynflow::Coordinator::SingletonActionLock.unique_filter(name)
  world.coordinator.find_locks(filter).any?
end

Public Instance Methods

action_logger() click to toggle source
# File lib/dynflow/action.rb, line 207
# Shortcut to the logger exposed by the world as `action_logger`.
def action_logger
  world.action_logger
end
all_planned_actions(filter_class = Action) click to toggle source

@param [Class] filter_class return only actions which are kind of `filter_class` @return [Array<Action>] of all (including indirectly) planned actions by this action, returned actions are in Present phase

# File lib/dynflow/action.rb, line 230
# Returns the actions planned by this action, directly or indirectly.
# The filter is applied to the flattened result only, so matching actions
# nested under non-matching ones are still found. Present phase only.
#
# @param filter_class [Class] keep only actions that are kind of this class
# @return [Array<Action>]
def all_planned_actions(filter_class = Action)
  phase!(Present)
  direct = planned_actions
  nested = direct.flat_map { |action| action.all_planned_actions }
  (direct + nested).select { |candidate| candidate.is_a?(filter_class) }
end
caller_action() click to toggle source
# File lib/dynflow/action.rb, line 171
# Loads (and memoizes) the action that called this one. Present phase only.
#
# The caller is looked up in this action's own execution plan unless
# @caller_execution_plan_id is set, in which case that plan is loaded from
# persistence first.
#
# @return the caller action, or nil when no caller action id is recorded
def caller_action
  phase! Present
  # Fixed guard: bail out when there is NO caller id. The previous check
  # (`if @caller_action_id`) returned nil exactly when a caller existed.
  return nil if @caller_action_id.nil?
  return @caller_action if @caller_action

  caller_execution_plan = if @caller_execution_plan_id.nil?
                            execution_plan
                          else
                            world.persistence.load_execution_plan(@caller_execution_plan_id)
                          end
  @caller_action = world.persistence.load_action_for_presentation(caller_execution_plan, @caller_action_id)
end
delayed_events() click to toggle source
# File lib/dynflow/action.rb, line 340
# Events collected by `plan_event`; the list is created on first access.
#
# @return [Array]
def delayed_events
  @delayed_events || (@delayed_events = [])
end
error() click to toggle source
# File lib/dynflow/action.rb, line 278
# Error stored on the underlying step.
#
# @raise [RuntimeError] when the action has no step reference
def error
  return @step.error unless @step.nil?

  raise "error data not available"
end
execute(*args) click to toggle source
# File lib/dynflow/action.rb, line 283
# Dispatches to the execute method named by the current phase
# (`phase.execute_method_name`). Executable phases only.
#
# @param args forwarded unchanged to the phase-specific method
def execute(*args)
  phase!(Executable)
  handler = phase.execute_method_name
  send(handler, *args)
end
execute_delay(delay_options, *args) click to toggle source
# File lib/dynflow/action.rb, line 308
# Runs the :delay middleware stack around `delay`, stores the serializer it
# returns in @serializer and has it perform the serialization. Errors are
# propagated to the caller (with_error_handling is called with true).
#
# @param delay_options passed to the middleware ahead of `args`
# @param args remaining arguments for the middleware / `delay`
def execute_delay(delay_options, *args)
  with_error_handling(true) do
    world.middleware.execute(:delay, self, delay_options, *args) do |*new_args|
      @serializer = delay(*new_args).tap(&:perform_serialization!)
    end
  end
end
execution_plan() click to toggle source
# File lib/dynflow/action.rb, line 202
# Execution plan this action belongs to; readable only in the Plan and
# Present phases.
def execution_plan
  phase!(Plan, Present)
  @execution_plan
end
finalize_step() click to toggle source
# File lib/dynflow/action.rb, line 242
# Finalize step of this action looked up in the execution plan, or nil when
# the action has no finalize step id. Present phase only.
def finalize_step
  phase!(Present)
  return unless finalize_step_id

  execution_plan.steps.fetch(finalize_step_id)
end
from_subscription?() click to toggle source
# File lib/dynflow/action.rb, line 197
# Value of the `from_subscription` flag recorded by `set_plan_context`.
# Plan phase only.
def from_subscription?
  phase!(Plan)
  @from_subscription
end
holds_singleton_lock?() click to toggle source
# File lib/dynflow/action.rb, line 323
# Base implementation: always false.
#
# @return [Boolean]
def holds_singleton_lock?
  false
end
humanized_state() click to toggle source

@override to define more descriptive state information for the action: used in Dynflow console

# File lib/dynflow/action.rb, line 274
# Default humanized state: the step state converted to a String.
#
# @return [String]
def humanized_state
  state.to_s
end
input=(hash) click to toggle source
# File lib/dynflow/action.rb, line 150
# Replaces the action input with an indifferent-access version of `hash`.
# Plan phase only.
#
# @param hash [Hash]
def input=(hash)
  Type!(hash, Hash)
  phase!(Plan)
  @input = Utils.indifferent_hash(hash)
end
label() click to toggle source
# File lib/dynflow/action.rb, line 146
# Label of the action: the name of its class.
def label
  self.class.name
end
output() click to toggle source
# File lib/dynflow/action.rb, line 162
# Output of the action. Outside the Plan phase this is the stored output;
# during planning it is the output reference created by `plan_self`.
#
# @raise [RuntimeError] in the Plan phase when no output reference exists yet
def output
  return @output unless phase?(Plan)
  return @output_reference if @output_reference

  raise 'plan_self has to be invoked before being able to reference the output'
end
output=(hash) click to toggle source
# File lib/dynflow/action.rb, line 156
# Replaces the action output with an indifferent-access version of `hash`.
# Run phase only.
#
# @param hash [Hash]
def output=(hash)
  Type!(hash, Hash)
  phase!(Run)
  @output = Utils.indifferent_hash(hash)
end
phase!(*phases) click to toggle source
# File lib/dynflow/action.rb, line 141
# Asserts that the action is in one of the given phases.
#
# @param phases allowed phases
# @return the (truthy) result of `phase?`
# @raise [TypeError] when the current phase matches none of them
def phase!(*phases)
  matched = phase?(*phases)
  raise TypeError, "Wrong phase #{phase}, required #{phases}" unless matched

  matched
end
phase?(*phases) click to toggle source
# File lib/dynflow/action.rb, line 137
# Checks the current phase against the given phases using `Match?`.
#
# @param phases candidate phases
def phase?(*phases)
  Match?(phase, *phases)
end
plan_event(event, time = nil, execution_plan_id: self.execution_plan_id, step_id: self.run_step_id) click to toggle source

Plan an event to be sent to the step identified by execution_plan_id and step_id, which default to this action's own execution plan and run step. If time is not passed, the event is sent as soon as possible.

# File lib/dynflow/action.rb, line 335
# Queues a DelayedEvent for the given execution plan / step (defaults: this
# action's own plan and run step).
#
# @param event the event to deliver
# @param time nil, an absolute time, or a Numeric offset that is added to the
#   world clock's current time
def plan_event(event, time = nil, execution_plan_id: self.execution_plan_id, step_id: self.run_step_id)
  deliver_at = time.is_a?(Numeric) ? @world.clock.current_time + time : time
  delayed_events << DelayedEvent[execution_plan_id, step_id, event, deliver_at]
end
plan_step() click to toggle source
# File lib/dynflow/action.rb, line 211
# Plan step of this action, fetched from the execution plan's steps.
# Present phase only.
def plan_step
  phase!(Present)
  known_steps = execution_plan.steps
  known_steps.fetch(plan_step_id)
end
planned_actions(filter = Action) click to toggle source

@param [Class] filter return only actions which are kind of `filter` @return [Array<Action>] of directly planned actions by this action, returned actions are in Present phase

# File lib/dynflow/action.rb, line 219
# Actions planned directly by this action: the plan step's planned steps are
# mapped to their actions and filtered by class. Present phase only.
#
# @param filter [Class] keep only actions that are kind of this class
# @return [Array]
def planned_actions(filter = Action)
  phase!(Present)
  child_steps   = plan_step.planned_steps(execution_plan)
  child_actions = child_steps.map { |step| step.action(execution_plan) }
  child_actions.select { |action| action.is_a?(filter) }
end
queue() click to toggle source

@override define what pool the action should be run in. The queue defined here will also be used as the default queue for all the steps planned under this action, unless overridden by a sub-action

# File lib/dynflow/action.rb, line 330
# Base implementation: the body is empty, so this returns nil.
def queue
end
required_step_ids(input = self.input) click to toggle source

@api private @return [Array<Integer>] - ids of steps referenced from action

# File lib/dynflow/action.rb, line 290
# Walks `input` depth-first (Hash values and Array elements) and collects
# the step_id of every ExecutionPlan::OutputReference found on the way.
#
# @api private
# @param input structure to scan, defaults to the action's input
# @return [Array] step ids in traversal order
def required_step_ids(input = self.input)
  step_ids = []
  visit = lambda do |node|
    case node
    when Hash
      node.each_value { |inner| visit.call(inner) }
    when Array
      node.each { |inner| visit.call(inner) }
    when ExecutionPlan::OutputReference
      step_ids << node.step_id
    end
  end
  visit.call(input)
  step_ids
end
run_step() click to toggle source
# File lib/dynflow/action.rb, line 237
# Run step of this action looked up in the execution plan, or nil when the
# action has no run step id. Present phase only.
def run_step
  phase!(Present)
  return unless run_step_id

  execution_plan.steps.fetch(run_step_id)
end
serializer() click to toggle source
# File lib/dynflow/action.rb, line 318
# Serializer stored by `execute_delay`.
#
# @raise [RuntimeError] when no serializer has been stored
def serializer
  return @serializer unless @serializer.nil?

  raise "The action must be delayed in order to access the serializer"
end
set_plan_context(execution_plan, triggering_action, from_subscription) click to toggle source
# File lib/dynflow/action.rb, line 184
# Records the planning context on the action. Plan phase only.
#
# @param execution_plan [ExecutionPlan]
# @param triggering_action [Action, nil]
# @param from_subscription [Boolean]
# @return the `from_subscription` value
def set_plan_context(execution_plan, triggering_action, from_subscription)
  phase!(Plan)
  @execution_plan    = Type!(execution_plan, ExecutionPlan)
  @triggering_action = Type!(triggering_action, Action, NilClass)
  @from_subscription = Type!(from_subscription, TrueClass, FalseClass)
end
state() click to toggle source
# File lib/dynflow/action.rb, line 267
# State stored on the underlying step.
#
# @raise [RuntimeError] when the action has no step reference
def state
  return @step.state unless @step.nil?

  raise "state data not available"
end
steps() click to toggle source
# File lib/dynflow/action.rb, line 247
# Plan, run and finalize step of the action, in that order.
#
# @return [Array] three entries; run and finalize entries may be nil
def steps
  planning   = plan_step
  running    = run_step
  finalizing = finalize_step
  [planning, running, finalizing]
end
to_hash() click to toggle source
# File lib/dynflow/action.rb, line 251
# Serializes the action through `recursive_to_hash`. The output is included
# only in the Run, Finalize and Present phases; otherwise nil is passed as
# the second argument.
def to_hash
  base = {
    class:                    self.class.name,
    execution_plan_id:        execution_plan_id,
    id:                       id,
    plan_step_id:             plan_step_id,
    run_step_id:              run_step_id,
    finalize_step_id:         finalize_step_id,
    caller_execution_plan_id: caller_execution_plan_id,
    caller_action_id:         caller_action_id,
    input:                    input
  }
  output_part = phase?(Run, Finalize, Present) ? { output: output } : nil
  recursive_to_hash(base, output_part)
end
triggering_action() click to toggle source

action that caused this action to be planned. Available only in planning phase

# File lib/dynflow/action.rb, line 192
# Action recorded by `set_plan_context` as the one that caused this action
# to be planned. Plan phase only.
def triggering_action
  phase!(Plan)
  @triggering_action
end

Protected Instance Methods

delay(delay_options, *args) click to toggle source
# File lib/dynflow/action.rb, line 363
# Default delay hook: `delay_options` is not used here; the remaining
# arguments are wrapped in a Serializers::Noop.
def delay(delay_options, *args)
  Serializers::Noop.new(args)
end
finalize() click to toggle source

Add this method to implement the action's *Finalize phase* behaviour. It can use DB in this phase.

# File lib/dynflow/action.rb, line 394
# Placeholder with an empty body (returns nil); exists for documentation.
def finalize
  # just a rdoc placeholder
end
plan(*args) click to toggle source

@override to implement the action's *Plan phase* behaviour. By default it plans itself and expects input-hash. Use plan_self and plan_action methods to plan actions. It can use DB in this phase.

# File lib/dynflow/action.rb, line 371
# Default Plan phase behaviour: plans the action itself.
#
# @param args arguments handed over by plan_action; ignored when the action
#   was planned through a subscription
# @return [self]
def plan(*args)
  unless from_subscription?
    # planned via plan_action: use the arguments given there
    plan_self(*args)
    return self
  end

  # planned via subscription: by default take over the input of the
  # triggering action, merged over this action's own input.
  # should be replaced by referencing the input from input format
  plan_self(input.merge(triggering_action.input))
  self
end
run(event = nil) click to toggle source

Add this method to implement the action's *Run phase* behaviour. It should not use DB in this phase.

# File lib/dynflow/action.rb, line 387
# Placeholder with an empty body (returns nil); exists for documentation.
#
# @param event optional event, unused here
def run(event = nil)
  # just a rdoc placeholder
end
run_accepts_events?() click to toggle source
# File lib/dynflow/action.rb, line 399
# True when `run` is declared with any parameters (its arity is not zero).
#
# @return [Boolean]
def run_accepts_events?
  !method(:run).arity.zero?
end
save_state(conditions = {}) click to toggle source

If this save returns an integer, it means it was an update. The number represents the number of updated records. If it is 0, then the step was in an unexpected state and couldn't be updated

# File lib/dynflow/action.rb, line 358
# Saves the underlying step, passing `conditions` through to its `save`.
# Executable phases only.
#
# @param conditions [Hash] forwarded to the step
# @return whatever the step's `save` returns
def save_state(conditions = {})
  phase!(Executable)
  @step.save(conditions)
end
state=(state) click to toggle source
# File lib/dynflow/action.rb, line 346
# Sets the state of the underlying step and logs the transition
# (old state >> new state) at debug level. Executable phases only.
#
# @param state the new step state
def state=(state)
  phase!(Executable)
  logger  = @world.logger
  message = format('%13s %s:%2d %9s >> %9s in phase %8s %s',
                   'Step', execution_plan_id, @step.id,
                   self.state, state,
                   phase.to_s_humanized, self.class)
  logger.debug(message)
  @step.state = state
end

Private Instance Methods

check_serializable(what) click to toggle source
# File lib/dynflow/action.rb, line 598
# Verifies that the action's input or output can be converted by
# `recursive_to_hash`. On failure the offending value is replaced in place
# with `{ not_serializable: true }` and the original error is re-raised.
#
# @param what [Symbol] :input or :output
# @raise whatever `Match!`, the reader or `recursive_to_hash` raised
def check_serializable(what)
  Match! what, :input, :output
  value = send what
  recursive_to_hash value # it raises when not serializable
rescue => e
  # `value` is still nil when the failure happened before it was read
  # (e.g. `what` was rejected); do not mask that error with a NoMethodError.
  value&.replace(not_serializable: true)
  raise e
end
concurrence(&block) click to toggle source

DSL for plan phase

# File lib/dynflow/action.rb, line 415
# Plan-phase DSL: switches the execution plan to a new, empty
# Flows::Concurrence for the duration of the block.
def concurrence(&block)
  phase!(Plan)
  flow = Flows::Concurrence.new([])
  @execution_plan.switch_flow(flow, &block)
end
error!(error) click to toggle source

DSL to terminate action execution and set it to error

# File lib/dynflow/action.rb, line 462
# Records `error` on the action via `set_error` and aborts the current
# execution by throwing ERROR. Executable phases only.
#
# @param error passed to `set_error`
def error!(error)
  phase!(Executable)
  set_error(error)
  throw(ERROR)
end
execute_finalize() click to toggle source

rubocop:enable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity

# File lib/dynflow/action.rb, line 586
# Executes the Finalize phase: dereferences the input through persistence,
# marks the step as running and saves it, then runs `finalize` inside the
# :finalize middleware stack under error handling.
def execute_finalize
  phase!(Finalize)
  @input = OutputReference.dereference(@input, world.persistence)
  self.state = :running
  save_state
  with_error_handling do
    world.middleware.execute(:finalize, self) { finalize }
  end
end
execute_plan(*args) click to toggle source
# File lib/dynflow/action.rb, line 504
# Executes the Plan phase: marks the step as running and saves it, runs
# `plan` through the :plan middleware inside a concurrence flow, plans every
# action subscribed to this class, and finally checks that the input is
# serializable.
#
# @param args arguments forwarded to the :plan middleware and to the plan
#   steps of subscribed actions
def execute_plan(*args)
  phase! Plan
  self.state = :running
  save_state

  # when the error occurred inside the planning, catch that
  # before getting out of the planning phase
  # (the error is propagated to the caller unless this is the root action)
  with_error_handling(!root_action?) do
    concurrence do
      world.middleware.execute(:plan, self, *args) do |*new_args|
        plan(*new_args)
      end
    end

    subscribed_actions = world.subscribed_actions(self.class)
    if subscribed_actions.any?
      # we encapsulate the flow for this action into a concurrence and
      # add the subscribed flows to it as well.
      # `compact` drops the popped flow when there was none (pop returned nil).
      trigger_flow = @execution_plan.current_run_flow.sub_flows.pop
      @execution_plan.switch_flow(Flows::Concurrence.new([trigger_flow].compact)) do
        subscribed_actions.each do |action_class|
          new_plan_step = @execution_plan.add_plan_step(action_class, self)
          # third argument `true` marks the planning as coming from a subscription
          new_plan_step.execute(@execution_plan, self, true, *args)
        end
      end
    end

    check_serializable :input
  end
end
execute_run(event) click to toggle source

TODO: This is getting out of hand, refactoring needed rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity

# File lib/dynflow/action.rb, line 537
# Executes the Run phase, optionally delivering `event` to `run`.
#
# Only steps in :pending, :error, :skipping or :suspended state are run.
# A step already :running raises NotImplementedError; any other state raises
# a RuntimeError. An event is accepted only while the step is :suspended.
#
# @param event event to pass to `run`, or nil
def execute_run(event)
  phase! Run
  @world.logger.debug format('%13s %s:%2d got event %s',
                             'Step', execution_plan_id, @step.id, event) if event

  case
  when state == :running
    raise NotImplementedError, 'recovery after restart is not implemented'

  when [:pending, :error, :skipping, :suspended].include?(state)
    if event && state != :suspended
      raise 'event can be processed only when in suspended state'
    end

    old_state = self.state
    # a step being skipped keeps its :skipping state while it runs
    self.state = :running unless self.state == :skipping
    # conditional save: only succeeds if the stored step is in one of these states
    saved = save_state(:state => %w(pending error skipping suspended))
    if saved.kind_of?(Integer) && !saved.positive?
      # The step was already in a state we're trying to transition to, most
      # likely we were about to execute it for the second time after first
      # execution was forcefully interrupted.
      # Set error and return to prevent the step from being executed twice
      set_error "Could not transition step from #{old_state} to #{self.state}, step already in #{self.state}."
      return
    end

    @input = OutputReference.dereference @input, world.persistence
    with_error_handling do
      # skipping is delivered to `run` as the Skip event
      event = Skip if state == :skipping

      # we run the Skip event only when the run accepts events
      if event != Skip || run_accepts_events?
        # `suspend` throws SUSPEND with SUSPEND as the value
        result = catch(SUSPEND) do
          # a nil event is dropped, so `run` is then called without arguments
          world.middleware.execute(:run, self, *[event].compact) do |*args|
            run(*args)
          end
        end

        self.state = :suspended if result == SUSPEND
      end

      check_serializable :output
    end
  else
    raise "wrong state #{state} when event:#{event}"
  end
end
plan_action(action_class, *args) click to toggle source
# File lib/dynflow/action.rb, line 443
# Plan-phase DSL: adds a plan step for `action_class` to the execution plan
# with this action as the second argument, and executes it right away.
# The `false` flag marks the planning as not coming from a subscription.
#
# @param action_class class of the action to plan
# @param args arguments forwarded to the new plan step
def plan_action(action_class, *args)
  phase!(Plan)
  new_step = @execution_plan.add_plan_step(action_class, self)
  new_step.execute(@execution_plan, self, false, *args)
end
plan_self(input = {}) click to toggle source
# File lib/dynflow/action.rb, line 425
# Plan-phase DSL: merges `input` into the action's input and registers a run
# step and/or a finalize step, depending on which of `run` / `finalize` the
# action responds to. When a run step is added, an output reference for it
# is created as well.
#
# @param input [Hash] values merged into the action's input
# @return [self] to stay consistent with plan_action
def plan_self(input = {})
  phase!(Plan)
  self.input.update(input)

  if respond_to?(:run)
    added_run_step    = @execution_plan.add_run_step(self)
    @run_step_id      = added_run_step.id
    @output_reference = OutputReference.new(@execution_plan.id, added_run_step.id, id)
  end

  if respond_to?(:finalize)
    added_finalize_step = @execution_plan.add_finalize_step(self)
    @finalize_step_id   = added_finalize_step.id
  end

  self
end
root_action?() click to toggle source
# File lib/dynflow/action.rb, line 607
# Tells whether this is the root action, i.e. the main action used to create
# the execution plan.
#
# While planning, @triggering_action decides: an action planned by another
# action is never root. For post-planning phases the action is root when
# either
#   - @caller_action_id is not set, OR
#   - @caller_action_id is set together with @caller_execution_plan_id, which
#     means the caller lives in a different execution plan and this action
#     created a new plan tracked as a sub-plan of it.
#
# @return truthy when root (may be the caller execution plan id), else false/nil
def root_action?
  return false unless @triggering_action.nil?

  @caller_action_id.nil? || @caller_execution_plan_id
end
sequence(&block) click to toggle source
# File lib/dynflow/action.rb, line 420
# Plan-phase DSL: switches the execution plan to a new, empty
# Flows::Sequence for the duration of the block.
def sequence(&block)
  phase!(Plan)
  flow = Flows::Sequence.new([])
  @execution_plan.switch_flow(flow, &block)
end
set_error(error) click to toggle source
# File lib/dynflow/action.rb, line 496
# Logs `error`, moves the step to the :error state and stores the error on
# the step wrapped in ExecutionPlan::Steps::Error. Executable phases only.
#
# @param error [Exception, String]
def set_error(error)
  phase!(Executable)
  Type!(error, Exception, String)
  action_logger.error(error)
  self.state = :error
  wrapped = ExecutionPlan::Steps::Error.new(error)
  @step.error = wrapped
end
suspend(&block) click to toggle source
# File lib/dynflow/action.rb, line 455
# Run-phase DSL: optionally yields the suspended-action handle to the block,
# then leaves `run` by throwing SUSPEND (with SUSPEND as the thrown value).
def suspend(&block)
  phase!(Run)
  block&.call(suspended_action)
  throw(SUSPEND, SUSPEND)
end
suspended_action() click to toggle source

DSL for run phase

# File lib/dynflow/action.rb, line 450
# Run-phase DSL: memoized Action::Suspended wrapper around this action.
def suspended_action
  phase!(Run)
  @suspended_action || (@suspended_action = Action::Suspended.new(self))
end
with_error_handling(propagate_error = nil, &block) click to toggle source
# File lib/dynflow/action.rb, line 468
# Runs the block with the action's error handling around it.
#
# Exceptions raised by the block are recorded through `set_error`; only
# those that are neither StandardError nor ScriptError are re-raised.
# An ERROR throw (see `error!`) is caught silently. Afterwards the state is
# finished: scheduling -> pending, running -> success, skipping -> skipped;
# suspended and error are left as they are.
#
# @param propagate_error when truthy and the action ended in :error, the
#   exception stored on the step is raised
# @raise [RuntimeError] "wrong state ..." when entered or left in an
#   unexpected state
def with_error_handling(propagate_error = nil, &block)
  unless [:scheduling, :skipping, :running].include?(self.state)
    raise "wrong state #{self.state}"
  end

  begin
    catch(ERROR) { block.call }
  rescue Exception => error
    set_error(error)
    # reraise low-level exceptions
    raise error unless Type?(error, StandardError, ScriptError)
  end

  finished_as = { scheduling: :pending, running: :success, skipping: :skipped }
  reached     = self.state
  if finished_as.key?(reached)
    self.state = finished_as.fetch(reached)
  elsif reached != :suspended && reached != :error
    raise "wrong state #{self.state}"
  end

  raise(@step.error.exception) if propagate_error && self.state == :error
end