class Dynflow::ThrottleLimiter::Core
Public Class Methods
new(world)
click to toggle source
# File lib/dynflow/throttle_limiter.rb, line 50 def initialize(world) @world = world @semaphores = {} end
Public Instance Methods
cancel(parent_id, reason = nil)
click to toggle source
# File lib/dynflow/throttle_limiter.rb, line 97 def cancel(parent_id, reason = nil) if @semaphores.key?(parent_id) reason ||= 'The task was cancelled.' @semaphores[parent_id].waiting.each do |triggered| cancel_plan_id(triggered.execution_plan_id, reason) triggered.future.reject(reason) end finish(parent_id) end end
finish(parent_id)
click to toggle source
# File lib/dynflow/throttle_limiter.rb, line 108 def finish(parent_id) @semaphores.delete(parent_id) end
handle_plans(parent_id, planned_ids, failed_ids)
click to toggle source
# File lib/dynflow/throttle_limiter.rb, line 60 def handle_plans(parent_id, planned_ids, failed_ids) failed = failed_ids.map do |plan_id| ::Dynflow::World::Triggered[plan_id, Concurrent::Promises.resolvable_future].tap do |triggered| execute_triggered(triggered) end end planned_ids.map do |child_id| ::Dynflow::World::Triggered[child_id, Concurrent::Promises.resolvable_future].tap do |triggered| triggered.future.on_resolution! { self << [:release, parent_id] } execute_triggered(triggered) if @semaphores[parent_id].wait(triggered) end end + failed end
initialize_plan(plan_id, semaphores_hash)
click to toggle source
# File lib/dynflow/throttle_limiter.rb, line 55 def initialize_plan(plan_id, semaphores_hash) @semaphores[plan_id] = create_semaphores(semaphores_hash) set_up_clock_for(plan_id, true) end
observe(parent_id = nil)
click to toggle source
# File lib/dynflow/throttle_limiter.rb, line 75 def observe(parent_id = nil) if parent_id.nil? @semaphores.reduce([]) do |acc, cur| acc << { cur.first => cur.last.waiting } end elsif @semaphores.key? parent_id @semaphores[parent_id].waiting else [] end end
release(plan_id, key = :level)
click to toggle source
# File lib/dynflow/throttle_limiter.rb, line 87 def release(plan_id, key = :level) return unless @semaphores.key? plan_id set_up_clock_for(plan_id) if key == :time semaphore = @semaphores[plan_id] semaphore.release(1, key) if semaphore.children.key?(key) if semaphore.has_waiting? && semaphore.get == 1 execute_triggered(semaphore.get_waiting) end end
Private Instance Methods
cancel_plan_id(plan_id, reason)
click to toggle source
# File lib/dynflow/throttle_limiter.rb, line 114 def cancel_plan_id(plan_id, reason) plan = @world.persistence.load_execution_plan(plan_id) steps = plan.run_steps steps.each do |step| step.state = :error step.error = ::Dynflow::ExecutionPlan::Steps::Error.new(reason) step.save end plan.update_state(:stopped) plan.save end
create_semaphores(hash)
click to toggle source
# File lib/dynflow/throttle_limiter.rb, line 140 def create_semaphores(hash) semaphores = hash.keys.reduce(Utils.indifferent_hash({})) do |acc, key| acc.merge(key => ::Dynflow::Semaphores::Stateful.new_from_hash(hash[key])) end ::Dynflow::Semaphores::Aggregating.new(semaphores) end
execute_triggered(triggered)
click to toggle source
# File lib/dynflow/throttle_limiter.rb, line 126 def execute_triggered(triggered) @world.execute(triggered.execution_plan_id, triggered.finished) end
set_up_clock_for(plan_id, initial = false)
click to toggle source
# File lib/dynflow/throttle_limiter.rb, line 130 def set_up_clock_for(plan_id, initial = false) if @semaphores[plan_id].children.key? :time timeout_message = 'The task could not be started within the maintenance window.' interval = @semaphores[plan_id].children[:time].meta[:interval] timeout = @semaphores[plan_id].children[:time].meta[:time_span] @world.clock.ping(self, interval, [:release, plan_id, :time]) @world.clock.ping(self, timeout, [:cancel, plan_id, timeout_message]) if initial end end