# File lib/dynflow/executors/parallel/pool.rb, line 38
# Sets up the pool: remembers the executor core and the pool size, spawns
# +pool_size+ workers (all of them start out free) and creates an empty
# job storage.
#
# @param core the executor core this pool reports to via +tell+
# @param pool_size number of workers to spawn
# @param transaction_adapter passed through unchanged to every spawned worker
def initialize(core, pool_size, transaction_adapter)
  @executor_core = core
  @pool_size     = pool_size
  @free_workers  = Array.new(pool_size) do |index|
    worker_name = "worker-#{index}"
    # `reference` is supplied by the enclosing class (not visible here);
    # presumably it is this pool's own actor reference -- verify.
    Worker.spawn(worker_name, reference, transaction_adapter)
  end
  @jobs = JobStorage.new
end
# File lib/dynflow/executors/parallel/pool.rb, line 56
# Forwards a persistence error to the executor core.
#
# The method name and its argument are packed into a single array message,
# the same shape +worker_done+ uses for +[:work_finished, work]+. +tell+ is
# given exactly one message everywhere else in this class; passing the
# error as a second positional argument would not fit a one-argument +tell+.
#
# @param error the persistence error to report
def handle_persistence_error(error)
  @executor_core.tell([:handle_persistence_error, error])
end
# File lib/dynflow/executors/parallel/pool.rb, line 45
# Queues a piece of work and immediately tries to hand queued jobs to
# free workers.
#
# @param work the work item to enqueue
def schedule_work(work)
  @jobs.add(work)
  distribute_jobs
end
# File lib/dynflow/executors/parallel/pool.rb, line 60
# Begins termination: runs the inherited behaviour first, then checks
# whether the pool can shut down right away.
#
# @param args forwarded untouched to the superclass implementation
def start_termination(*args)
  # Bare `super` re-sends exactly the arguments this method received.
  super
  try_to_terminate
end
# File lib/dynflow/executors/parallel/pool.rb, line 50
# Called when a worker has finished a piece of work: notifies the executor
# core, returns the worker to the free list and dispatches further jobs.
#
# @param worker the worker that just became available
# @param work the work item it finished
def worker_done(worker, work)
  finished_message = [:work_finished, work]
  @executor_core.tell(finished_message)
  @free_workers.push(worker)
  distribute_jobs
end
# File lib/dynflow/executors/parallel/pool.rb, line 75
# Checks for a pending termination first, then pairs free workers with
# queued jobs until one of the two runs out.
def distribute_jobs
  try_to_terminate

  until @free_workers.empty? || @jobs.empty?
    # Take the worker before the job, then send the job to it with `<<`.
    worker = @free_workers.pop
    job    = @jobs.pop
    worker << job
  end
end
# File lib/dynflow/executors/parallel/pool.rb, line 67
# Completes termination once it has been requested and every worker is back
# in the free list; otherwise does nothing and returns nil.
def try_to_terminate
  return unless terminating? && @free_workers.size == @pool_size

  # Ask every worker to terminate before waiting on any of the replies.
  pending_terminations = @free_workers.map { |worker| worker.ask(:terminate!) }
  pending_terminations.map(&:wait)

  @executor_core.tell(:finish_termination)
  finish_termination
end