class Dynflow::Connectors::Database::Core
Attributes
polling_interval[R]
Public Class Methods
new(connector, polling_interval)
click to toggle source
# File lib/dynflow/connectors/database.rb, line 47
# Sets up the core in its initial state: no world attached, not stopped,
# and a fresh round-robin used when picking an executor.
#
# @param connector object whose +receive+ is invoked for envelopes
#   addressed to the attached world
# @param polling_interval interval handed to the world's clock when
#   scheduling the next inbox check (units not visible here)
def initialize(connector, polling_interval)
  @polling_interval = polling_interval
  @connector = connector
  @executor_round_robin = RoundRobin.new
  @stopped = false
  @world = nil
end
Public Instance Methods
check_inbox()
click to toggle source
# File lib/dynflow/connectors/database.rb, line 80
# Pulls pending envelopes for the attached world.
# Does nothing (and returns nil) while no world has been attached yet.
def check_inbox
  receive_envelopes if @world
end
handle_envelope(envelope)
click to toggle source
# File lib/dynflow/connectors/database.rb, line 85
# Routes a single envelope.
#
# * Addressed to another world: the envelope is re-addressed to the
#   resolved receiver and sent out again.
# * Addressed to this world while stopped: an error is logged and the
#   envelope is not delivered.
# * Otherwise: the envelope is handed to the connector.
#
# @param envelope the envelope to route
def handle_envelope(envelope)
  target_id = find_receiver(envelope)

  # Not ours: forward it to the world it was resolved to.
  return send_envelope(update_receiver_id(envelope, target_id)) unless target_id == @world.id
  return log(Logger::ERROR, "Envelope #{envelope} received for stopped world") if @stopped

  @connector.receive(@world, envelope)
end
periodic_check_inbox()
click to toggle source
# File lib/dynflow/connectors/database.rb, line 75
# Sends itself a :check_inbox message and, unless the core has been
# stopped, asks the world's clock to ping :periodic_check_inbox again
# after +polling_interval+.
def periodic_check_inbox
  self << :check_inbox
  # Once stopped, the polling chain is not re-armed.
  return if @stopped

  @world.clock.ping(self, polling_interval, :periodic_check_inbox)
end
start_listening(world)
click to toggle source
# File lib/dynflow/connectors/database.rb, line 59
# Attaches the given world, clears the stopped flag, starts the Postgres
# listener (when supported) and kicks off periodic inbox polling.
#
# @param world the world whose inbox this core should serve
def start_listening(world)
  @stopped = false
  @world = world
  # Must run after @world is set: the listener setup reads it.
  postgres_listen_start
  self << :periodic_check_inbox
end
stop_listening()
click to toggle source
# File lib/dynflow/connectors/database.rb, line 70
# Marks the core as stopped and shuts down the Postgres listener, if one
# was started.
def stop_listening
  # Flag first, so anything consulting @stopped sees the new state
  # before the listener is torn down.
  @stopped = true
  postgres_listen_stop
end
stop_receiving_new_work()
click to toggle source
# File lib/dynflow/connectors/database.rb, line 66
# Asks the coordinator to deactivate this world's registration.
#
# A no-op (returning nil) when no world has been attached yet: @world is
# nil until start_listening is called, the same state check_inbox guards
# against.
#
# @return the result of the coordinator's +deactivate_world+, or nil when
#   no world is attached
def stop_receiving_new_work
  return unless @world

  @world.coordinator.deactivate_world(@world.registered_world)
end
stopped?()
click to toggle source
# File lib/dynflow/connectors/database.rb, line 55
# Reports whether the core has been stopped.
#
# @return [Boolean] strictly true or false, whatever is stored in @stopped
def stopped?
  @stopped ? true : false
end
Private Instance Methods
any_executor()
click to toggle source
# File lib/dynflow/connectors/database.rb, line 140
# Refreshes the round-robin with the worlds returned by the coordinator's
# find_worlds(true) and returns the next one.
#
# @raise [Dynflow::Error] when the round-robin yields no world
def any_executor
  @executor_round_robin.data = @world.coordinator.find_worlds(true)
  executor = @executor_round_robin.next
  raise Dynflow::Error, "No executor available" unless executor

  executor
end
find_receiver(envelope)
click to toggle source
# File lib/dynflow/connectors/database.rb, line 132
# Resolves the id of the world that should receive the envelope.
#
# When the envelope's receiver matches Dispatcher::AnyExecutor, an
# executor is picked via any_executor and its id is returned; otherwise
# the envelope's own receiver_id is returned unchanged.
#
# @param envelope the envelope being routed
def find_receiver(envelope)
  receiver_id = envelope.receiver_id

  case receiver_id
  when Dispatcher::AnyExecutor then any_executor.id
  else receiver_id
  end
end
postgres_listen_start()
click to toggle source
# File lib/dynflow/connectors/database.rb, line 100
# Starts the Postgres listener when the world's database supports
# notifications. The listener is created once and reused afterwards; it
# is only started if it does not report itself as already started.
def postgres_listen_start
  db = @world.persistence.adapter.db
  return unless PostgresListerner.notify_supported?(db)

  @postgres_listener ||= PostgresListerner.new(self, @world.id, db)
  @postgres_listener.start unless @postgres_listener.started?
end
postgres_listen_stop()
click to toggle source
# File lib/dynflow/connectors/database.rb, line 107
# Stops the Postgres listener if one was ever created; otherwise does
# nothing and returns nil.
def postgres_listen_stop
  return unless @postgres_listener

  @postgres_listener.stop
end
receive_envelopes()
click to toggle source
# File lib/dynflow/connectors/database.rb, line 111
# Pulls the envelopes stored for this world and sends itself one
# [:handle_envelope, envelope] message per envelope.
#
# Any StandardError raised along the way is logged at ERROR level and
# swallowed rather than propagated.
def receive_envelopes
  inbox = @world.persistence.pull_envelopes(@world.id)
  inbox.each { |envelope| tell([:handle_envelope, envelope]) }
rescue => e
  log(Logger::ERROR, "Receiving envelopes failed on #{e}")
end
send_envelope(envelope)
click to toggle source
# File lib/dynflow/connectors/database.rb, line 119
# Stores the envelope through the world's persistence and, when a
# Postgres listener exists, notifies the envelope's receiver.
#
# Any StandardError raised by either step is logged at ERROR level and
# swallowed rather than propagated.
#
# @param envelope the envelope to push
def send_envelope(envelope)
  @world.persistence.push_envelope(envelope)
  @postgres_listener.notify(envelope.receiver_id) if @postgres_listener
rescue => e
  log(Logger::ERROR, "Sending envelope failed on #{e}")
end
update_receiver_id(envelope, new_receiver_id)
click to toggle source
# File lib/dynflow/connectors/database.rb, line 128
# Builds a new Dispatcher::Envelope that copies the request id, sender id
# and message of the given envelope but carries a different receiver id.
# The original envelope is only read, never modified here.
#
# @param envelope the envelope to copy from
# @param new_receiver_id the receiver id to put on the copy
def update_receiver_id(envelope, new_receiver_id)
  request_id = envelope.request_id
  sender_id = envelope.sender_id
  message = envelope.message

  Dispatcher::Envelope[request_id, sender_id, new_receiver_id, message]
end