class Dynflow::Connectors::Database::Core

Attributes

polling_interval[R]

Public Class Methods

new(connector, polling_interval) click to toggle source
# File lib/dynflow/connectors/database.rb, line 48
# Sets up the core in its not-yet-listening state.
#
# @param connector the object later asked to `receive` envelopes addressed
#   to this world (see #handle_envelope)
# @param polling_interval the interval handed to the world's clock when
#   scheduling the next inbox check (see #periodic_check_inbox)
def initialize(connector, polling_interval)
  # Collaborators supplied by the caller.
  @connector = connector
  @polling_interval = polling_interval

  # Internal state: no world is attached until #start_listening is called.
  @executor_round_robin = RoundRobin.new
  @world = nil
  @stopped = false
end

Public Instance Methods

check_inbox() click to toggle source
# File lib/dynflow/connectors/database.rb, line 81
# Pulls pending envelopes, but only once a world has been attached.
# Returns nil without doing anything when no world is set.
def check_inbox
  receive_envelopes if @world
end
handle_envelope(envelope) click to toggle source
# File lib/dynflow/connectors/database.rb, line 86
# Routes a single envelope.
#
# The receiver is resolved through #find_receiver. An envelope for another
# world is re-addressed and sent on; an envelope for this world is handed to
# the connector, unless the core is stopped, in which case an error is
# logged and the envelope is not delivered.
#
# @param envelope the envelope to route
def handle_envelope(envelope)
  target_id = find_receiver(envelope)

  # Not ours: forward it to the resolved receiver.
  return send_envelope(update_receiver_id(envelope, target_id)) unless target_id == @world.id

  # Ours, but we no longer accept deliveries.
  return log(Logger::ERROR, "Envelope #{envelope} received for stopped world") if @stopped

  @connector.receive(@world, envelope)
end
periodic_check_inbox() click to toggle source
# File lib/dynflow/connectors/database.rb, line 76
# Requests an inbox check and, while not stopped, asks the world's clock to
# ping this object with :periodic_check_inbox again after polling_interval.
def periodic_check_inbox
  self << :check_inbox

  # Once stopped, the chain of periodic pings is not renewed.
  return if @stopped

  @world.clock.ping(self, polling_interval, :periodic_check_inbox)
end
start_listening(world) click to toggle source
# File lib/dynflow/connectors/database.rb, line 60
# Attaches the given world and begins polling for envelopes.
#
# Clears the stopped flag, starts the Postgres listener via
# #postgres_listen_start, and sends itself :periodic_check_inbox to begin
# the polling cycle.
#
# @param world the world whose inbox should be watched
def start_listening(world)
  @world, @stopped = world, false
  postgres_listen_start
  self << :periodic_check_inbox
end
stop_listening() click to toggle source
# File lib/dynflow/connectors/database.rb, line 71
# Marks the core as stopped and shuts down the Postgres listener, if any.
#
# With the flag set, #periodic_check_inbox no longer schedules another ping
# and #handle_envelope logs an error instead of delivering envelopes
# addressed to this world.
def stop_listening
  @stopped = true
  postgres_listen_stop
end
stop_receiving_new_work() click to toggle source
# File lib/dynflow/connectors/database.rb, line 67
# Asks the world's coordinator to deactivate this world's registered record.
# NOTE(review): presumably this stops other worlds from choosing it as an
# executor - confirm against the coordinator implementation.
def stop_receiving_new_work
  coordinator = @world.coordinator
  coordinator.deactivate_world(@world.registered_world)
end
stopped?() click to toggle source
# File lib/dynflow/connectors/database.rb, line 56
# @return [Boolean] true when the stopped flag is set to a truthy value,
#   false otherwise (always a strict boolean)
def stopped?
  @stopped ? true : false
end

Private Instance Methods

any_executor() click to toggle source
# File lib/dynflow/connectors/database.rb, line 141
# Picks the next executor world in round-robin order.
#
# The candidate list is refreshed from the coordinator on every call
# (`find_worlds(true)`) before the rotation is advanced.
#
# @return the world chosen by the round-robin
# @raise [Dynflow::Error] when the round-robin yields no world
def any_executor
  @executor_round_robin.data = @world.coordinator.find_worlds(true)

  chosen = @executor_round_robin.next
  raise Dynflow::Error, "No executor available" unless chosen

  chosen
end
find_receiver(envelope) click to toggle source
# File lib/dynflow/connectors/database.rb, line 133
# Resolves the concrete receiver id of an envelope.
#
# An envelope addressed to Dispatcher::AnyExecutor is mapped to the id of
# the world returned by #any_executor; any other receiver id is returned
# unchanged.
#
# @param envelope the envelope whose receiver should be resolved
# @return the id of the world that should receive the envelope
def find_receiver(envelope)
  case (receiver_id = envelope.receiver_id)
  when Dispatcher::AnyExecutor then any_executor.id
  else receiver_id
  end
end
postgres_listen_start() click to toggle source
# File lib/dynflow/connectors/database.rb, line 101
# Starts the Postgres listener when the world's database supports notify.
#
# The listener is created once and reused on later calls; it is only
# started if it does not already report itself as started. Does nothing
# when notify is unsupported.
def postgres_listen_start
  return unless PostgresListerner.notify_supported?(@world.persistence.adapter.db)

  @postgres_listener ||= PostgresListerner.new(self, @world.id, @world.persistence.adapter.db)
  @postgres_listener.start unless @postgres_listener.started?
end
postgres_listen_stop() click to toggle source
# File lib/dynflow/connectors/database.rb, line 108
# Stops the Postgres listener if one has been created; otherwise a no-op.
def postgres_listen_stop
  return unless @postgres_listener

  @postgres_listener.stop
end
receive_envelopes() click to toggle source
# File lib/dynflow/connectors/database.rb, line 112
# Pulls the envelopes stored for this world and sends each one to itself as
# a [:handle_envelope, envelope] message.
#
# Any StandardError raised along the way is logged at ERROR level and
# swallowed, so a failing pull does not propagate to the caller.
def receive_envelopes
  pending = @world.persistence.pull_envelopes(@world.id)
  pending.each { |envelope| tell([:handle_envelope, envelope]) }
rescue StandardError => error
  log(Logger::ERROR, "Receiving envelopes failed on #{error}")
end
send_envelope(envelope) click to toggle source
# File lib/dynflow/connectors/database.rb, line 120
# Stores the envelope through the world's persistence and, when a Postgres
# listener exists, notifies the envelope's receiver id through it.
#
# Any StandardError is logged at ERROR level and swallowed.
#
# @param envelope the envelope to push
def send_envelope(envelope)
  @world.persistence.push_envelope(envelope)
  @postgres_listener.notify(envelope.receiver_id) if @postgres_listener
rescue StandardError => error
  log(Logger::ERROR, "Sending envelope failed on #{error}")
end
update_receiver_id(envelope, new_receiver_id) click to toggle source
# File lib/dynflow/connectors/database.rb, line 129
# Builds a new envelope that copies the request id, sender id and message
# of the given one but is addressed to new_receiver_id.
#
# @param envelope the envelope to copy from
# @param new_receiver_id the receiver id to put on the copy
# @return a new Dispatcher::Envelope
def update_receiver_id(envelope, new_receiver_id)
  request_id = envelope.request_id
  sender_id = envelope.sender_id
  message = envelope.message

  Dispatcher::Envelope[request_id, sender_id, new_receiver_id, message]
end