class Dynflow::Executors::Parallel
Attributes
core[R]
Public Class Methods
new(world, executor_class:, heartbeat_interval:, queues_options: { :default => { :pool_size => 5 }})
click to toggle source
# File lib/dynflow/executors/parallel.rb, line 12 def initialize(world, executor_class:, heartbeat_interval:, queues_options: { :default => { :pool_size => 5 }}) @world = world @logger = world.logger @core = executor_class.spawn name: 'parallel-executor-core', args: [world, heartbeat_interval, queues_options], initialized: @core_initialized = Concurrent::Promises.resolvable_future end
Public Instance Methods
delayed_event(director_event)
click to toggle source
# File lib/dynflow/executors/parallel.rb, line 41 def delayed_event(director_event) @core.ask([:handle_event, director_event]) director_event.result end
event(request_id, execution_plan_id, step_id, event, future = nil)
click to toggle source
# File lib/dynflow/executors/parallel.rb, line 36 def event(request_id, execution_plan_id, step_id, event, future = nil) @core.ask([:handle_event, Director::Event[request_id, execution_plan_id, step_id, event, future]]) future end
execute(execution_plan_id, finished = Concurrent::Promises.resolvable_future, wait_for_acceptance = true)
click to toggle source
# File lib/dynflow/executors/parallel.rb, line 23 def execute(execution_plan_id, finished = Concurrent::Promises.resolvable_future, wait_for_acceptance = true) accepted = @core.ask([:handle_execution, execution_plan_id, finished]) accepted.value! if wait_for_acceptance finished rescue Concurrent::Actor::ActorTerminated => error dynflow_error = Dynflow::Error.new('executor terminated') finished.reject dynflow_error unless finished.resolved? raise dynflow_error rescue => e finished.reject e unless finished.resolved? raise e end
execution_status(execution_plan_id = nil)
click to toggle source
# File lib/dynflow/executors/parallel.rb, line 51 def execution_status(execution_plan_id = nil) @core.ask!([:execution_status, execution_plan_id]) end
initialized()
click to toggle source
# File lib/dynflow/executors/parallel.rb, line 55 def initialized @core_initialized end
terminate(future = Concurrent::Promises.resolvable_future)
click to toggle source
# File lib/dynflow/executors/parallel.rb, line 46 def terminate(future = Concurrent::Promises.resolvable_future) @core.tell([:start_termination, future]) future end