class Dynflow::Dispatcher::ClientDispatcher
Constants
- TrackedRequest
Attributes
ping_cache[R]
Public Class Methods
new(world, ping_cache_age)
click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 106
# Builds a dispatcher with an empty table of tracked requests, a request id
# counter starting at zero, and a ping cache for the given world.
#
# @param world the world this dispatcher works for; it is passed through
#   `Type!` together with `World` before being stored
# @param ping_cache_age forwarded unchanged to `PingCache.new`
def initialize(world, ping_cache_age)
  @world            = Type!(world, World)
  @tracked_requests = {}
  @last_id          = 0
  @terminated       = nil
  @ping_cache       = PingCache.new(world, ping_cache_age)
end
Public Instance Methods
add_ping_cache_record(id)
click to toggle source
Records when the world with the provided id was last seen, using a PingCache
@param id [String] Id of the world
@see Dynflow::Dispatcher::ClientDispatcher::PingCache#add_record
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 179
# Logs a debug message and then adds a record for the given world id to the
# ping cache.
#
# @param id id of the world to record; handed to the ping cache's `add_record`
def add_ping_cache_record(id)
  log(Logger::DEBUG, "adding ping cache record for #{id}")
  @ping_cache.add_record(id)
end
dispatch_request(request, client_world_id, request_id)
click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 133
# Picks the receiver for a request, wraps the request in an envelope and
# sends it through the connector.
#
# Receiver selection by request kind:
# * Execution     -> AnyExecutor
# * Event         -> whatever #find_executor returns for the event's plan id
# * Ping / Status -> the first field captured from the request
#
# Any StandardError raised along the way is logged. If the envelope had
# already been built by then, a Failed response carrying the error message
# is passed to `respond` for that envelope.
#
# @param request the request to dispatch
# @param client_world_id id placed into the envelope right after the request id
# @param request_id id of the tracked request, placed first into the envelope
def dispatch_request(request, client_world_id, request_id)
  target_id = match(request,
                    on(~Execution) { |_execution| AnyExecutor },
                    on(~Event) { |event| find_executor(event.execution_plan_id) },
                    on(Ping.(~any, ~any) | Status.(~any, ~any)) { |receiver_id, _| receiver_id })
  envelope = Envelope[request_id, client_world_id, target_id, request]
  raise Dynflow::Error, "Could not find an executor for #{envelope}" if Dispatcher::UnknownWorld === envelope.receiver_id

  connector.send(envelope).value!
rescue => e
  log(Logger::ERROR, e)
  # envelope is still nil when the failure happened before it was assigned
  respond(envelope, Failed[e.message]) if envelope
end
dispatch_response(envelope)
click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 154
# Applies a response envelope to the tracked request it refers to.
# Envelopes whose request id is not currently tracked are ignored.
#
# Handling by message kind:
# * Accepted        -> marks the tracked request as accepted
# * Failed          -> resolves the request with a Dynflow::Error built from the message's error
# * Done            -> resolves the request successfully
# * Pong            -> records the sender in the ping cache, then resolves the request
# * ExecutionStatus -> removes the request and succeeds it with the captured value
#
# @param envelope the response envelope; its request_id, message and
#   sender_id are read here
def dispatch_response(envelope)
  request_id = envelope.request_id
  return unless @tracked_requests.key?(request_id)

  match(envelope.message,
        on(~Accepted) { @tracked_requests[request_id].accept! },
        on(~Failed) { |failure| resolve_tracked_request(request_id, Dynflow::Error.new(failure.error)) },
        on(Done) { resolve_tracked_request(request_id) },
        on(Pong) {
          add_ping_cache_record(envelope.sender_id)
          resolve_tracked_request(request_id)
        },
        on(ExecutionStatus.(~any)) { |steps| @tracked_requests.delete(request_id).success!(steps) })
end
publish_request(future, request, timeout)
click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 114
# Starts tracking a request and dispatches it on behalf of this world.
# The whole operation runs inside #with_ping_request_caching, which may
# settle the future itself without running the tracking/dispatch block.
#
# @param future the future handed to the ping caching and to #track_request
# @param request the request to publish
# @param timeout passed to #track_request
def publish_request(future, request, timeout)
  with_ping_request_caching(request, future) do
    track_request(future, request, timeout) do |tracked|
      dispatch_request(request, @world.id, tracked.id)
    end
  end
end
start_termination(*args)
click to toggle source
Calls superclass method
Dynflow::Actor#start_termination
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 126
# Runs the superclass termination logic, fails every request that is still
# tracked with a 'Dispatcher terminated' error, empties the tracking table
# and finally calls `finish_termination`.
#
# @param args forwarded untouched to the superclass implementation
def start_termination(*args)
  super
  @tracked_requests.values.each do |pending|
    pending.fail!(Dynflow::Error.new('Dispatcher terminated'))
  end
  @tracked_requests.clear
  finish_termination
end
timeout(request_id)
click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 122
# Resolves the tracked request with a "Request timeout" error.
#
# @param request_id id of the tracked request that timed out
def timeout(request_id)
  timeout_error = Dynflow::Error.new("Request timeout")
  resolve_tracked_request(request_id, timeout_error)
end
Private Instance Methods
find_executor(execution_plan_id)
click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 186
# Looks up the execution lock for an execution plan in the world's
# coordinator and returns the world id stored on the first lock found.
#
# @param execution_plan_id id of the execution plan, used to build the
#   "execution-plan:<id>" lock id
# @return the lock's world_id, or Dispatcher::UnknownWorld when no lock is
#   found or the lookup raises (the error is logged in that case)
def find_executor(execution_plan_id)
  lock = @world.coordinator
               .find_locks(class: Coordinator::ExecutionLock.name,
                           id: "execution-plan:#{execution_plan_id}")
               .first
  lock ? lock.world_id : Dispatcher::UnknownWorld
rescue => e
  log(Logger::ERROR, e)
  Dispatcher::UnknownWorld
end
reset_tracked_request(tracked_request)
click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 210
# Replaces a tracked request with a new entry that keeps the same id,
# request and finished future but gets a brand new accepted future.
#
# @param tracked_request the entry to reset
# @raise [Dynflow::Error] when the entry's finished future is already completed
def reset_tracked_request(tracked_request)
  raise Dynflow::Error.new('Can not reset resolved tracked request') if tracked_request.finished.completed?

  # otherwise nobody would set the accept future
  tracked_request.accept! unless tracked_request.accepted.completed?

  replacement = TrackedRequest[tracked_request.id,
                               tracked_request.request,
                               Concurrent.future,
                               tracked_request.finished]
  @tracked_requests[tracked_request.id] = replacement
end
resolve_tracked_request(id, error = nil)
click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 220
# Removes a tracked request and settles it. Does nothing when the id is not
# currently tracked.
#
# With an error the request is failed with that error. Without one, the
# success value depends on the kind of the original request:
# * Execution    -> the execution plan loaded from the world's persistence
# * Event / Ping -> true
#
# @param id id of the tracked request
# @param error optional error to fail the request with
def resolve_tracked_request(id, error = nil)
  return unless @tracked_requests.key?(id)
  return @tracked_requests.delete(id).fail!(error) if error

  pending = @tracked_requests[id]
  # The entry is removed only after the value has been computed, so a raise
  # from the match below leaves the request in the tracking table.
  outcome = match(pending.request,
                  on(Execution.(execution_plan_id: ~any)) { |uuid| @world.persistence.load_execution_plan(uuid) },
                  on(Event | Ping) { true })
  @tracked_requests.delete(id).success!(outcome)
end
track_request(finished, request, timeout) { |tracked_request| ... }
click to toggle source
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 199
# Registers a new tracked request under the next sequential id and yields it
# to the caller's block.
#
# When a timeout is given, the world's clock is asked to ping this object
# with `[:timeout, id]` after that interval (presumably ending up in
# #timeout - confirm against the actor's message handling).
#
# A Dynflow::Error raised from the block resolves the tracked request with
# that error and is then logged.
#
# @param finished the future stored as the request's finished future
# @param request the request being tracked
# @param timeout interval handed to the clock; no ping is scheduled when falsy
# @yieldparam tracked_request the newly registered entry
def track_request(finished, request, timeout)
  @last_id += 1
  id = @last_id
  tracked = TrackedRequest[id, request, Concurrent.future, finished]
  @tracked_requests[id] = tracked
  @world.clock.ping(self, timeout, [:timeout, id]) if timeout
  yield tracked
rescue Dynflow::Error => e
  resolve_tracked_request(tracked.id, e)
  log(Logger::ERROR, e)
end
with_ping_request_caching(request, future) { || ... }
click to toggle source
Tries to reduce the number of sent Ping requests by first looking into a cache. If the destination world is an executor world, the result is resolved solely from the cache. For client worlds the Ping might be sent if the cache record is stale.
@param request [Dynflow::Dispatcher::Request] the request to send
@param future [Concurrent::Future] the future to fulfill if the world was seen recently
@return [Concurrent::Future] the future tracking the request
# File lib/dynflow/dispatcher/client_dispatcher.rb, line 244
# Consults the ping cache before letting a Ping request go out.
#
# The block is run directly for anything that is not a Ping, and for Pings
# that do not ask for caching. Otherwise:
# * a fresh cache record for the receiver fulfils the future with true
# * no fresh record, but the cache reports the receiver as an executor:
#   the future is failed
# * in any other case the block is run
#
# @param request the request about to be sent
# @param future the future to settle when the cache alone gives the answer
def with_ping_request_caching(request, future)
  return yield unless request.is_a?(Dynflow::Dispatcher::Ping) && request.use_cache

  receiver = request.receiver_id
  if @ping_cache.fresh_record?(receiver)
    future.success(true)
  elsif @ping_cache.executor?(receiver)
    future.fail
  else
    yield
  end
end