class Dynflow::Executors::Parallel::Core
Attributes
logger[R]
Public Class Methods
new(world, heartbeat_interval, queues_options)
click to toggle source
Calls superclass method
Dynflow::Executors::Abstract::Core::new
# File lib/dynflow/executors/parallel/core.rb, line 11 def initialize(world, heartbeat_interval, queues_options) super @pools = {} initialize_queues end
Public Instance Methods
execution_status(execution_plan_id = nil)
click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 39 def execution_status(execution_plan_id = nil) @pools.each_with_object({}) do |(pool_name, pool), hash| hash[pool_name] = pool.ask!([:execution_status, execution_plan_id]) end end
feed_pool(work_items)
click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 45 def feed_pool(work_items) work_items.each do |new_work| new_work.world = @world @pools.fetch(suggest_queue(new_work)).tell([:schedule_work, new_work]) end end
finish_termination(pool_name)
click to toggle source
Calls superclass method
Dynflow::Executors::Abstract::Core#finish_termination
# File lib/dynflow/executors/parallel/core.rb, line 32 def finish_termination(pool_name) @pools.delete(pool_name) # we expect this message from all worker pools return unless @pools.empty? super() end
initialize_queues()
click to toggle source
# File lib/dynflow/executors/parallel/core.rb, line 17 def initialize_queues default_pool_size = @queues_options[:default][:pool_size] @queues_options.each do |(queue_name, queue_options)| queue_pool_size = queue_options.fetch(:pool_size, default_pool_size) @pools[queue_name] = Pool.spawn("pool #{queue_name}", @world, reference, queue_name, queue_pool_size, @world.transaction_adapter) end end
start_termination(*args)
click to toggle source
Calls superclass method
Dynflow::Executors::Abstract::Core#start_termination
# File lib/dynflow/executors/parallel/core.rb, line 27 def start_termination(*args) super @pools.values.each { |pool| pool.tell([:start_termination, Concurrent::Promises.resolvable_future]) } end