class Dynflow::Dispatcher::ClientDispatcher

Constants

TrackedRequest

Attributes

ping_cache[R]

Public Class Methods

new(world, ping_cache_age) click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 106
# Prepares an empty dispatcher for the given world.
#
# @param world [World] checked with `Type!` before it is stored
# @param ping_cache_age handed to `PingCache.new` unchanged
def initialize(world, ping_cache_age)
  @world = Type!(world, World)
  # No requests are tracked yet; the suffix counter feeds the ids that
  # track_request generates.
  @tracked_requests = {}
  @last_id_suffix = 0
  @terminated = nil
  @ping_cache = PingCache.new(world, ping_cache_age)
end

Public Instance Methods

add_ping_cache_record(id) click to toggle source

Records when the world with the provided id was last seen, using a PingCache

@param id [String] Id of the world @see Dynflow::Dispatcher::ClientDispatcher::PingCache#add_record

# File lib/dynflow/dispatcher/client_dispatcher.rb, line 179
# Logs the update at debug level and adds a record for the given world id
# to the ping cache.
#
# @param id [String] id of the world
# @return the result of `@ping_cache.add_record`
def add_ping_cache_record(id)
  message = "adding ping cache record for #{id}"
  log(Logger::DEBUG, message)
  @ping_cache.add_record(id)
end
dispatch_request(request, client_world_id, request_id) click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 133
# Picks the receiving world for a request, wraps the request in an Envelope
# and sends it through the connector, waiting on the send result.
#
# The receiver is chosen by request type: AnyExecutor for an Execution, the
# world found by find_executor for an Event, and the receiver id carried by
# the request itself for Ping and Status.
#
# Any StandardError raised on the way is logged. If the envelope had already
# been built, a Failed response with the error message is passed to
# `respond`; otherwise the error is only logged.
#
# @param request the request to deliver
# @param client_world_id second Envelope field (presumably the sender id)
# @param request_id first Envelope field, later used to pair the response
def dispatch_request(request, client_world_id, request_id)
  receiver = match(request,
                   on(~Execution) { |_execution| AnyExecutor },
                   on(~Event) { |event| find_executor(event.execution_plan_id) },
                   on(Ping.(~any, ~any) | Status.(~any, ~any)) { |receiver_id, _| receiver_id })
  envelope = Envelope[request_id, client_world_id, receiver, request]
  unknown_receiver = Dispatcher::UnknownWorld === envelope.receiver_id
  raise Dynflow::Error, "Could not find an executor for #{envelope}" if unknown_receiver

  connector.send(envelope).value!
rescue => error
  log(Logger::ERROR, error)
  # envelope is still nil when the failure happened before it was built
  respond(envelope, Failed[error.message]) if envelope
end
dispatch_response(envelope) click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 154
# Applies a response envelope to the tracked request it answers. Envelopes
# whose request id is not currently tracked are ignored.
#
# @param envelope the incoming envelope; its `request_id`, `sender_id` and
#   `message` are read
def dispatch_response(envelope)
  request_id = envelope.request_id
  return unless @tracked_requests.key?(request_id)

  match(envelope.message,
        on(~Accepted) { @tracked_requests[request_id].accept! },
        on(~Failed) { |failure| resolve_tracked_request(request_id, Dynflow::Error.new(failure.error)) },
        on(Done) { resolve_tracked_request(request_id) },
        on(Pong) {
          # Remember that the sender answered before completing the request.
          add_ping_cache_record(envelope.sender_id)
          resolve_tracked_request(request_id)
        },
        on(ExecutionStatus.(~any)) { |steps| @tracked_requests.delete(request_id).success!(steps) })
end
publish_request(future, request, timeout) click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 114
# Entry point for sending a request: the request is first offered to the
# ping cache wrapper, and only when that wrapper yields is it tracked and
# dispatched under this world's id.
#
# @param future the future handed to both the cache wrapper and the tracker
# @param request the request to send
# @param timeout forwarded to track_request
def publish_request(future, request, timeout)
  with_ping_request_caching(request, future) do
    track_request(future, request, timeout) { |tracked| dispatch_request(request, @world.id, tracked.id) }
  end
end
start_termination(*args) click to toggle source
Calls superclass method Dynflow::Actor#start_termination
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 126
# Runs the superclass termination hook, fails every request that is still
# tracked with a 'Dispatcher terminated' error, empties the tracking table
# and then finishes termination.
#
# @param args forwarded to the superclass implementation by the bare `super`
def start_termination(*args)
  super
  @tracked_requests.values.each do |pending|
    pending.fail!(Dynflow::Error.new('Dispatcher terminated'))
  end
  @tracked_requests.clear
  finish_termination
end
timeout(request_id) click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 122
# Fails the tracked request with a "Request timeout" error. track_request
# schedules this call through the world's clock when a timeout is given.
#
# @param request_id id of the tracked request that timed out
def timeout(request_id)
  timeout_error = Dynflow::Error.new("Request timeout")
  resolve_tracked_request(request_id, timeout_error)
end

Private Instance Methods

find_executor(execution_plan_id) click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 186
# Looks up which world holds the execution lock for an execution plan.
#
# @param execution_plan_id id of the execution plan
# @return the `world_id` of the first matching lock, or
#   Dispatcher::UnknownWorld when there is no lock or the lookup raised
#   (the error is logged in that case)
def find_executor(execution_plan_id)
  locks = @world.coordinator.find_locks(class: Coordinator::ExecutionLock.name,
                                        id: "execution-plan:#{execution_plan_id}")
  holder = locks.first
  holder ? holder.world_id : Dispatcher::UnknownWorld
rescue => error
  log(Logger::ERROR, error)
  Dispatcher::UnknownWorld
end
reset_tracked_request(tracked_request) click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 211
# Replaces a tracked request with a copy that has a fresh `accepted` future
# while keeping the same id, request and `finished` future.
#
# @param tracked_request the entry to reset
# @return the new TrackedRequest stored under the same id
# @raise [Dynflow::Error] when the request's `finished` future is already
#   resolved
def reset_tracked_request(tracked_request)
  raise Dynflow::Error, 'Can not reset resolved tracked request' if tracked_request.finished.resolved?

  # The old accept future is about to be dropped from tracking, so resolve
  # it here; otherwise nobody would set it.
  tracked_request.accept! unless tracked_request.accepted.resolved?

  fresh_accepted = Concurrent::Promises.resolvable_future
  request_id = tracked_request.id
  @tracked_requests[request_id] =
    TrackedRequest[request_id, tracked_request.request, fresh_accepted, tracked_request.finished]
end
resolve_tracked_request(id, error = nil) click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 222
# Completes a tracked request and stops tracking it. Ids that are not
# tracked are ignored.
#
# With an error the request is failed with it. Without one, the value used
# for success depends on the original request: the execution plan loaded
# from persistence for an Execution, `true` for an Event or Ping.
#
# @param id id of the tracked request
# @param error optional error to fail the request with
def resolve_tracked_request(id, error = nil)
  return unless @tracked_requests.key?(id)
  return @tracked_requests.delete(id).fail!(error) if error

  original_request = @tracked_requests[id].request
  result = match(original_request,
                 on(Execution.(execution_plan_id: ~any)) { |uuid| @world.persistence.load_execution_plan(uuid) },
                 on(Event | Ping) { true })
  # The entry is removed only after the result has been computed.
  @tracked_requests.delete(id).success!(result)
end
track_request(finished, request, timeout) { |tracked_request| ... } click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 199
# Registers a new tracked request under a freshly generated id
# ("<world id>-<counter>"), optionally schedules a timeout message through
# the world's clock, and yields the tracked request to the block.
#
# A Dynflow::Error raised while doing so fails the tracked request with that
# error and is logged; other exceptions propagate.
#
# @param finished future stored as the request's `finished` field
# @param request the request being tracked
# @param timeout delay passed to the clock; no timeout is scheduled when
#   it is nil or false
# @yield [tracked_request] the newly registered entry
def track_request(finished, request, timeout)
  @last_id_suffix += 1
  id = "#{@world.id}-#{@last_id_suffix}"
  accepted = Concurrent::Promises.resolvable_future
  tracked_request = TrackedRequest[id, request, accepted, finished]
  @tracked_requests[id] = tracked_request
  if timeout
    @world.clock.ping(self, timeout, [:timeout, id])
  end
  yield tracked_request
rescue Dynflow::Error => error
  resolve_tracked_request(tracked_request.id, error)
  log(Logger::ERROR, error)
end
with_ping_request_caching(request, future) { || ... } click to toggle source

Tries to reduce the number of sent Ping requests by first looking into a cache. If the destination world is an executor world, the result is resolved solely from the cache. For client worlds the Ping might be sent if the cache record is stale.

@param request [Dynflow::Dispatcher::Request] the request to send @param future [Concurrent::Future] the future to fulfill if the world was seen recently @return [Concurrent::Future] the future tracking the request

# File lib/dynflow/dispatcher/client_dispatcher.rb, line 246
# Tries to answer a Ping from the ping cache instead of sending it.
#
# Requests that are not Pings, or Pings that do not ask for caching, are
# passed straight to the block. For a cached Ping:
# - a fresh cache record fulfills the future with `true`;
# - no fresh record for a world the cache reports as an executor rejects
#   the future without sending anything;
# - otherwise the block is invoked so the Ping is actually sent.
#
# @param request [Dynflow::Dispatcher::Request] the request to send
# @param future the future to resolve when the cache can answer; it must
#   respond to `fulfill(value)` and `reject(reason)`
# @yield called when the request has to be sent for real
# @return the block's result, or the result of resolving the future
def with_ping_request_caching(request, future)
  return yield unless request.is_a?(Dynflow::Dispatcher::Ping)
  return yield unless request.use_cache

  receiver_id = request.receiver_id
  if @ping_cache.fresh_record?(receiver_id)
    future.fulfill(true)
  elsif @ping_cache.executor?(receiver_id)
    # reject needs a reason: calling it with no argument raises
    # ArgumentError on a Concurrent::Promises resolvable future and leaves
    # the future unresolved. Use a Dynflow::Error, as the other failure
    # paths in this class do.
    future.reject(Dynflow::Error.new("World #{receiver_id} was not seen recently according to the ping cache"))
  else
    yield
  end
end