class Proxy::RemoteExecution::Ssh::MQTT::DispatcherActor::Tracker
Public Class Methods
new(limit, clock)
click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 37
# Sets up an empty tracker.
#
# @param limit [Integer, nil] stored in @limit; +pending_count+ treats nil as "no limit"
# @param clock [#ping] stored in @clock; used by +schedule_resend+
def initialize(limit, clock)
  @limit, @clock = limit, clock
  # uuid => job definition
  @jobs = {}
  # uuids waiting to be dispatched, in arrival order
  @pending = []
  # Three independent, initially empty sets of uuids.
  @running, @hot, @cold = Array.new(3) { Set.new }
end
Public Instance Methods
dispatch_pending()
click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 86
# Notifies as many pending jobs as +pending_count+ allows, oldest first,
# and moves each of them from @pending into @hot.
#
# @return [Integer] the value +pending_count+ returned
def dispatch_pending
  pending_count.times do
    head = @pending.first
    # Notify while the uuid is still queued; it is only removed afterwards.
    mqtt_notify(head)
    @hot.add(@pending.shift)
  end
end
done(uuid)
click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 65
# Forgets a job entirely: drops its definition and removes its uuid from
# every tracking collection, then dispatches pending jobs.
#
# @param uuid the job identifier to remove
# @return whatever +dispatch_pending+ returns
def done(uuid)
  @jobs.delete(uuid)
  @pending.delete(uuid)
  @running.delete(uuid)
  @hot.delete(uuid)
  @cold.delete(uuid)
  dispatch_pending
end
mqtt_notify(uuid)
click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 101
# Publishes the stored job's payload, serialized with JSON.dump, to the
# job's topic. Does nothing when no job is stored under +uuid+.
#
# @param uuid the job identifier to look up in @jobs
# @return nil when the job is unknown, otherwise the result of +publish+
def mqtt_notify(uuid)
  job = @jobs[uuid]
  unless job.nil?
    topic = job.topic
    message = JSON.dump(job.payload)
    Proxy::RemoteExecution::Ssh::MQTT.publish(topic, message)
  end
end
needs_processing?()
click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 73
# Tells whether there is anything left to do: jobs that can be dispatched
# right now, or uuids still held in @hot or @cold.
#
# @return [Boolean]
def needs_processing?
  return true if pending_count.positive?

  !(@hot.none? && @cold.none?)
end
new(uuid, topic, payload)
click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 47
# Registers a job: stores its definition under +uuid+, queues the uuid
# at the end of @pending and dispatches pending jobs.
#
# @param uuid the job identifier
# @param topic passed through to JobDefinition
# @param payload passed through to JobDefinition
# @return whatever +dispatch_pending+ returns
def new(uuid, topic, payload)
  definition = JobDefinition.new(uuid, topic, payload)
  @jobs.store(uuid, definition)
  @pending.push(uuid)
  dispatch_pending
end
pending_count()
click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 77
# Number of pending jobs that may be dispatched right now.
#
# Without a limit (@limit is nil) every pending job qualifies. Otherwise
# the count is bounded by the remaining capacity: @limit minus the jobs
# currently held in @running, @hot and @cold.
#
# @return [Integer] a value between 0 and the number of pending jobs
def pending_count
  waiting = @pending.size
  return waiting if @limit.nil?

  occupied = [@running, @hot, @cold].sum(&:size)
  # The tracked jobs can exceed the limit (a uuid moved into @running is not
  # checked against it), so clamp to keep the result from going negative.
  (@limit - occupied).clamp(0, waiting)
end
process()
click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 93
# Runs one processing round: schedules a resend for every uuid currently
# in @cold, makes the current @hot set the new @cold, starts a fresh empty
# @hot and dispatches pending jobs.
#
# @return whatever +dispatch_pending+ returns
def process
  @cold.each do |uuid|
    schedule_resend(uuid)
  end
  # Rotate the sets: the old @cold contents are dropped from tracking here.
  @cold, @hot = @hot, Set.new
  dispatch_pending
end
resend(uuid)
click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 58
# Queues a known job for another notification. A uuid without a stored
# job definition is ignored.
#
# @param uuid the job identifier
# @return nil when the job is unknown, otherwise whatever
#   +dispatch_pending+ returns
def resend(uuid)
  if @jobs[uuid]
    @pending.push(uuid)
    dispatch_pending
  end
end
resend_interval()
click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 116
# Reads the :mqtt_resend_interval entry from the plugin settings.
#
# @return the configured value (units are not visible here)
def resend_interval
  plugin_settings = settings
  plugin_settings[:mqtt_resend_interval]
end
running(uuid)
click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 53
# Marks a job as running: removes its uuid from @pending, @hot and @cold
# and records it in @running.
#
# @param uuid the job identifier
# @return [Set] the @running set
def running(uuid)
  @pending.delete(uuid)
  @hot.delete(uuid)
  @cold.delete(uuid)
  @running.add(uuid)
end
schedule_resend(uuid)
click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 112
# Asks the clock to ping the MQTT dispatcher instance with +uuid+ and
# :resend, using +resend_interval+ as the timing argument.
#
# @param uuid the job identifier to hand to the clock
# @return whatever @clock.ping returns
def schedule_resend(uuid)
  dispatcher = Proxy::RemoteExecution::Ssh::MQTT::Dispatcher.instance
  delay = resend_interval
  @clock.ping(dispatcher, delay, uuid, :resend)
end
settings()
click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 108
# Shortcut to the remote execution SSH plugin's settings object.
#
# @return whatever Proxy::RemoteExecution::Ssh::Plugin.settings returns
def settings
  plugin = Proxy::RemoteExecution::Ssh::Plugin
  plugin.settings
end