class Dynflow::Dispatcher::ExecutorDispatcher

Public Class Methods

new(world, semaphore) click to toggle source
# File lib/dynflow/dispatcher/executor_dispatcher.rb, line 5
def initialize(world, semaphore)
  @world           = Type! world, World
  @current_futures = Set.new
end

Public Instance Methods

handle_request(envelope) click to toggle source
# File lib/dynflow/dispatcher/executor_dispatcher.rb, line 10
def handle_request(envelope)
  match(envelope.message,
        on(Planning) { perform_planning(envelope, envelope.message)},
        on(Execution) { perform_execution(envelope, envelope.message) },
        on(Event)     { perform_event(envelope, envelope.message) },
        on(Status)    { get_execution_status(envelope, envelope.message) })
end

Protected Instance Methods

get_execution_status(envelope, envelope_message) click to toggle source
# File lib/dynflow/dispatcher/executor_dispatcher.rb, line 89
def get_execution_status(envelope, envelope_message)
  items = @world.executor.execution_status envelope_message.execution_plan_id
  respond(envelope, ExecutionStatus[execution_status: items])
end
perform_event(envelope, event_request) click to toggle source
# File lib/dynflow/dispatcher/executor_dispatcher.rb, line 54
def perform_event(envelope, event_request)
  future = on_finish do |f|
    f.then do
      respond(envelope, Done)
    end.rescue do |reason|
      respond(envelope, Failed[reason.to_s])
    end
  end
  if event_request.time.nil? || event_request.time < Time.now
    @world.executor.event(envelope.request_id, event_request.execution_plan_id, event_request.step_id, event_request.event, future,
                          optional: event_request.optional)
  else
    @world.clock.ping(
      @world.executor,
      event_request.time,
      Director::Event[envelope.request_id, event_request.execution_plan_id, event_request.step_id, event_request.event, Concurrent::Promises.resolvable_future,
                      event_request.optional],
      :delayed_event
    )
    # resolves the future right away - currently we do not wait for the clock ping
    future.fulfill true
  end
rescue Dynflow::Error => e
  future.reject(e) if future && !future.resolved?
end
perform_execution(envelope, execution) click to toggle source
# File lib/dynflow/dispatcher/executor_dispatcher.rb, line 27
def perform_execution(envelope, execution)
  allocate_executor(execution.execution_plan_id, envelope.sender_id, envelope.request_id)
  execution_lock = Coordinator::ExecutionLock.new(@world, execution.execution_plan_id, envelope.sender_id, envelope.request_id)
  future = on_finish do |f|
    f.then do |plan|
      when_done(plan, envelope, execution, execution_lock)
    end.rescue do |reason|
      @world.coordinator.release(execution_lock)
      respond(envelope, Failed[reason.to_s])
    end
  end
  @world.executor.execute(execution.execution_plan_id, future)
  respond(envelope, Accepted)
rescue Dynflow::Error => e
  future.reject(e) if future && !future.resolved?
  respond(envelope, Failed[e.message])
end
perform_planning(envelope, planning) click to toggle source
# File lib/dynflow/dispatcher/executor_dispatcher.rb, line 20
def perform_planning(envelope, planning)
  @world.executor.plan(planning.execution_plan_id)
  respond(envelope, Accepted)
rescue Dynflow::Error => e
  respond(envelope, Failed[e.message])
end
start_termination(*args) click to toggle source
Calls superclass method Dynflow::Actor#start_termination
# File lib/dynflow/dispatcher/executor_dispatcher.rb, line 80
def start_termination(*args)
  super
  if @current_futures.empty?
    reference.tell(:finish_termination)
  else
    Concurrent::Promises.zip_futures(*@current_futures).then { reference.tell(:finish_termination) }
  end
end
when_done(plan, envelope, execution, execution_lock) click to toggle source
# File lib/dynflow/dispatcher/executor_dispatcher.rb, line 45
def when_done(plan, envelope, execution, execution_lock)
  if plan.state == :running
    @world.invalidate_execution_lock(execution_lock)
  else
    @world.coordinator.release(execution_lock)
    respond(envelope, Done)
  end
end

Private Instance Methods

allocate_executor(execution_plan_id, client_world_id, request_id) click to toggle source
# File lib/dynflow/dispatcher/executor_dispatcher.rb, line 96
def allocate_executor(execution_plan_id, client_world_id, request_id)
  execution_lock = Coordinator::ExecutionLock.new(@world, execution_plan_id, client_world_id, request_id)
  @world.coordinator.acquire(execution_lock)
end
finish_execution(future) click to toggle source
# File lib/dynflow/dispatcher/executor_dispatcher.rb, line 113
def finish_execution(future)
  @current_futures.delete(future)
end
on_finish() { |future).rescue { |reason| logger.error("Unexpected fail on future #{reason}")| ... } click to toggle source
# File lib/dynflow/dispatcher/executor_dispatcher.rb, line 101
def on_finish
  raise "Dispatcher terminating: no new work can be started" if terminating?
  future = Concurrent::Promises.resolvable_future
  callbacks_future = (yield future).rescue { |reason| @world.logger.error("Unexpected fail on future #{reason}") }
  # we track currently running futures to make sure to not
  # terminate until the execution is finished (including
  # cleaning of locks etc)
  @current_futures << callbacks_future
  callbacks_future.on_resolution! { reference.tell([:finish_execution, callbacks_future]) }
  return future
end