module Dynflow::Action::V2::WithSubPlans

Constants

DEFAULT_BATCH_SIZE
DEFAULT_POLLING_INTERVAL
Ping

Public Instance Methods

abort!() click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 181
# Forced variant of cancel!: delegates to cancel! with force set to true.
# run invokes it when it receives the Abort event.
def abort!
  cancel!(true)
end
batch(from, size) click to toggle source

Should return a slice of `size` items, starting from the item at index `from`.

# File lib/dynflow/action/v2/with_sub_plans.rb, line 32
# Returns the slice of items to plan next. current_batch calls this with the
# number of already planned items as +from+ and the computed batch size as
# +size+. This default implementation always raises NotImplementedError, so
# an action relying on current_batch has to override it.
def batch(from, size)
  raise NotImplementedError
end
batch_size() click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 27
# Default number of items per batch. current_batch starts from this value
# and replaces it with concurrency_limit_capacity when a concurrency limit
# is set.
def batch_size
  DEFAULT_BATCH_SIZE
end
can_spawn_next_batch?() click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 195
# True while remaining_count is above zero, i.e. there are still items that
# are neither planned nor counted as cancelled.
def can_spawn_next_batch?
  remaining_count.positive?
end
cancel!(force = false) click to toggle source

Cancellation handling

# File lib/dynflow/action/v2/with_sub_plans.rb, line 173
# Cancels the action: everything that has not been planned yet is recorded
# as cancelled, the cancel request is forwarded to the running sub plans and
# finally suspend is called.
#
# force - passed through unchanged to each sub plan's cancel (abort! passes
#         true).
def cancel!(force = false)
  # Items that never got planned can no longer run, so count them as cancelled.
  not_yet_planned = total_count - output[:planned_count]
  output[:cancelled_count] = not_yet_planned

  # Only running sub plans that report themselves as cancellable get the event.
  running = sub_plans(state: 'running')
  running.each do |plan|
    next unless plan.cancellable?

    plan.cancel(force)
  end

  suspend
end
check_for_errors!() click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 144
# Raises SubtaskFailedException when output[:failed_count] is above zero;
# otherwise returns nil. try_to_finish calls it before on_finish.
def check_for_errors!
  return unless output[:failed_count] > 0

  raise SubtaskFailedException, "A sub task failed"
end
concurrency_limit() click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 159
# Reads the concurrency limit stored under input[:dynflow][:concurrency_limit]
# (see limit_concurrency_level!). Returns nil when no limit was stored.
# Note: this creates an empty input[:dynflow] hash if it is missing.
def concurrency_limit
  dynflow_options = (input[:dynflow] ||= {})
  dynflow_options[:concurrency_limit]
end
concurrency_limit_capacity() click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 164
# Number of additional sub plans that fit under the concurrency limit.
#
# Returns nil when no limit is set, the full limit while the counts are not
# set yet, and otherwise the limit minus the sub plans that were planned but
# are not yet counted as succeeded or failed (never less than zero).
def concurrency_limit_capacity
  limit = concurrency_limit
  return unless limit
  return limit unless counts_set?

  finished = output[:success_count] + output[:failed_count]
  in_flight = output[:planned_count] - finished
  [0, limit - in_flight].max
end
counts_set?() click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 140
# Truthy only when the total, success, failed and pending counts are all
# present in output. Like the plain && chain it replaces, it yields the first
# missing (falsy) value, or the pending count when everything is set.
def counts_set?
  required = %i[total_count success_count failed_count pending_count]
  required.inject(true) { |present, key| present && output[key] }
end
create_sub_plans() click to toggle source

Methods to be overridden

# File lib/dynflow/action/v2/with_sub_plans.rb, line 18
# Hook that creates the sub plans; must be overridden, as this default always
# raises NotImplementedError. spawn_plans calls it, wraps a non-Array result
# in an Array and counts the elements as newly planned.
def create_sub_plans
  raise NotImplementedError
end
current_batch() click to toggle source

Batching. Returns the items in the current batch.

# File lib/dynflow/action/v2/with_sub_plans.rb, line 187
# Returns the items of the batch to plan next, obtained from batch.
#
# The batch starts at the number of already planned items. Its size is
# batch_size, or the free concurrency capacity when a concurrency limit is
# set, and is capped at the number of items left before total_count.
def current_batch
  offset = output[:planned_count]
  wanted = batch_size
  # A concurrency limit replaces the regular batch size with the free capacity.
  wanted = concurrency_limit_capacity if concurrency_limit
  # Never ask for more items than are left after the offset.
  left = total_count - offset
  batch(offset, [wanted, left].min)
end
done?() click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 118
# Tells whether all work is accounted for: no further batch can be spawned,
# the counts are set, and the succeeded, failed and cancelled sub plans add
# up to at least total_count.
def done?
  return false if can_spawn_next_batch?
  return false unless counts_set?

  finished = output.values_at(:success_count, :failed_count, :cancelled_count)
  (total_count - finished.sum) <= 0
end
increase_counts(planned, failed) click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 103
# Updates the counters in output after a batch was spawned.
#
# planned - number of sub plans that were planned
# failed  - number of sub plans that failed
#
# planned_count grows by both numbers; failed_count and pending_count grow by
# failed and planned respectively (starting from 0 when missing), and
# success_count is initialised to 0 if it is not set yet.
def increase_counts(planned, failed)
  output[:planned_count] += planned + failed
  { failed_count: failed, pending_count: planned }.each do |counter, delta|
    output[counter] = output.fetch(counter, 0) + delta
  end
  output[:success_count] ||= 0
end
initiate() click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 66
# Starts from scratch: zeroes the planned and cancelled counters, stores the
# expected total in output and spawns the first batch of sub plans.
def initiate
  %i[planned_count cancelled_count].each { |counter| output[counter] = 0 }
  output[:total_count] = total_count
  spawn_plans
end
limit_concurrency_level!(level) click to toggle source

Concurrency limiting

# File lib/dynflow/action/v2/with_sub_plans.rb, line 154
# Stores +level+ as the concurrency limit under
# input[:dynflow][:concurrency_limit], creating input[:dynflow] when needed.
# Returns +level+.
def limit_concurrency_level!(level)
  dynflow_options = (input[:dynflow] ||= {})
  dynflow_options[:concurrency_limit] = level
end
on_finish() click to toggle source

Callbacks

# File lib/dynflow/action/v2/with_sub_plans.rb, line 42
# Callback invoked by try_to_finish once done? is true and check_for_errors!
# did not raise. Does nothing by default; meant to be overridden.
def on_finish
end
on_planning_finished() click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 45
# Callback invoked by spawn_plans after a batch was spawned and
# can_spawn_next_batch? is false. Does nothing by default; meant to be
# overridden.
def on_planning_finished
end
polling_interval() click to toggle source

Polling

# File lib/dynflow/action/v2/with_sub_plans.rb, line 37
# Delay that suspend_and_ping passes to plan_event for the next Ping when a
# new batch cannot be spawned right away. Defaults to
# DEFAULT_POLLING_INTERVAL.
def polling_interval
  DEFAULT_POLLING_INTERVAL
end
recalculate_counts() click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 131
# Refreshes the pending, failed and success counts in output from the
# persisted sub plans and returns output.
#
# Failed sub plans are the paused or stopped ones with an 'error' result,
# successful ones are stopped with a 'success' result. The stored failed
# count is reduced by output[:resumed_count] (0 when missing).
def recalculate_counts
  expected = total_count
  errored = sub_plans_count('state' => %w[paused stopped], 'result' => 'error')
  succeeded = sub_plans_count('state' => 'stopped', 'result' => 'success')
  already_resumed = output.fetch(:resumed_count, 0)

  output.update(pending_count: expected - errored - succeeded,
                failed_count: errored - already_resumed,
                success_count: succeeded)
end
remaining_count() click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 199
# Number of items still to be planned: total_count minus the cancelled and
# the already planned ones.
def remaining_count
  accounted_for = output.values_at(:cancelled_count, :planned_count)
  accounted_for.inject(total_count) { |left, count| left - count }
end
resume() click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 73
# Continues an action that already ran before (run calls this when
# output[:total_count] is set).
#
# When every existing sub plan reports error_in_plan? the failed sub plans
# are added to output[:resumed_count], the total/failed/pending/success
# counts are dropped and planning starts over via initiate. Otherwise the
# regular tick is performed.
def resume
  if sub_plans.all? { |sub_plan| sub_plan.error_in_plan? }
    # failed_count may be missing when the previous run stored total_count but
    # raised before any batch was counted (all? is then true for an empty
    # list of sub plans); treat that as "nothing failed" instead of raising a
    # TypeError on 0 + nil, which would make the action impossible to resume.
    output[:resumed_count] = output.fetch(:resumed_count, 0) + output.fetch(:failed_count, 0)
    # We're starting over and need to reset the counts
    %w(total failed pending success).each { |key| output.delete("#{key}_count".to_sym) }
    initiate
  else
    tick
  end
end
run(event = nil) click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 48
# Dispatches on the received event and then either finishes or suspends.
#
# event - nil (initiate on the first run, resume when output[:total_count]
#         is already set), Ping (tick), Cancel (cancel!) or Abort (abort!).
#         Any other event goes straight to the finishing step.
#
# Afterwards try_to_finish is attempted; when it returns a falsy value the
# action calls suspend_and_ping.
def run(event = nil)
  case event
  when nil then output[:total_count] ? resume : initiate
  when Ping then tick
  when ::Dynflow::Action::Cancellable::Cancel then cancel!
  when ::Dynflow::Action::Cancellable::Abort then abort!
  end

  try_to_finish || suspend_and_ping
end
run_progress() click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 124
# Progress as the fraction of total_count covered by succeeded, cancelled
# and failed sub plans. Reports a fixed 0.1 while the counts are not set or
# total_count is not positive.
def run_progress
  if counts_set? && total_count > 0
    counts = output.values_at(:success_count, :cancelled_count, :failed_count)
    finished = counts.inject { |sum, count| sum + count }
    finished.to_f / total_count
  else
    0.1
  end
end
spawn_plans() click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 96
# Creates the next sub plans and records them as planned.
#
# The result of create_sub_plans is wrapped in an Array unless it already is
# one, and its size is added to the counts. on_planning_finished is invoked
# once no further batch can be spawned.
def spawn_plans
  created = create_sub_plans
  created = [created] unless created.is_a?(Array)
  increase_counts(created.size, 0)
  on_planning_finished unless can_spawn_next_batch?
end
suspend_and_ping() click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 90
# Plans the next Ping event and calls suspend.
#
# The Ping is planned with a nil delay when another batch can be spawned and
# either no concurrency limit is set or there is free capacity; otherwise
# polling_interval is used as the delay.
def suspend_and_ping
  has_capacity = concurrency_limit.nil? || concurrency_limit_capacity > 0
  spawn_right_away = has_capacity && can_spawn_next_batch?
  delay = spawn_right_away ? nil : polling_interval

  plan_event(Ping, delay)
  suspend
end
tick() click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 85
# One polling step: refresh the counts and, if another batch can be
# spawned, spawn it.
def tick
  recalculate_counts
  return unless can_spawn_next_batch?

  spawn_plans
end
total_count() click to toggle source

Should return the expected total count of tasks

# File lib/dynflow/action/v2/with_sub_plans.rb, line 23
# Expected total number of sub plans; must be overridden, as this default
# always raises NotImplementedError. The value is stored in
# output[:total_count] by initiate and used by the counting helpers
# (remaining_count, done?, run_progress, current_batch, cancel!).
def total_count
  raise NotImplementedError
end
trigger(action_class, *args) click to toggle source

Helper for creating sub plans

# File lib/dynflow/action/v2/with_sub_plans.rb, line 149
# Helper for creating a sub plan: asks the world to trigger a plan of
# +action_class+ with +args+, with this action passed as caller_action.
# Returns whatever world.trigger returns.
def trigger(action_class, *args)
  world.trigger do
    world.plan_with_options(action_class: action_class, args: args, caller_action: self)
  end
end
try_to_finish() click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 110
# Finishes the action when done? is true: checks for failed sub plans (which
# may raise), runs the on_finish callback and returns true. Returns false
# without doing anything when the action is not done yet.
def try_to_finish
  if done?
    check_for_errors!
    on_finish
    true
  else
    false
  end
end

Private Instance Methods

sub_plan_filter() click to toggle source

Sub-plan lookup

# File lib/dynflow/action/v2/with_sub_plans.rb, line 206
# Base filter identifying the sub plans of this action: they are looked up
# by this action's execution plan id and its own id.
def sub_plan_filter
  {
    'caller_execution_plan_id' => execution_plan_id,
    'caller_action_id' => id
  }
end
sub_plans(filter = {}) click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 211
# Finds this action's sub plans through world.persistence.
#
# filter - extra conditions merged over sub_plan_filter (its entries win on
#          duplicate keys).
def sub_plans(filter = {})
  filters = sub_plan_filter.merge(filter)
  world.persistence.find_execution_plans(filters: filters)
end
sub_plans_count(filter = {}) click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 215
# Counts this action's sub plans through world.persistence.
#
# filter - extra conditions merged over sub_plan_filter (its entries win on
#          duplicate keys).
def sub_plans_count(filter = {})
  filters = sub_plan_filter.merge(filter)
  world.persistence.find_execution_plan_counts(filters: filters)
end