class Dynflow::Executors::Parallel

Public Class Methods

new(world, heartbeat_interval, queues_options = { :default => { :pool_size => 5 }})
Calls superclass method Dynflow::Executors::Abstract.new
# File lib/dynflow/executors/parallel.rb, line 8
# Builds the executor and spawns its core actor.
#
# The future stored in +@core_initialized+ is handed to the core as its
# +initialized:+ option and is exposed through #initialized.
#
# @param world the world object, forwarded to the superclass constructor and
#   to the core actor
# @param heartbeat_interval forwarded unchanged to the core actor
# @param queues_options [Hash] forwarded unchanged to the core actor; the
#   default declares a single +:default+ entry with +:pool_size+ 5
def initialize(world, heartbeat_interval, queues_options = { :default => { :pool_size => 5 }})
  super(world)
  @core_initialized = Concurrent.future
  core_args = [world, heartbeat_interval, queues_options]
  @core = Core.spawn(name:        'parallel-executor-core',
                     args:        core_args,
                     initialized: @core_initialized)
end

Public Instance Methods

event(execution_plan_id, step_id, event, future = Concurrent.future)
# File lib/dynflow/executors/parallel.rb, line 28
# Forwards an event for one step of an execution plan to the core actor.
#
# The arguments are wrapped in a Director::Event and sent to the core as a
# +[:handle_event, event]+ message via +ask+. The value returned by +ask+ is
# not used.
#
# @param execution_plan_id identifier of the targeted execution plan
# @param step_id identifier of the targeted step
# @param event the event payload
# @param future future carried inside the Director::Event; a fresh
#   +Concurrent.future+ when omitted
# @return the +future+ argument
def event(execution_plan_id, step_id, event, future = Concurrent.future)
  wrapped_event = Director::Event[execution_plan_id, step_id, event, future]
  message = [:handle_event, wrapped_event]
  @core.ask(message)
  future
end
execute(execution_plan_id, finished = Concurrent.future, wait_for_acceptance = true)
# File lib/dynflow/executors/parallel.rb, line 15
# Asks the core actor to run an execution plan.
#
# Sends +[:handle_execution, execution_plan_id, finished]+ to the core via
# +ask+. When +wait_for_acceptance+ is truthy, +value!+ is called on the
# object returned by +ask+ before this method returns (presumably blocking
# until the core accepts or rejects the plan -- confirm against the
# concurrent-ruby version in use).
#
# On any rescued error +finished+ is failed with that error, unless it is
# already completed, and the error is raised to the caller.
#
# @param execution_plan_id identifier of the plan to execute
# @param finished future passed to the core; a fresh +Concurrent.future+
#   when omitted
# @param wait_for_acceptance [Boolean] whether to call +value!+ on the
#   result of +ask+
# @return the +finished+ argument
# @raise [Dynflow::Error] with message 'executor terminated' when
#   Concurrent::Actor::ActorTerminated is rescued
# @raise [StandardError] any other rescued error, re-raised as is
def execute(execution_plan_id, finished = Concurrent.future, wait_for_acceptance = true)
  acceptance = @core.ask([:handle_execution, execution_plan_id, finished])
  acceptance.value! if wait_for_acceptance
  finished
rescue Concurrent::Actor::ActorTerminated
  # Translate the actor-level failure into the library's own error type.
  terminated = Dynflow::Error.new('executor terminated')
  unless finished.completed?
    finished.fail(terminated)
  end
  raise terminated
rescue => failure
  unless finished.completed?
    finished.fail(failure)
  end
  raise failure
end
execution_status(execution_plan_id = nil)
# File lib/dynflow/executors/parallel.rb, line 38
# Queries the core actor for execution status.
#
# Sends +[:execution_status, execution_plan_id]+ to the core via +ask!+ and
# returns whatever +ask!+ returns (presumably the resolved reply of the
# core -- confirm against the concurrent-ruby actor API).
#
# @param execution_plan_id identifier of a plan, or +nil+ (the default)
# @return the result of +@core.ask!+
def execution_status(execution_plan_id = nil)
  request = [:execution_status, execution_plan_id]
  @core.ask!(request)
end
initialized()
# File lib/dynflow/executors/parallel.rb, line 42
# Returns the future held in +@core_initialized+, i.e. the one the
# constructor passes to the core actor as its +initialized:+ option.
#
# @return the stored initialization future
def initialized
  @core_initialized
end
terminate(future = Concurrent.future)
# File lib/dynflow/executors/parallel.rb, line 33
# Requests termination of the core actor.
#
# Sends +[:start_termination, future]+ to the core via +tell+ and returns
# right after sending; the value returned by +tell+ is not used.
#
# @param future future passed along in the termination message; a fresh
#   +Concurrent.future+ when omitted
# @return the +future+ argument
def terminate(future = Concurrent.future)
  termination_request = [:start_termination, future]
  @core.tell(termination_request)
  future
end