class Dynflow::Executors::Parallel::Pool::JobStorage
Public Class Methods
new()
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 6 def initialize @round_robin = RoundRobin.new @jobs = Hash.new { |h, k| h[k] = [] } end
Public Instance Methods
add(work)
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 11 def add(work) @round_robin.add work.execution_plan_id unless tracked?(work) @jobs[work.execution_plan_id] << work end
empty?()
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 26 def empty? @jobs.empty? end
execution_status(execution_plan_id = nil)
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 30 def execution_status(execution_plan_id = nil) source = if execution_plan_id.nil? @jobs else { execution_plan_id => @jobs.fetch(execution_plan_id, []) } end source.reduce({}) do |acc, (plan_id, work_items)| acc.update(plan_id => work_items.count) end end
pop()
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 16 def pop return nil if empty? execution_plan_id = @round_robin.next @jobs[execution_plan_id].shift.tap { delete execution_plan_id if @jobs[execution_plan_id].empty? } end
queue_size()
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 22 def queue_size execution_status.values.reduce(0, :+) end
Private Instance Methods
delete(execution_plan_id)
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 47 def delete(execution_plan_id) @round_robin.delete execution_plan_id @jobs.delete execution_plan_id end
tracked?(work)
click to toggle source
# File lib/dynflow/executors/parallel/pool.rb, line 43 def tracked?(work) @jobs.has_key? work.execution_plan_id end