module Dynflow::Action::V2::WithSubPlans
Constants
- DEFAULT_BATCH_SIZE
- DEFAULT_POLLING_INTERVAL
- Ping
Public Instance Methods
abort!()
click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 181 def abort! cancel! true end
batch(from, size)
click to toggle source
Should return a slice of size items starting from item with index from
# File lib/dynflow/action/v2/with_sub_plans.rb, line 32 def batch(from, size) raise NotImplementedError end
batch_size()
click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 27 def batch_size DEFAULT_BATCH_SIZE end
can_spawn_next_batch?()
click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 195 def can_spawn_next_batch? remaining_count > 0 end
cancel!(force = false)
click to toggle source
Cancellation handling
# File lib/dynflow/action/v2/with_sub_plans.rb, line 173 def cancel!(force = false) # Count the not-yet-planned tasks as cancelled output[:cancelled_count] = total_count - output[:planned_count] # Pass the cancel event to running sub plans if they can be cancelled sub_plans(:state => 'running').each { |sub_plan| sub_plan.cancel(force) if sub_plan.cancellable? } suspend end
check_for_errors!()
click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 144 def check_for_errors! raise SubtaskFailedException.new("A sub task failed") if output[:failed_count] > 0 end
concurrency_limit()
click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 159 def concurrency_limit input[:dynflow] ||= {} input[:dynflow][:concurrency_limit] end
concurrency_limit_capacity()
click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 164 def concurrency_limit_capacity if limit = concurrency_limit return limit unless counts_set? capacity = limit - (output[:planned_count] - (output[:success_count] + output[:failed_count])) [0, capacity].max end end
counts_set?()
click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 140 def counts_set? output[:total_count] && output[:success_count] && output[:failed_count] && output[:pending_count] end
create_sub_plans()
click to toggle source
Methods to be overridden
# File lib/dynflow/action/v2/with_sub_plans.rb, line 18 def create_sub_plans raise NotImplementedError end
current_batch()
click to toggle source
Batching Returns the items in the current batch
# File lib/dynflow/action/v2/with_sub_plans.rb, line 187 def current_batch start_position = output[:planned_count] size = batch_size size = concurrency_limit_capacity if concurrency_limit size = start_position + size > total_count ? total_count - start_position : size batch(start_position, size) end
done?()
click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 118 def done? return false if can_spawn_next_batch? || !counts_set? total_count - output[:success_count] - output[:failed_count] - output[:cancelled_count] <= 0 end
increase_counts(planned, failed)
click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 103 def increase_counts(planned, failed) output[:planned_count] += planned + failed output[:failed_count] = output.fetch(:failed_count, 0) + failed output[:pending_count] = output.fetch(:pending_count, 0) + planned output[:success_count] ||= 0 end
initiate()
click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 66 def initiate output[:planned_count] = 0 output[:cancelled_count] = 0 output[:total_count] = total_count spawn_plans end
limit_concurrency_level!(level)
click to toggle source
Concurrency limitting
# File lib/dynflow/action/v2/with_sub_plans.rb, line 154 def limit_concurrency_level!(level) input[:dynflow] ||= {} input[:dynflow][:concurrency_limit] = level end
on_finish()
click to toggle source
Callbacks
# File lib/dynflow/action/v2/with_sub_plans.rb, line 42 def on_finish end
on_planning_finished()
click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 45 def on_planning_finished end
polling_interval()
click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 37 def polling_interval DEFAULT_POLLING_INTERVAL end
recalculate_counts()
click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 131 def recalculate_counts total = total_count failed = sub_plans_count('state' => %w(paused stopped), 'result' => 'error') success = sub_plans_count('state' => 'stopped', 'result' => 'success') output.update(:pending_count => total - failed - success, :failed_count => failed - output.fetch(:resumed_count, 0), :success_count => success) end
remaining_count()
click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 199 def remaining_count total_count - output[:cancelled_count] - output[:planned_count] end
resume()
click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 73 def resume if sub_plans.all? { |sub_plan| sub_plan.error_in_plan? } output[:resumed_count] ||= 0 output[:resumed_count] += output[:failed_count] # We're starting over and need to reset the counts %w(total failed pending success).each { |key| output.delete("#{key}_count".to_sym) } initiate else tick end end
run(event = nil)
click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 48 def run(event = nil) case event when nil if output[:total_count] resume else initiate end when Ping tick when ::Dynflow::Action::Cancellable::Cancel cancel! when ::Dynflow::Action::Cancellable::Abort abort! end try_to_finish || suspend_and_ping end
run_progress()
click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 124 def run_progress return 0.1 unless counts_set? && total_count > 0 sum = output.values_at(:success_count, :cancelled_count, :failed_count).reduce(:+) sum.to_f / total_count end
spawn_plans()
click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 96 def spawn_plans sub_plans = create_sub_plans sub_plans = Array[sub_plans] unless sub_plans.is_a? Array increase_counts(sub_plans.count, 0) on_planning_finished unless can_spawn_next_batch? end
suspend_and_ping()
click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 90 def suspend_and_ping delay = (concurrency_limit.nil? || concurrency_limit_capacity > 0) && can_spawn_next_batch? ? nil : polling_interval plan_event(Ping, delay) suspend end
tick()
click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 85 def tick recalculate_counts spawn_plans if can_spawn_next_batch? end
total_count()
click to toggle source
Should return the expected total count of tasks
# File lib/dynflow/action/v2/with_sub_plans.rb, line 23 def total_count raise NotImplementedError end
trigger(action_class, *args)
click to toggle source
Helper for creating sub plans
# File lib/dynflow/action/v2/with_sub_plans.rb, line 149 def trigger(action_class, *args) world.trigger { world.plan_with_options(action_class: action_class, args: args, caller_action: self) } end
try_to_finish()
click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 110 def try_to_finish return false unless done? check_for_errors! on_finish true end
Private Instance Methods
sub_plan_filter()
click to toggle source
Sub-plan lookup
# File lib/dynflow/action/v2/with_sub_plans.rb, line 206 def sub_plan_filter { 'caller_execution_plan_id' => execution_plan_id, 'caller_action_id' => self.id } end
sub_plans(filter = {})
click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 211 def sub_plans(filter = {}) world.persistence.find_execution_plans(filters: sub_plan_filter.merge(filter)) end
sub_plans_count(filter = {})
click to toggle source
# File lib/dynflow/action/v2/with_sub_plans.rb, line 215 def sub_plans_count(filter = {}) world.persistence.find_execution_plan_counts(filters: sub_plan_filter.merge(filter)) end