class Stomp::Connection

Low level connection which maps commands and supports synchronous receives

Attributes

autoflush[RW]

Autoflush forces a flush on each transmit. This may be changed dynamically by calling code.

connection_frame[R]

The CONNECTED frame from the broker.

disconnect_receipt[R]

Any disconnect RECEIPT frame if requested.

hb_received[R]

Heartbeat receive has been on time.

hb_sent[R]

Heartbeat send has been successful.

host[R]

Currently-connected host and port

jruby[R]

JRuby detected

port[R]

Currently-connected host and port

protocol[R]

The Stomp Protocol version.

session[R]

A unique session ID, assigned by the broker.

Public Class Methods

default_port(ssl) click to toggle source

::default_port returns the default port used by the gem for TCP or SSL.

# File lib/stomp/connection.rb, line 44
def self.default_port(ssl)
  ssl ? 61612 : 61613
end
new(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) click to toggle source

A new Connection object can be initialized using two forms:

Hash (this is the recommended Connection initialization method):

hash = {
  :hosts => [
    {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false},
    {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false}
  ],
  # These are the default parameters and do not need to be set
  :reliable => true,                  # reliable (use failover)
  :initial_reconnect_delay => 0.01,   # initial delay before reconnect (secs)
  :max_reconnect_delay => 30.0,       # max delay before reconnect
  :use_exponential_back_off => true,  # increase delay between reconnect attpempts
  :back_off_multiplier => 2,          # next delay multiplier
  :max_reconnect_attempts => 0,       # retry forever, use # for maximum attempts
  :randomize => false,                # do not radomize hosts hash before reconnect
  :connect_timeout => 0,              # Timeout for TCP/TLS connects, use # for max seconds
  :connect_headers => {},             # user supplied CONNECT headers (req'd for Stomp 1.1+)
  :parse_timeout => 5,                # IO::select wait time on socket reads
  :logger => nil,                     # user suplied callback logger instance
  :dmh => false,                      # do not support multihomed IPV4 / IPV6 hosts during failover
  :closed_check => true,              # check first if closed in each protocol method
  :hbser => false,                    # raise on heartbeat send exception
  :stompconn => false,                # Use STOMP instead of CONNECT
  :usecrlf => false,                  # Use CRLF command and header line ends (1.2+)
  :max_hbread_fails => 0,             # Max HB read fails before retry.  0 => never retry
  :max_hbrlck_fails => 0,             # Max HB read lock obtain fails before retry.  0 => never retry
  :fast_hbs_adjust => 0.0,            # Fast heartbeat senders sleep adjustment, seconds, needed ...
                                      # For fast heartbeat senders.  'fast' == YMMV.  If not
                                      # correct for your environment, expect unnecessary fail overs
  :connread_timeout => 0,             # Timeout during CONNECT for read of CONNECTED/ERROR, secs
  :tcp_nodelay => true,               # Turns on the TCP_NODELAY socket option; disables Nagle's algorithm
  :start_timeout => 0,                # Timeout around Stomp::Client initialization
  :sslctx_newparm => nil,             # Param for SSLContext.new
  :ssl_post_conn_check => true,       # Further verify broker identity
  :nto_cmd_read => true,              # No timeout on COMMAND read
}

e.g. c = Stomp::Connection.new(hash)

Positional parameters:

login             (String,  default : '')
passcode          (String,  default : '')
host              (String,  default : 'localhost')
port              (Integer, default : 61613)
reliable          (Boolean, default : false)
reconnect_delay   (Integer, default : 5)

e.g. c = Stomp::Connection.new("username", "password", "localhost", 61613, true)
# File lib/stomp/connection.rb, line 112
def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
  @protocol = Stomp::SPL_10 # Assumed at first
  @hb_received = true       # Assumed at first
  @hb_sent = true           # Assumed at first
  @hbs = @hbr = false       # Sending/Receiving heartbeats. Assume no for now.
  @jruby = false            # Assumed at first
  # Initialize some variables
  @closed, @socket, @hhas10, @rt, @st = true, nil, false, nil, nil
  if defined?(RUBY_ENGINE) && RUBY_ENGINE =~ /jruby/
    @jruby = true
  end

  ct = Thread.current
  if ct.respond_to?(:report_on_exception=)
    ct.report_on_exception=false
  end

  if login.is_a?(Hash)
    hashed_initialize(login)
  else
    @host = host
    @port = port
    @login = login
    @passcode = passcode
    @reliable = reliable
    @reconnect_delay = reconnect_delay
    @connect_headers = connect_headers
    @ssl = false
    @parameters = nil
    @parse_timeout = 5              # To override, use hashed parameters
    @connect_timeout = 0    # To override, use hashed parameters
    @logger = Stomp::NullLogger.new # To override, use hashed parameters
    @autoflush = false    # To override, use hashed parameters or setter
    @closed_check = true  # Run closed check in each protocol method
    @hbser = false        # Raise if heartbeat send exception
    @stompconn = false    # If true, use STOMP rather than CONNECT
    @usecrlf = false      # If true, use \r\n as line ends (1.2 only)
    @max_hbread_fails = 0 # 0 means never retry for HB read failures
    @max_hbrlck_fails = 0 # 0 means never retry for HB read lock failures
    @fast_hbs_adjust = 0.0 # Fast heartbeat senders sleep adjustment
    @connread_timeout = 0 # Connect read CONNECTED/ERROR timeout
    @tcp_nodelay = true # Disable Nagle
    @start_timeout = 0 # Client only, startup timeout
    @sslctx_newparm = nil # SSLContext.new paramater
    @ssl_post_conn_check = true # Additional broker verification
    @nto_cmd_read = true # No timeout on COMMAND read
    warn "login looks like a URL, do you have the correct parameters?" if @login =~ /:\/\//
  end

  # Use Mutexes:  only one lock per each thread.
  # Reverted to original implementation attempt using Mutex.
  @transmit_semaphore = Mutex.new
  @read_semaphore = Mutex.new
  @socket_semaphore = Mutex.new
  @gets_semaphore = Mutex.new

  @subscriptions = {}
  @failure = nil
  @connection_attempts = 0

  socket
end
open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) click to toggle source

open is syntactic sugar for 'Connection.new', see 'initialize' for usage.

# File lib/stomp/connection.rb, line 210
def Connection.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
  Connection.new(login, passcode, host, port, reliable, reconnect_delay, connect_headers)
end
ssl_v2xoptions() click to toggle source

SSL Helper

# File lib/stomp/connection.rb, line 49
def self.ssl_v2xoptions()
    require 'openssl' unless defined?(OpenSSL)
    # Mimic code in later versions of Ruby 2.x (and backported to later
    # versions of 1.9.3).
    opts = OpenSSL::SSL::OP_ALL
    opts &= ~OpenSSL::SSL::OP_DONT_INSERT_EMPTY_FRAGMENTS if defined?(OpenSSL::SSL::OP_DONT_INSERT_EMPTY_FRAGMENTS)
    opts |= OpenSSL::SSL::OP_NO_COMPRESSION if defined?(OpenSSL::SSL::OP_NO_COMPRESSION)
    opts |= OpenSSL::SSL::OP_NO_SSLv2 if defined?(OpenSSL::SSL::OP_NO_SSLv2)
    opts |= OpenSSL::SSL::OP_NO_SSLv3 if defined?(OpenSSL::SSL::OP_NO_SSLv3)
end

Public Instance Methods

_interruptible_gets(read_socket) click to toggle source
# File lib/connection/netio.rb, line 11
def _interruptible_gets(read_socket)
  # The gets thread may be interrupted by the heartbeat thread. Ensure that
  # if so interrupted, a new gets cannot start until after the heartbeat
  # thread finishes its work. This is PURELY to avoid a segfault bug
  # involving OpenSSL::Buffer.
  @gets_semaphore.synchronize { @getst = Thread.current }
  read_socket.gets
ensure
  @gets_semaphore.synchronize { @getst = nil }
end
abort(name, headers = {}) click to toggle source

Abort aborts a transaction by name.

# File lib/stomp/connection.rb, line 334
def abort(name, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("")
  headers = headers.symbolize_keys
  headers[:transaction] = name
  _headerCheck(headers)
  slog(:on_abort, log_params, headers)
  transmit(Stomp::CMD_ABORT, headers)
end
ack(message_or_ack_id, headers = {}) click to toggle source

Acknowledge a message, used when a subscription has specified client acknowledgement e.g.:

connection.subscribe("/queue/a", :ack => 'client')

connection.subscribe("/queue/a", :ack => 'client-individual')

as appropriate for the protocol level.

Accepts an optional transaction header ( :transaction => 'some_transaction_id' ).

When the connection protocol level is 1.0 or 1.1 the message_or_ack_id parameter should match the message-id header of the MESSAGE being acknowledged e.g.:

connection.ack(message.headers['message-id'])

When the connection protocol level is 1.2 the message_or_ack_id parameter should match the ack header of the MESSAGE being acknowledged e.g.:

connection.ack(message.headers['ack'])

In summary, the behavior is protocol level dependent, see the specifications and comments in the code.

# File lib/stomp/connection.rb, line 259
def ack(message_or_ack_id, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("")
  raise Stomp::Error::MessageIDRequiredError if message_or_ack_id.nil? || message_or_ack_id == ""
  headers = headers.symbolize_keys

  case @protocol
    when Stomp::SPL_12
      # The ACK frame MUST include an "id" header matching the "ack" header
      # of the MESSAGE being acknowledged.
      headers[:id] = message_or_ack_id
    when Stomp::SPL_11
      # ACK has two REQUIRED headers: "message-id", which MUST contain a value
      # matching the message-id header of the MESSAGE being acknowledged and
      # "subscription", which MUST be set to match the value of SUBSCRIBE's
      # id header.
      headers[:'message-id'] = message_or_ack_id
      raise Stomp::Error::SubscriptionRequiredError unless headers[:subscription]
    else # Stomp::SPL_10
      # ACK has one required header, "message-id", which must contain a value
      # matching the message-id for the MESSAGE being acknowledged.
      headers[:'message-id'] = message_or_ack_id
  end
  _headerCheck(headers)
  slog(:on_ack, log_params, headers)
  transmit(Stomp::CMD_ACK, headers)
end
begin(name, headers = {}) click to toggle source

Begin starts a transaction, and requires a name for the transaction

# File lib/stomp/connection.rb, line 225
def begin(name, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("")
  headers = headers.symbolize_keys
  headers[:transaction] = name
  _headerCheck(headers)
  slog(:on_begin, log_params, headers)
  transmit(Stomp::CMD_BEGIN, headers)
end
client_ack?(message) click to toggle source

client_ack? determines if headers contain :ack => “client”.

# File lib/stomp/connection.rb, line 452
def client_ack?(message)
  headers = @subscriptions[message.headers[:destination]]
  !headers.nil? && headers[:ack] == "client"
end
closed?() click to toggle source

closed? tests if this connection is closed.

# File lib/stomp/connection.rb, line 220
def closed?
  @closed
end
commit(name, headers = {}) click to toggle source

Commit commits a transaction by name.

# File lib/stomp/connection.rb, line 322
def commit(name, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("")
  headers = headers.symbolize_keys
  headers[:transaction] = name
  _headerCheck(headers)
  slog(:on_commit, log_params, headers)
  transmit(Stomp::CMD_COMMIT, headers)
end
disconnect(headers = {}) click to toggle source

disconnect closes this connection. If requested, a disconnect RECEIPT will be received.

# File lib/stomp/connection.rb, line 459
def disconnect(headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("")
  headers = headers.symbolize_keys
  _headerCheck(headers)
  if @protocol >= Stomp::SPL_11
    @st.kill if @st # Kill ticker thread if any
    @rt.kill if @rt # Kill ticker thread if any
  end
  transmit(Stomp::CMD_DISCONNECT, headers)
  @disconnect_receipt = receive if headers[:receipt]
  slog(:on_disconnect, log_params)
  close_socket
end
hashed_initialize(params) click to toggle source

#hashed_initialize prepares a new connection with a Hash of initialization parameters.

# File lib/stomp/connection.rb, line 177
def hashed_initialize(params)
  lp = _hdup(params)
  @parameters = refine_params(lp)
  @reliable =  @parameters[:reliable]
  @reconnect_delay = @parameters[:initial_reconnect_delay]
  @connect_headers = @parameters[:connect_headers]
  @parse_timeout =  @parameters[:parse_timeout]
  @connect_timeout =  @parameters[:connect_timeout]
  @logger = @parameters[:logger] || Stomp::NullLogger.new
  @autoflush = @parameters[:autoflush]
  @closed_check = @parameters[:closed_check]
  @hbser = @parameters[:hbser]
  @stompconn = @parameters[:stompconn]
  @usecrlf = @parameters[:usecrlf]
  @max_hbread_fails = @parameters[:max_hbread_fails]
  @max_hbrlck_fails = @parameters[:max_hbrlck_fails]
  @fast_hbs_adjust = @parameters[:fast_hbs_adjust]
  @connread_timeout = @parameters[:connread_timeout]
  @sslctx_newparm = @parameters[:sslctx_newparm]
  @ssl_post_conn_check = @parameters[:ssl_post_conn_check]
  @nto_cmd_read = @parameters[:nto_cmd_read]
  #
  # Try to support Ruby 1.9.x and 2.x ssl.
  unless defined?(RSpec)
    @parameters[:hosts].each do |ah|
      ah[:ssl] = Stomp::SSLParams.new if ah[:ssl] == true
    end
  end
  #sets the first host to connect
  change_host
end
hbrecv_count() click to toggle source

#hbrecv_count returns the current connection's heartbeat receive count.

# File lib/stomp/connection.rb, line 573
def hbrecv_count()
  return 0 unless @hbrecv_count
  @hbrecv_count
end
hbrecv_interval() click to toggle source

#hbrecv_interval returns the connection's heartbeat receive interval.

# File lib/stomp/connection.rb, line 561
def hbrecv_interval()
  return 0 unless @hbrecv_interval
  @hbrecv_interval / 1000.0 # ms
end
hbsend_count() click to toggle source

#hbsend_count returns the current connection's heartbeat send count.

# File lib/stomp/connection.rb, line 567
def hbsend_count()
  return 0 unless @hbsend_count
  @hbsend_count
end
hbsend_interval() click to toggle source

#hbsend_interval returns the connection's heartbeat send interval.

# File lib/stomp/connection.rb, line 555
def hbsend_interval()
  return 0 unless @hbsend_interval
  @hbsend_interval / 1000.0 # ms
end
nack(message_or_ack_id, headers = {}) click to toggle source

STOMP 1.1+ NACK.

When the connection protocol level is 1.1 the message_or_ack_id parameter should match the message-id header of the MESSAGE being acknowledged.

When the connection protocol level is 1.2 the message_or_ack_id parameter should match the ack header of the MESSAGE being acknowledged.

Behavior is protocol level dependent, see the specifications and comments below.

# File lib/stomp/connection.rb, line 297
def nack(message_or_ack_id, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::UnsupportedProtocolError if @protocol == Stomp::SPL_10
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::MessageIDRequiredError if message_or_ack_id.nil? || message_or_ack_id == ""
  headers = headers.symbolize_keys
  case @protocol
    when Stomp::SPL_12
      # The NACK frame MUST include an id header matching the ack header
      # of the MESSAGE being acknowledged.
      headers[:id] = message_or_ack_id
    else # Stomp::SPL_11 only
      # NACK has two REQUIRED headers: message-id, which MUST contain a value
      # matching the message-id for the MESSAGE being acknowledged and
      # subscription, which MUST be set to match the value of the subscription's
      # id header.
      headers[:'message-id'] = message_or_ack_id
      raise Stomp::Error::SubscriptionRequiredError unless headers[:subscription]
  end
  _headerCheck(headers)
  slog(:on_nack, log_params, headers)
  transmit(Stomp::CMD_NACK, headers)
end
open?() click to toggle source

open? tests if this connection is open.

# File lib/stomp/connection.rb, line 215
def open?
  !@closed
end
poll() click to toggle source

poll returns a pending message if one is available, otherwise returns nil.

# File lib/stomp/connection.rb, line 477
def poll()
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  # No need for a read lock here.  The receive method eventually fulfills
  # that requirement.
  return nil if @socket.nil? || !@socket.ready?
  receive()
end
publish(destination, message, headers = {}) click to toggle source

Publish message to destination. To disable content length header use header ( :suppress_content_length => true ). Accepts a transaction header ( :transaction => 'some_transaction_id' ).

# File lib/stomp/connection.rb, line 398
def publish(destination, message, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("")
  headers = headers.symbolize_keys
  raise Stomp::Error::DestinationRequired unless destination
  headers[:destination] = destination
  _headerCheck(headers)
  slog(:on_publish, log_params, message, headers)
  transmit(Stomp::CMD_SEND, headers, message)
end
receive() click to toggle source

receive returns the next Message off of the wire. this can return nil in cases where:

  • the broker has closed the connection

  • the connection is not reliable

# File lib/stomp/connection.rb, line 489
def receive()
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  super_result = __old_receive()
  if super_result.nil? && @reliable && !closed?
    errstr = "connection.receive returning EOF as nil - resetting connection.\n"
    @failure = nil
    unless slog(:on_miscerr, log_params, "es_recv: " + errstr)
      $stderr.print errstr
    end

    # !!! This initiates a re-connect !!!
    # The call to __old_receive() will in turn call socket().  Before
    # that we should change the target host, otherwise the host that
    # just failed may be attempted first.
    _reconn_prep()
    #
    super_result = __old_receive()
  end
  #
  if super_result.nil? && !@reliable
    @st.kill if @st # Kill ticker thread if any
    @rt.kill if @rt # Kill ticker thread if any
    close_socket()
    @closed = true
    warn 'warning: broker sent EOF, and connection not reliable' unless defined?(Test)
  end
  slog(:on_receive, log_params, super_result)
  return super_result
end
set_logger(logger) click to toggle source

#set_logger selects a new callback logger instance.

# File lib/stomp/connection.rb, line 520
def set_logger(logger)
  @logger = logger
end
sha1(data) click to toggle source

sha1 returns a SHA1 digest for arbitrary string data.

# File lib/stomp/connection.rb, line 536
def sha1(data)
  Digest::SHA1.hexdigest(data)
end
slog(name, *parms) click to toggle source

log call router

# File lib/stomp/connection.rb, line 579
def slog(name, *parms)
  return false unless @logger
  @logger.send(name, *parms) if @logger.respond_to?(:"#{name}")
  @logger.respond_to?(:"#{name}")
end
subscribe(destination, headers = {}, subId = nil) click to toggle source

Subscribe subscribes to a destination. A subscription name is required. For Stomp 1.1+ a session unique subscription ID is also required.

# File lib/stomp/connection.rb, line 347
def subscribe(destination, headers = {}, subId = nil)
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("")
  headers = headers.symbolize_keys
  raise Stomp::Error::DestinationRequired unless destination
  headers[:destination] = destination
  if @protocol >= Stomp::SPL_11
    raise Stomp::Error::SubscriptionRequiredError if (headers[:id].nil? && subId.nil?)
    headers[:id] = subId if headers[:id].nil?
  end
  _headerCheck(headers)
  slog(:on_subscribe, log_params, headers)

  ## p [ "subId", subId ]
  ## p [ "subscriptions", @subscriptions ]
  # Store the subscription so that we can replay if we reconnect.
  if @reliable
    subId = destination if subId.nil?
    raise Stomp::Error::DuplicateSubscription if @subscriptions[subId]
    @subscriptions[subId] = headers
  end

  transmit(Stomp::CMD_SUBSCRIBE, headers)
end
unreceive(message, options = {}) click to toggle source

Send a message back to the source or to the dead letter queue. Accepts a dead letter queue option ( :dead_letter_queue => “/queue/DLQ” ). Accepts a limit number of redeliveries option ( :max_redeliveries => 6 ). Accepts a force client acknowledgement option (:force_client_ack => true).

# File lib/stomp/connection.rb, line 414
def unreceive(message, options = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  options = { :dead_letter_queue => "/queue/DLQ", :max_redeliveries => 6 }.merge(options)
  # Lets make sure all keys are symbols
  message.headers = message.headers.symbolize_keys
  retry_count = message.headers[:retry_count].to_i || 0
  message.headers[:retry_count] = retry_count + 1
  transaction_id = "transaction-#{message.headers[:'message-id']}-#{retry_count}"
  message_id = message.headers.delete(:'message-id')

  # Prevent duplicate 'subscription' headers on subsequent receives
  message.headers.delete(:subscription) if message.headers[:subscription]

  begin
    self.begin transaction_id

    if client_ack?(message) || options[:force_client_ack]
      self.ack(message_id, :transaction => transaction_id)
    end

    if message.headers[:retry_count] <= options[:max_redeliveries]
      self.publish(message.headers[:destination], message.body,
        message.headers.merge(:transaction => transaction_id))
    else
      # Poison ack, sending the message to the DLQ
      self.publish(options[:dead_letter_queue], message.body,
        message.headers.merge(:transaction => transaction_id,
        :original_destination => message.headers[:destination],
        :persistent => true))
    end
    self.commit transaction_id
  rescue Exception => exception
    self.abort transaction_id
    raise exception
  end
end
unsubscribe(destination, headers = {}, subId = nil) click to toggle source

Unsubscribe from a destination. A subscription name is required. For Stomp 1.1+ a session unique subscription ID is also required.

# File lib/stomp/connection.rb, line 375
def unsubscribe(destination, headers = {}, subId = nil)
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("")
  headers = headers.symbolize_keys
  raise Stomp::Error::DestinationRequired unless destination
  headers[:destination] = destination
  if @protocol >= Stomp::SPL_11
    raise Stomp::Error::SubscriptionRequiredError if (headers[:id].nil? && subId.nil?)
    headers[:id] = subId unless headers[:id]
  end
  _headerCheck(headers)
  slog(:on_unsubscribe, log_params, headers)
  transmit(Stomp::CMD_UNSUBSCRIBE, headers)
  if @reliable
    subId = destination if subId.nil?
    @subscriptions.delete(subId)
  end
end
uuid() click to toggle source

uuid returns a type 4 UUID.

# File lib/stomp/connection.rb, line 541
def uuid()
  b = []
  0.upto(15) do |i|
    b << rand(255)
  end
  b[6] = (b[6] & 0x0F) | 0x40
  b[8] = (b[8] & 0xbf) | 0x80
  #             0  1  2  3   4   5  6  7   8  9  10 11 12 13 14 15
  rs = sprintf("%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x%02x%02x",
  b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7], b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15])
  rs
end
valid_utf8?(s) click to toggle source

valid_utf8? returns an indicator if the given string is a valid UTF8 string.

# File lib/stomp/connection.rb, line 525
def valid_utf8?(s)
  case RUBY_VERSION
  when /1\.8/
    rv = _valid_utf8?(s)
  else
    rv = s.encoding.name != Stomp::UTF8 ? false : s.valid_encoding?
  end
  rv
end

Private Instance Methods

__old_receive() click to toggle source

#__old_receive receives a frame, blocks until the frame is received.

# File lib/connection/utils.rb, line 244
def __old_receive()
  # The receive may fail so we may need to retry.
  while true
    begin
      used_socket = socket()

      connread = false
      noiosel = (@ssl || @jruby) ? true : false
      return _receive(used_socket, connread, noiosel)
    rescue Stomp::Error::MaxReconnectAttempts
      @failure = $!
      unless slog(:on_miscerr, log_params, "Reached MaxReconnectAttempts")
        $stderr.print "Reached MaxReconnectAttempts\n"
      end
      raise
    rescue
      @failure = $!
      raise unless @reliable
      errstr = "receive failed: #{$!}"
      unless slog(:on_miscerr, log_params, "es1_oldrecv: " + errstr)
        $stderr.print "\non_miscerr\n"
        $stderr.print log_params.inspect
        $stderr.print "\n"
        $stderr.print "es2_oldrecv: " + errstr
        $stderr.print "\n"
      end

      # !!! This initiates a re-connect !!!
      _reconn_prep()
    end
  end
end
_decodeHeaders(h) click to toggle source

decode returns a Hash of decoded headers per the Stomp 1.1 specification.

# File lib/connection/utf8.rb, line 265
def _decodeHeaders(h)
  dh = {}
  h.each_pair do |k,v|
    # Keys here are NOT! symbolized
    if v.is_a?(Array)
      kdec = Stomp::HeaderCodec::decode(k)
      dh[kdec] = []
      v.each do |e|
        dh[kdec] << Stomp::HeaderCodec::decode(e)
      end
    else
      vs = v.to_s
      dh[Stomp::HeaderCodec::decode(k)] = Stomp::HeaderCodec::decode(vs)
    end
  end
  dh
end
_dump_callstack() click to toggle source

used for debugging

# File lib/connection/netio.rb, line 606
def _dump_callstack()
  i = 0
  caller.each do |c|
    p [ "csn", i, c ]
    i += 1
  end
end
_dump_ctx(ctx) click to toggle source

Used for debugging

# File lib/connection/netio.rb, line 600
def _dump_ctx(ctx)
  p [ "dc01", ctx.inspect ]
  p [ "dc02ciphers", ctx.ciphers ]
end
_dump_threads() click to toggle source

used for debugging

# File lib/connection/netio.rb, line 615
def _dump_threads()
  tl = Thread::list
  tl.each do |at|
    p [ "THDMPN", at ]
  end
  p [ "THDMPMain", @parameters[:client_main] ]
end
_encodeHeaders(h) click to toggle source

encode returns a Hash of encoded headers per the Stomp 1.1 specification.

# File lib/connection/utf8.rb, line 245
def _encodeHeaders(h)
  eh = {}
  h.each_pair do |k,v|
    # Keys are symbolized
    ks = k.to_s
    if v.is_a?(Array)
      kenc = Stomp::HeaderCodec::encode(ks)
      eh[kenc] = []
      v.each do |e|
        eh[kenc] << Stomp::HeaderCodec::encode(e)
      end
    else
      vs = v.to_s
      eh[Stomp::HeaderCodec::encode(ks).to_sym] = Stomp::HeaderCodec::encode(vs)
    end
  end
  eh
end
_expand_hosts(hash) click to toggle source

Support multi-homed servers.

# File lib/connection/utils.rb, line 14
def _expand_hosts(hash)
  new_hash = hash.clone
  new_hash[:hosts_cloned] = hash[:hosts].clone
  new_hash[:hosts] = []
  #
  hash[:hosts].each do |host_parms|
    ai = Socket.getaddrinfo(host_parms[:host], nil, nil, Socket::SOCK_STREAM)
    next if ai.nil? || ai.size == 0
    info6 = ai.detect {|info| info[4] == Socket::AF_INET6}
    info4 = ai.detect {|info| info[4] == Socket::AF_INET}
    if info6
      new_hostp = host_parms.clone
      new_hostp[:host] = info6[3]
      new_hash[:hosts] << new_hostp
    end
    if info4
      new_hostp = host_parms.clone
      new_hostp[:host] = info4[3]
      new_hash[:hosts] << new_hostp
    end
  end
  return new_hash
end
_hdup(h) click to toggle source

Duplicate parameters hash

# File lib/connection/utils.rb, line 216
def _hdup(h)
  ldup = {}
  ldup.merge!(h)
  ldup[:hosts] = []
  hvals = h[:hosts].nil? ? h["hosts"] : h[:hosts]
  hvals.each do |hv|
    ldup[:hosts] << hv.dup
  end
  ldup
end
_headerCheck(h) click to toggle source

Stomp 1.1+ header check for UTF8 validity. Raises Stomp::Error::UTF8ValidationError if header data is not valid UTF8.

# File lib/connection/utf8.rb, line 221
def _headerCheck(h)
  return if @protocol == Stomp::SPL_10 # Do nothing for this environment
  #
  h.each_pair do |k,v|
    # Keys here are symbolized
    ks = k.to_s
    ks.force_encoding(Stomp::UTF8) if ks.respond_to?(:force_encoding)
    raise Stomp::Error::UTF8ValidationError unless valid_utf8?(ks)
    #
    if v.is_a?(Array)
      v.each do |e|
        e.force_encoding(Stomp::UTF8) if e.respond_to?(:force_encoding)
        raise Stomp::Error::UTF8ValidationError unless valid_utf8?(e)
      end
    else
      vs = v.to_s + "" # Values are usually Strings, but could be TrueClass or Symbol
      # The + "" above forces an 'unfreeze' if necessary
      vs.force_encoding(Stomp::UTF8) if vs.respond_to?(:force_encoding)
      raise Stomp::Error::UTF8ValidationError unless valid_utf8?(vs)
    end
  end
end
_init_heartbeats() click to toggle source
# File lib/connection/heartbeats.rb, line 21
def _init_heartbeats()
  return if @connect_headers[:"heart-beat"] == "0,0" # Caller does not want heartbeats.  OK.

  # Init.

  #
  @cx = @cy = @sx = @sy = 0   # Variable names as in spec

  #
  @hbsend_interval = @hbrecv_interval = 0.0 # Send/Receive ticker interval.

  #
  @hbsend_count = @hbrecv_count = 0 # Send/Receive ticker counts.

  #
  @ls = @lr = -1.0 # Last send/receive time (from Time.now.to_f)

  #
  @st = @rt = nil # Send/receive ticker thread

  # Handle current client / server capabilities.

  #
  cfh = @connection_frame.headers.symbolize_keys
  return if cfh[:"heart-beat"] == "0,0" # Server does not want heartbeats

  # Conect header parts.
  parts = @connect_headers[:"heart-beat"].split(",")
  @cx = parts[0].to_i
  @cy = parts[1].to_i

  # Connected frame header parts.
  parts = cfh[:"heart-beat"].split(",")
  @sx = parts[0].to_i
  @sy = parts[1].to_i

  # Catch odd situations like server has used => heart-beat:000,00000
  return if (@cx == 0 && @cy == 0) || (@sx == 0 && @sy == 0)

  # See if we are doing anything at all.

  @hbs = @hbr = true # Sending/Receiving heartbeats. Assume yes at first.
  # Check if sending is possible.
  @hbs = false if @cx == 0 || @sy == 0  # Reset if neither side wants
  # Check if receiving is possible.
  @hbr = false if @sx == 0 || @cy == 0  # Reset if neither side wants

  # Check if we should not do heartbeats at all
  return if (!@hbs && !@hbr)

  # If sending
  if @hbs
    sm = @cx >= @sy ? @cx : @sy     # ticker interval, ms
    @hbsend_interval = 1000.0 * sm  # ticker interval, μs
    @ls = Time.now.to_f             # best guess at start
    _start_send_ticker()
  end

  # If receiving
  if @hbr
    rm = @sx >= @cy ? @sx : @cy     # ticker interval, ms
    @hbrecv_interval = 1000.0 * rm  # ticker interval, μs
    @lr = Time.now.to_f             # best guess at start
    _start_receive_ticker()
  end

end
_init_line_read(read_socket) click to toggle source
# File lib/connection/netio.rb, line 564
def _init_line_read(read_socket)
    line = ''
    if @protocol == Stomp::SPL_10 || (@protocol >= Stomp::SPL_11 && !@hbr)
      if @jruby
        # Handle JRuby specific behavior.
        #p [ "ilrjr00", _is_ready?(read_socket), RUBY_VERSION ]
        if RUBY_VERSION <  "2"
          while true
            #p [ "ilrjr01A1", _is_ready?(read_socket) ]
            line = _interruptible_gets(read_socket) # Data from wire
            break unless line == "\n"
            line = ''
          end
        else # RUBY_VERSION >= "2"
          while _is_ready?(read_socket)
            #p [ "ilrjr01B2", _is_ready?(read_socket) ]
            line = _interruptible_gets(read_socket) # Data from wire
            break unless line == "\n"
            line = ''
          end
        end
      else
        line = _interruptible_gets(read_socket) # The old way
      end
    else # We are >= 1.1 *AND* receiving heartbeats.
      while true
        line = _interruptible_gets(read_socket) # Data from wire
        break unless line == "\n"
        line = ''
        @lr = Time.now.to_f
      end
    end
    line
end
_is_ready?(s) click to toggle source

This is a total hack, to try and guess how JRuby will behave today.

# File lib/connection/netio.rb, line 171
def _is_ready?(s)
  rdy = s.ready?
  #p [ "isr?", rdy ]
  return rdy unless @jruby
  #p [ "jrdychk", rdy.class ]
  if rdy.class == NilClass
    # rdy = true
    rdy = false # A test
  else
    rdy = (rdy.class == Fixnum || rdy.class == TrueClass) ? true : false
  end
  #p [ "isr?_last", rdy ]
  rdy
end
_normalize_line_end(line) click to toggle source

Normalize line ends because 1.2+ brokers can send 'mixed mode' headers, i.e.:

  • Some headers end with 'n'

  • Other headers end with 'rn'

# File lib/connection/netio.rb, line 190
def _normalize_line_end(line)
  return line unless @usecrlf
  # p [ "nleln", line ]
  line_len = line.respond_to?(:bytesize) ? line.bytesize : line.length
  last2 = line[line_len-2...line_len]
  # p [ "nlel2", last2 ]
  return line unless last2 == "\r\n"
  return line[0...line_len-2] + "\n"
end
_post_connect() click to toggle source

#_post_connect handles low level logic just after a physical connect.

# File lib/connection/utils.rb, line 85
def _post_connect()
  return unless (@connect_headers[:"accept-version"] && @connect_headers[:host]) # 1.0
  if @connection_frame.command == Stomp::CMD_ERROR
    @connection_frame.headers = _decodeHeaders(@connection_frame.headers)
    return
  end
  # We are CONNECTed
  cfh = @connection_frame.headers.symbolize_keys
  @protocol = cfh[:version]
  if @protocol
    # Should not happen, but check anyway
    raise Stomp::Error::UnsupportedProtocolError unless Stomp::SUPPORTED.index(@protocol)
  else # CONNECTed to a 1.0 server that does not return *any* 1.1 type headers
    @protocol = Stomp::SPL_10 # reset
    return
  end
  # Heartbeats
  return unless @connect_headers[:"heart-beat"]
  _init_heartbeats()
end
_pre_connect() click to toggle source

#_pre_connect handles low level logic just prior to a physical connect.

# File lib/connection/utils.rb, line 62
def _pre_connect()
  @connect_headers = @connect_headers.symbolize_keys
  raise Stomp::Error::ProtocolErrorConnect if (@connect_headers[:"accept-version"] && !@connect_headers[:host])
  raise Stomp::Error::ProtocolErrorConnect if (!@connect_headers[:"accept-version"] && @connect_headers[:host])
  return unless (@connect_headers[:"accept-version"] && @connect_headers[:host]) # 1.0
  # Try 1.1 or greater
  @hhas10 = false
  okvers = []
  avers = @connect_headers[:"accept-version"].split(",")
  avers.each do |nver|
    if Stomp::SUPPORTED.index(nver)
      okvers << nver
      @hhas10 = true if nver == Stomp::SPL_10
    end
  end
  raise Stomp::Error::UnsupportedProtocolError if okvers == []
  @connect_headers[:"accept-version"] = okvers.join(",") # This goes to server
  # Heartbeats - pre connect
  return unless @connect_headers[:"heart-beat"]
  _validate_hbheader()
end
_receive(read_socket, connread = false, noiosel = false) click to toggle source

Really read from the wire.

# File lib/connection/netio.rb, line 25
def _receive(read_socket, connread = false, noiosel = false)

  # p [ "ioscheck", @iosto, connread, noiosel, @nto_cmd_read ]
  # _dump_callstack()
  drdbg = ENV['DRDBG'] ? true : false

  @read_semaphore.synchronize do
    p [ "_receive_lock", Thread::current() ] if drdbg
    line = nil

    # =====
    # Read COMMAND (frame name)
    # =====
    if connread
      begin
        Timeout::timeout(@connread_timeout, Stomp::Error::ConnectReadTimeout) do
          line = _init_line_read(read_socket)
        end
      rescue Stomp::Error::ConnectReadTimeout => ex
        if @reliable
          _reconn_prep()
        end
        raise ex
      end
    else
      p [ "_receive_COMMAND" ] if drdbg
      _dump_callstack() if drdbg
      line = _init_line_read(read_socket)
    end
    #
    p [ "_receive_nilcheck", line.nil? ] if drdbg
    return nil if line.nil?
    #An extra \n at the beginning of the frame, possibly not caught by is_ready?
    line = '' if line == "\n"
    if line == HAND_SHAKE_DATA
      raise Stomp::Error::HandShakeDetectedError
    end

    p [ "_receive_norm_lend", line, Time.now ] if drdbg
    line = _normalize_line_end(line) if @protocol >= Stomp::SPL_12

    # =====
    # Read Headers (if any)
    # =====
    # Reads the headers until it runs into a empty line
    p [ "_receive_start_headers", line, Time.now ] if drdbg
    message_header = ''
    begin
      message_header += line
      unless connread || noiosel || @nto_cmd_read
        raise Stomp::Error::ReceiveTimeout unless IO.select([read_socket], nil, nil, @iosto)
      end
      p [ "_receive_next_header", line, Time.now ] if drdbg
      line = _interruptible_gets(read_socket)
      p [ "_receive_normle_header", line ] if drdbg
      raise  Stomp::Error::StompServerError if line.nil?
      line = _normalize_line_end(line) if @protocol >= Stomp::SPL_12
    end until line =~ /^\s?\n$/
    p [ "_receive_end_headers" ] if drdbg
    # Checks if it includes content_length header
    content_length = message_header.match(/content-length\s?:\s?(\d+)\s?\n/)
    message_body = ''

    # =====
    # Read message body (if any)
    # =====
    p [ "_receive_start_body", content_length ] if drdbg
    # If content_length is present, read the specified amount of bytes
    if content_length
      unless connread || noiosel
        raise Stomp::Error::ReceiveTimeout unless IO.select([read_socket], nil, nil, @iosto)
      end
      p [ "_receive_have_content_length" ] if drdbg
      message_body = read_socket.read content_length[1].to_i
      unless connread || noiosel
        raise Stomp::Error::ReceiveTimeout unless IO.select([read_socket], nil, nil, @iosto)
      end
      raise Stomp::Error::InvalidMessageLength unless parse_char(read_socket.getc) == "\0"
      # Else read the rest of the message until the first \0
    else
      unless connread || noiosel
        raise Stomp::Error::ReceiveTimeout unless IO.select([read_socket], nil, nil, @iosto)
      end
      p [ "no_content_length" ] if drdbg
      message_body = read_socket.readline("\0")
      message_body.chop!
    end

    # =====
    # If the buffer isn't empty, reads/drains trailing new lines.
    #
    # Note: experiments with JRuby seem to show that socket.ready? never
    # returns true.  It appears that in cases where Ruby returns true
    # that JRuby returns a Fixnum.  We attempt to adjust for this
    # in the _is_ready? method.
    #
    # Note 2: the draining of new lines must be done _after_ a message
    # is read.  Do _not_ leave them on the wire and attempt to drain them
    # at the start of the next read.  Attempting to do that breaks the
    # asynchronous nature of the 'poll' method.
    # =====
    p [ "_receive_start_drain_loop", "isr", _is_ready?(read_socket) ] if drdbg
    while _is_ready?(read_socket)
      unless connread || noiosel
        raise Stomp::Error::ReceiveTimeout unless IO.select([read_socket], nil, nil, @iosto)
      end
      p [ "_receive_next_drain" ] if drdbg
      last_char = read_socket.getc
      break unless last_char
      if parse_char(last_char) != "\n"
        read_socket.ungetc(last_char)
        break
      end
    end

    # =====
    # Complete receive processing
    # =====
    p [ "_receive_hb_update" ] if drdbg
    if @protocol >= Stomp::SPL_11
      @lr = Time.now.to_f if @hbr
    end
    # Adds the excluded \n and \0 and tries to create a new message with it
    p [ "_receive_new_message" ] if drdbg
    msg = Message.new(message_header + "\n" + message_body + "\0", @protocol >= Stomp::SPL_11)
    p [ "_receive_decode_headers", msg.command, msg.headers ] if drdbg
    # Check for a valid frame name from the server.
    p [ "_receive_frame_name_check", msg.command ] if drdbg
    unless  SERVER_FRAMES[msg.command]
      sfex = Stomp::Error::ServerFrameNameError.new(msg.command)
      raise sfex
    end
    #
    # Always decode headers, even for 1.0. Issue #160.
    if msg.command != Stomp::CMD_CONNECTED
      msg.headers = _decodeHeaders(msg.headers)
    end
    p [ "_receive_ends", msg.command, msg.headers ] if drdbg
    p [ "_receive_UNlock", Thread::current() ] if drdbg
    msg
  end
end
_reconn_prep() click to toggle source

#_reconn_prep prepares for a reconnect retry

# File lib/connection/utils.rb, line 278
def _reconn_prep()
  close_socket()
  if @parameters
    change_host()
  end
  @st.kill if @st # Kill ticker thread if any
  @rt.kill if @rt # Kill ticker thread if any
  @socket = nil
end
_reconn_prep_hb() click to toggle source

#_reconn_prep_hb prepares for a reconnect retry

# File lib/connection/heartbeats.rb, line 252
def _reconn_prep_hb()
  if @parameters
    change_host()
  end
  @socket = nil
  used_socket = socket()
  return used_socket
end
_start_receive_ticker() click to toggle source

#_start_receive_ticker starts a thread that receives heartbeats when required.

# File lib/connection/heartbeats.rb, line 148
def _start_receive_ticker()
  sleeptime = @hbrecv_interval / 1000000.0 # Sleep time secs
  read_fail_count = 0
  lock_fail_count = 0
  fail_hard = false
  @rt = Thread.new {

    #
    while true do
      sleep sleeptime
      next unless @socket # nil under some circumstances ??
      rdrdy = _is_ready?(@socket)
      curt = Time.now.to_f
      slog(:on_hbfire, log_params, "receive_fire", :curt => curt)
      #
      begin
        delta = curt - @lr
        if delta > sleeptime
          slog(:on_hbfire, log_params, "receive_heartbeat", {:delta => delta})
          # Client code could be off doing something else (that is, no reading of
          # the socket has been requested by the caller).  Try to  handle that case.
          lock = @read_semaphore.try_lock
          if lock
            lock_fail_count = 0 # clear
            rdrdy = _is_ready?(@socket)
            if rdrdy
              read_fail_count = 0 # clear
              last_char = @socket.getc
              if last_char.nil? # EOF from broker?
                fail_hard = true
              else
                @lr = Time.now.to_f
                plc = parse_char(last_char)
                if plc == "\n" # Server Heartbeat
                  @hbrecv_count += 1
                  @hb_received = true # Reset if necessary
                else
                  @socket.ungetc(last_char)
                end
              end
              @read_semaphore.unlock # Release read lock
            else # Socket is not ready
              @read_semaphore.unlock # Release read lock
              @hb_received = false
              read_fail_count += 1
              slog(:on_hbread_fail, log_params, {"ticker_interval" => sleeptime,
                "read_fail_count" => read_fail_count,
                "lock_fail" => false,
                "lock_fail_count" => lock_fail_count,
                "fail_point" => "not_ready"})
            end
          else  # try_lock failed
            # Shrug.  Could not get lock.  Client must be actually be reading.
            @hb_received = false
            # But notify caller if possible
            lock_fail_count += 1
            slog(:on_hbread_fail, log_params, {"ticker_interval" => sleeptime,
              "read_fail_count" => read_fail_count,
              "lock_fail" => true,
              "lock_fail_count" => lock_fail_count,
              "fail_point" => "try_lock_fail"})
          end # of the try_lock

        else # delta <= sleeptime
          @hb_received = true # Reset if necessary
          read_fail_count = 0 # reset
          lock_fail_count = 0 # reset
        end # of the if delta > sleeptime
      rescue Exception => recvex
        slog(:on_hbread_fail, log_params, {"ticker_interval" => sleeptime,
          "exception" => recvex,
          "read_fail_count" => read_fail_count,
          "lock_fail_count" => lock_fail_count,
          "fail_point" => "receive_exception"})
        fail_hard = true
      end
      # Do we want to attempt a retry?
      if @reliable
        # Retry on hard fail or max read fails
        if fail_hard ||
          (@max_hbread_fails > 0 && read_fail_count >= @max_hbread_fails)
          # This is an attempt at a connection retry.
          @st.kill if @st   # Kill the sender thread if one exists
          _reconn_prep_hb() # Drive reconnection logic
          Thread.exit       # This receiver thread is done
        end
        # Retry on max lock fails.  Different logic in order to avoid a deadlock.
        if (@max_hbrlck_fails > 0 && lock_fail_count >= @max_hbrlck_fails)
          # This is an attempt at a connection retry.
          @gets_semaphore.synchronize do
            @getst.raise(Errno::EBADF.new) if @getst # kill the socket reading thread if exists
            @socket.close rescue nil # Attempt a forced close
          end
          @st.kill if @st   # Kill the sender thread if one exists
          Thread.exit       # This receiver thread is done
        end
      end
      Thread.pass         # Prior to next receive loop
    #
    end # of the "while true"
  } # end of the Thread.new
end
_start_send_ticker() click to toggle source

#_start_send_ticker starts a thread to send heartbeats when required.

# File lib/connection/heartbeats.rb, line 90
def _start_send_ticker()
  sleeptime = @hbsend_interval / 1000000.0 # Sleep time secs
  reconn = false
  adjust = 0.0
  @st = Thread.new {
    first_time = true
    while true do
      #
      slt = sleeptime - adjust - @fast_hbs_adjust
      sleep(slt)
      next unless @socket # nil under some circumstances ??
      curt = Time.now.to_f
      slog(:on_hbfire, log_params, "send_fire", :curt => curt, :last_sleep => slt)
      delta = curt - @ls
      # Be tolerant (minus), and always do this the first time through.
      # Reintroduce logic removed in d922fa.
      compval = (@hbsend_interval - (@hbsend_interval/5.0)) / 1000000.0
      if delta > compval || first_time
        first_time = false
        slog(:on_hbfire, log_params, "send_heartbeat", {:last_sleep => slt,
                :curt => curt, :last_send => @ls, :delta => delta,
                :compval => compval})
        # Send a heartbeat
        @transmit_semaphore.synchronize do
          begin
            @socket.puts
            @socket.flush       # Do not buffer heartbeats
            @ls = Time.now.to_f # Update last send
            @hb_sent = true     # Reset if necessary
            @hbsend_count += 1
          rescue Exception => sendex
            @hb_sent = false # Set the warning flag
            slog(:on_hbwrite_fail, log_params, {"ticker_interval" => sleeptime,
              "exception" => sendex})
            if @hbser
              raise # Re-raise if user requested this, otherwise ignore
            end
            if @reliable
              reconn = true
              break # exit the synchronize do
            end
          end
        end # of the synchronize
        if reconn
          # Attempt a fail over reconnect.  This is 'safe' here because
          # this thread no longer holds the @transmit_semaphore lock.
          @rt.kill if @rt   # Kill the receiver thread if one exists
          _reconn_prep_hb() # Drive reconnection logic
          Thread.exit       # This sender thread is done
        end
      end
      adjust = Time.now.to_f - curt
      Thread.pass
    end
  }
end
_transmit(used_socket, command, headers = {}, body = '') click to toggle source

_transmit is the real wire write logic.

# File lib/connection/netio.rb, line 226
def _transmit(used_socket, command, headers = {}, body = '')
  dtrdbg = ENV['DTRDBG'] ? true : false
  # p [ "wirewrite" ]
  # _dump_callstack()
  p [ "_transmit_headers_in1", headers ] if dtrdbg
  if @protocol >= Stomp::SPL_11 && command != Stomp::CMD_CONNECT
    headers = _encodeHeaders(headers)
    p [ "_transmit_headers_in2", headers ] if dtrdbg
  end
  @transmit_semaphore.synchronize do
    p [ "_transmit_lock", Thread::current() ] if dtrdbg
    # Handle nil body
    body = '' if body.nil?
    # The content-length should be expressed in bytes.
    # Ruby 1.8: String#length => # of bytes; Ruby 1.9: String#length => # of characters
    # With Unicode strings, # of bytes != # of characters.  So, use String#bytesize when available.
    body_length_bytes = body.respond_to?(:bytesize) ? body.bytesize : body.length

    # ActiveMQ interprets every message as a BinaryMessage
    # if content_length header is included.
    # Using :suppress_content_length => true will suppress this behaviour
    # and ActiveMQ will interpret the message as a TextMessage.
    # For more information refer to http://juretta.com/log/2009/05/24/activemq-jms-stomp/
    # Lets send this header in the message, so it can maintain state when using unreceive
    headers[:'content-length'] = "#{body_length_bytes}" unless headers[:suppress_content_length]
    headers[:'content-type'] = "text/plain; charset=UTF-8" unless headers[:'content-type'] || headers[:suppress_content_type]
    p [ "_transmit_command", command ] if dtrdbg
    _wire_write(used_socket,command)
    p [ "_transmit_headers", headers ] if dtrdbg
    headers.each do |k,v|
      if v.is_a?(Array)
        v.each do |e|
          _wire_write(used_socket,"#{k}:#{e}")
        end
      else
        _wire_write(used_socket,"#{k}:#{v}")
      end
    end
    p [ "_transmit_headers done" ] if dtrdbg
    _wire_write(used_socket,"")
    if body != ''
      p [ "_transmit_body", body ] if dtrdbg
      if headers[:suppress_content_length]
        if tz = body.index("\00")
          used_socket.write body[0..tz-1]
        else
          used_socket.write body
        end
      else
        used_socket.write body
      end
    end
    used_socket.write "\0"
    # used_socket.flush if autoflush
    used_socket.flush

    if @protocol >= Stomp::SPL_11
      @ls = Time.now.to_f if @hbs
    end
    p [ "_transmit_UNlock", Thread::current() ] if dtrdbg
  end
end
_valid_utf8?(string) click to toggle source

Ref: unicode.org/mail-arch/unicode-ml/y2003-m02/att-0467/01-The_Algorithm_to_Valide_an_UTF-8_String

CONSIDER replacing this with a dependency on the utf8_validator gem. This code has been copied from there.

# File lib/connection/utf8.rb, line 19
def _valid_utf8?(string)
  case RUBY_VERSION
  when /1\.8\.[56]/
    bytes = []
    0.upto(string.length-1) {|i|
      bytes << string[i]
    }
  else
    bytes = string.bytes
  end

  #
  valid = true
  index = -1
  state = "start"
  #
  bytes.each do |next_byte|
    index += 1
    case state

      # State: 'start'
      # The 'start' state:
      # * handles all occurrences of valid single byte characters i.e., the ASCII character set
      # * provides state transition logic for start bytes of valid characters with 2-4 bytes
      # * signals a validation failure for all other single bytes
      #
    when "start"
      # puts "state: start" if DEBUG
      case next_byte

        # ASCII
        # * Input = 0x00-0x7F : change state to START
      when (0x00..0x7f)
        # puts "state: start 1" if DEBUG
        state = "start"

        # Start byte of two byte characters
        # * Input = 0xC2-0xDF: change state to A
      when (0xc2..0xdf)
        # puts "state: start 2" if DEBUG
        state = "a"

        # Start byte of some three byte characters
        # * Input = 0xE1-0xEC, 0xEE-0xEF: change state to B
      when (0xe1..0xec)
        # puts "state: start 3" if DEBUG
        state = "b"
      when (0xee..0xef)
        # puts "state: start 4" if DEBUG
        state = "b"

        # Start byte of special three byte characters
        # * Input = 0xE0: change state to C
      when 0xe0
        # puts "state: start 5" if DEBUG
        state = "c"

        # Start byte of the remaining three byte characters
        # * Input = 0xED: change state to D
      when 0xed
        # puts "state: start 6" if DEBUG
        state = "d"

        # Start byte of some four byte characters
        # * Input = 0xF1-0xF3:change state to E
      when (0xf1..0xf3)
        # puts "state: start 7" if DEBUG
        state = "e"

        # Start byte of special four byte characters
        # * Input = 0xF0: change state to F
      when 0xf0
        # puts "state: start 8" if DEBUG
        state = "f"

        # Start byte of very special four byte characters
        # * Input = 0xF4: change state to G
      when 0xf4
        # puts "state: start 9" if DEBUG
        state = "g"

        # All other single characters are invalid
        # * Input = Others (0x80-0xBF,0xC0-0xC1, 0xF5-0xFF): ERROR
      else
        valid = false
        break
      end # of the inner case, the 'start' state

      # The last continuation byte of a 2, 3, or 4 byte character
      # State: 'a'
      #  o Input = 0x80-0xBF: change state to START
      #  o Others: ERROR
    when "a"
      # puts "state: a" if DEBUG
      if (0x80..0xbf) === next_byte
        state = "start"
      else
        valid = false
        break
      end

      # The first continuation byte for most 3 byte characters
      # (those with start bytes in: 0xe1-0xec or 0xee-0xef)
      # State: 'b'
      # o Input = 0x80-0xBF: change state to A
      # o Others: ERROR
    when "b"
      # puts "state: b" if DEBUG
      if (0x80..0xbf) === next_byte
        state = "a"
      else
        valid = false
        break
      end

      # The first continuation byte for some special 3 byte characters
      # (those with start byte 0xe0)
      # State: 'c'
      # o Input = 0xA0-0xBF: change state to A
      # o Others: ERROR
    when "c"
      # puts "state: c" if DEBUG
      if (0xa0..0xbf) === next_byte
        state = "a"
      else
        valid = false
        break
      end

      # The first continuation byte for the remaining 3 byte characters
      # (those with start byte 0xed)
      # State: 'd'
      # o Input = 0x80-0x9F: change state to A
      # o Others: ERROR
    when "d"
      # puts "state: d" if DEBUG
      if (0x80..0x9f) === next_byte
        state = "a"
      else
        valid = false
        break
      end

      # The first continuation byte for some 4 byte characters
      # (those with start bytes in: 0xf1-0xf3)
      # State: 'e'
      # o Input = 0x80-0xBF: change state to B
      # o Others: ERROR
    when "e"
      # puts "state: e" if DEBUG
      if (0x80..0xbf) === next_byte
        state = "b"
      else
        valid = false
        break
      end

      # The first continuation byte for some special 4 byte characters
      # (those with start byte 0xf0)
      # State: 'f'
      # o Input = 0x90-0xBF: change state to B
      # o Others: ERROR
    when "f"
      # puts "state: f" if DEBUG
      if (0x90..0xbf) === next_byte
        state = "b"
      else
        valid = false
        break
      end

      # The first continuation byte for the remaining 4 byte characters
      # (those with start byte 0xf4)
      # State: 'g'
      # o Input = 0x80-0x8F: change state to B
      # o Others: ERROR
    when "g"
      # puts "state: g" if DEBUG
      if (0x80..0x8f) === next_byte
        state = "b"
      else
        valid = false
        break
      end

      #
    else
      raise RuntimeError, "state: default"
    end
  end
  #
  # puts "State at end: #{state}" if DEBUG
  # Catch truncation at end of string
  if valid and state != 'start'
    # puts "Resetting valid value" if DEBUG
    valid = false
  end
  #
  valid
end
_validate_hbheader() click to toggle source
# File lib/connection/heartbeats.rb, line 13
def _validate_hbheader()
  return if @connect_headers[:"heart-beat"] == "0,0" # Caller does not want heartbeats.  OK.
  parts = @connect_headers[:"heart-beat"].split(",")
  if (parts.size != 2) || (parts[0] != parts[0].to_i.to_s) || (parts[1] != parts[1].to_i.to_s)
    raise Stomp::Error::InvalidHeartBeatHeaderError
  end
end
_wire_write(sock, data) click to toggle source

Use CRLF if protocol is >= 1.2, and the client requested CRLF

# File lib/connection/netio.rb, line 290
def _wire_write(sock, data)
  # p [ "debug_01", @protocol, @usecrlf ]
  dwrdbg = ENV['DWRDBG'] ? true : false
  if @protocol >= Stomp::SPL_12 && @usecrlf
    wiredata = "#{data}#{Stomp::CR}#{Stomp::LF}"
    # p [ "wiredataout_01:", wiredata ]
    sock.write(wiredata)
  else
    p [ "_wire_write_begin:", "#{data}" ] if dwrdbg
    if @jruby && @ssl
      p [ "_wire_write_jrbeg:" ] if dwrdbg
      # Same results for all of these write methods.
      # sock.puts data
      # sock.print "#{data}\n"
      # sock.syswrite "#{data}\n"
      sock.write "#{data}\n"
      p [ "_wire_write_jrend:" ] if dwrdbg
    else
      sock.puts data
    end
    p [ "_wire_write_end:" ] if dwrdbg
  end
end
change_host() click to toggle source

#change_host selects the next host for retries.

# File lib/connection/utils.rb, line 201
def change_host
  @parameters[:hosts] = @parameters[:hosts].sort_by { rand } if @parameters[:randomize]

  # Set first as master and send it to the end of array
  current_host = @parameters[:hosts].shift
  @parameters[:hosts] << current_host

  @ssl = current_host[:ssl]
  @host = current_host[:host]
  @port = current_host[:port] || Connection::default_port(@ssl)
  @login = current_host[:login] || ""
  @passcode = current_host[:passcode] || ""
end
close_socket() click to toggle source

#close_socket closes the current open socket, and hence the connection.

# File lib/connection/netio.rb, line 503
def close_socket()
  begin
    # Need to set @closed = true before closing the socket
    # within the @read_semaphore thread
    @closed = true
    @read_semaphore.synchronize do
      @socket.close
    end
  rescue
    #Ignoring if already closed
  end
  @closed
end
connect(used_socket) click to toggle source

connect performs a basic STOMP CONNECT operation.

# File lib/connection/netio.rb, line 541
def connect(used_socket)
  @connect_headers = {} unless @connect_headers # Caller said nil/false
  headers = @connect_headers.clone
  headers[:login] = @login unless @login.to_s.empty?
  headers[:passcode] = @passcode unless @login.to_s.empty?
  _pre_connect
  if !@hhas10 && @stompconn
    _transmit(used_socket, Stomp::CMD_STOMP, headers)
  else
    _transmit(used_socket, Stomp::CMD_CONNECT, headers)
  end
  connread = true
  noiosel = false
  @connection_frame = _receive(used_socket, connread, noiosel)
  _post_connect
  @disconnect_receipt = nil
  @session = @connection_frame.headers["session"] if @connection_frame
  # replay any subscriptions.
  @subscriptions.each {|k,v|
    _transmit(used_socket, Stomp::CMD_SUBSCRIBE, v)
  }
end
increase_reconnect_delay() click to toggle source

#increase_reconnect_delay increases the reconnect delay for the next connection attempt.

# File lib/connection/utils.rb, line 235
def increase_reconnect_delay

  @reconnect_delay *= @parameters[:back_off_multiplier] if @parameters[:use_exponential_back_off]
  @reconnect_delay = @parameters[:max_reconnect_delay] if @reconnect_delay > @parameters[:max_reconnect_delay]

  @reconnect_delay
end
log_params() click to toggle source

Create parameters for any callback logger.

# File lib/connection/utils.rb, line 44
def log_params()
  lparms = @parameters.clone if @parameters
  lparms = {} unless lparms
  lparms[:cur_host] = @host
  lparms[:cur_port] = @port
  lparms[:cur_login] = @login
  lparms[:cur_passcode] = @passcode
  lparms[:cur_ssl] = @ssl
  lparms[:cur_recondelay] = @reconnect_delay
  lparms[:cur_parseto] = @parse_timeout
  lparms[:cur_conattempts] = @connection_attempts
  lparms[:cur_failure] = @failure # To assist in debugging
  lparms[:openstat] = open?
  #
  lparms
end
max_reconnect_attempts?() click to toggle source

max_reconnect_attempts? returns nil or the number of maximum reconnect attempts.

# File lib/connection/utils.rb, line 229
def max_reconnect_attempts?
  !(@parameters.nil? || @parameters[:max_reconnect_attempts].nil?) && @parameters[:max_reconnect_attempts] != 0 && @connection_attempts >= @parameters[:max_reconnect_attempts]
end
open_socket() click to toggle source

#open_socket opens a TCP or SSL soclet as required.

# File lib/connection/netio.rb, line 518
def open_socket()
  used_socket = @ssl ? open_ssl_socket : open_tcp_socket
  # try to close the old connection if any
  close_socket

  @closed = false
  if @parameters # nil in some rspec tests
    unless @reconnect_delay
      @reconnect_delay = @parameters[:initial_reconnect_delay] || iosto1
    end
  end
  # Use keepalive
  used_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)

  # TCP_NODELAY option (disables Nagle's algorithm)
  used_socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, !!(@parameters && @parameters[:tcp_nodelay]))

  @iosto = @parse_timeout ? @parse_timeout.to_f : 0.0

  used_socket
end
open_ssl_socket() click to toggle source

#open_ssl_socket opens an SSL socket.

# File lib/connection/netio.rb, line 328
def open_ssl_socket()
  require 'openssl' unless defined?(OpenSSL)
  ossdbg = ENV['OSSDBG'] ? true : false
  begin # Any raised SSL exceptions
    ctx = @sslctx_newparm ? OpenSSL::SSL::SSLContext.new(@sslctx_newparm) : OpenSSL::SSL::SSLContext.new
    ctx.verify_mode = OpenSSL::SSL::VERIFY_NONE # Assume for now
    #
    # Note: if a client uses :ssl => true this would result in the gem using
    # the _default_ Ruby ciphers list.  This is _known_ to fail in later
    # Ruby releases.  The gem now detects :ssl => true, and replaces that
    # with:
    # * :ssl => Stomp::SSLParams.new
    #
    # The above results in the use of Stomp default parameters.
    #
    # To specifically request Stomp default parameters, use:
    # * :ssl => Stomp::SSLParams.new(..., :ciphers => Stomp::DEFAULT_CIPHERS)
    #
    # If connecting with an SSLParams instance, and the _default_ Ruby
    # ciphers list is actually required, use:
    # * :ssl => Stomp::SSLParams.new(..., :use_ruby_ciphers => true)
    #
    # If a custom ciphers list is required, connect with:
    # * :ssl => Stomp::SSLParams.new(..., :ciphers => custom_ciphers_list)
    #
    if @ssl != true
      #
      # Here @ssl is:
      # * an instance of Stomp::SSLParams
      # Control would not be here if @ssl == false or @ssl.nil?.
      #

      # Back reference the SSLContext
      @ssl.ctx = ctx

      # Server authentication parameters if required
      if @ssl.ts_files
        ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER
        truststores = OpenSSL::X509::Store.new
        fl = @ssl.ts_files.split(",")
        fl.each do |fn|
          # Add next cert file listed
          raise Stomp::Error::SSLNoTruststoreFileError if !File::exists?(fn)
          raise Stomp::Error::SSLUnreadableTruststoreFileError if !File::readable?(fn)
          truststores.add_file(fn)
        end
        ctx.cert_store = truststores
      end
      #
      p [ "OSSL50", "old code starts" ] if ossdbg
      usecert = nil
      usekey = nil
      # Client authentication
      # If cert exists as a file, then it should not be input as text
      raise Stomp::Error::SSLClientParamsError if !@ssl.cert_file.nil? &&
        !@ssl.cert_text.nil?
      # If cert exists as file, then key must exist, either as text or file
      raise Stomp::Error::SSLClientParamsError if !@ssl.cert_file.nil? &&
        @ssl.key_file.nil? && @ssl.key_text.nil?
      if @ssl.cert_file
        raise Stomp::Error::SSLNoCertFileError if !File::exists?(@ssl.cert_file)
        raise Stomp::Error::SSLUnreadableCertFileError if !File::readable?(@ssl.cert_file)
        p [ "OSSL51", "old code cert file read" ] if ossdbg
        usecert = OpenSSL::X509::Certificate.new(File.read(@ssl.cert_file))
      end
      # If cert exists as file, then key must exist, either as text or file
      raise Stomp::Error::SSLClientParamsError if !@ssl.cert_text.nil? &&
        @ssl.key_file.nil? && @ssl.key_text.nil?
      if @ssl.cert_text
        p [ "OSSL52", "old code cert text get" ] if ossdbg
        usecert = OpenSSL::X509::Certificate.new(@ssl.cert_text)
      end

      # If key exists as a text, then it should not be input as file
      raise Stomp::Error::SSLClientParamsError if !@ssl.key_text.nil? &&
        !@ssl.key_file.nil?
      if @ssl.key_file
        raise Stomp::Error::SSLNoKeyFileError if !File::exists?(@ssl.key_file)
        raise Stomp::Error::SSLUnreadableKeyFileError if !File::readable?(@ssl.key_file)
        p [ "OSSL53", "old code key file read" ] if ossdbg
        usekey  = OpenSSL::PKey::RSA.new(File.read(@ssl.key_file), @ssl.key_password)
      end

      if @ssl.key_text
        nt = @ssl.key_text.gsub(/\t/, "")
        p [ "OSSL54", "old code key text get" ] if ossdbg
        usekey  = OpenSSL::PKey::RSA.new(nt, @ssl.key_password)
      end
      #
      # This style of code because:  in newer Ruby versions the 'cert'
      # and 'key' attributes are deprecated.  It is suggested that the
      # 'add_certificate' method be used instead.
      #
      if ctx.respond_to?(:add_certificate)  # Newer Ruby version ??
        p [ "OSSL55", "new code option", usecert, usekey ] if ossdbg
        if !usecert.nil? && !usekey.nil?
          p [ "OSSL55", "new code add_certificate" ] if ossdbg
          ctx.add_certificate(usecert, usekey)
        else
          p [ "OSSL56", "new code SKIP add_certificate" ] if ossdbg
        end
      else
        # Older Ruby versions
        p [ "OSSL56", "old code option", usecert, usekey ] if ossdbg
        ctx.cert = usecert
        ctx.key = usekey
      end
      p [ "OSSL99", "old code ends" ] if ossdbg
      # Cipher list
      # As of this writing, there are numerous problems with supplying
      # cipher lists to jruby.  So we do not attempt to do that here.
      if !@ssl.use_ruby_ciphers # No Ruby ciphers (the default)
        if @ssl.ciphers # User ciphers list?
          ctx.ciphers = @ssl.ciphers # Accept user supplied ciphers
        else
          ctx.ciphers = Stomp::DEFAULT_CIPHERS # Just use Stomp defaults
        end
      end unless @jruby

      # Set SSLContext Options if user asks for it in Stomp::SSLParams
      # and SSL supports it.
      if @ssl.ssl_ctxopts && ctx.respond_to?(:options=)
        ctx.options = @ssl.ssl_ctxopts
      end

    end

    #
    ssl = nil
    slog(:on_ssl_connecting, log_params)
    # _dump_ctx(ctx)
    Timeout::timeout(@connect_timeout, Stomp::Error::SocketOpenTimeout) do
      tcp_socket = TCPSocket.open(@host, @port)
      ssl = OpenSSL::SSL::SSLSocket.new(tcp_socket, ctx)
      ssl.hostname = @host if ssl.respond_to? :hostname=
      ssl.sync_close = true # Sync ssl close with underlying TCP socket
      ssl.connect
      if (ssl.context.verify_mode != OpenSSL::SSL::VERIFY_NONE) && @ssl_post_conn_check
        ssl.post_connection_check(@host)
      end
    end
    def ssl.ready?
      @ssl_ready_lock ||= Mutex.new
      @ssl_ready_lock.synchronize do
        ! @rbuffer.empty? || @io.ready?
      end
    end

    if @ssl != true
      # Pass back results if possible
      if RUBY_VERSION =~ /1\.8\.[56]/
        @ssl.verify_result = "N/A for Ruby #{RUBY_VERSION}"
      else
        @ssl.verify_result = ssl.verify_result
      end
      @ssl.peer_cert = ssl.peer_cert
    end
    slog(:on_ssl_connected, log_params)
    ssl
  rescue Exception => ex
    lp = log_params.clone
    lp[:ssl_exception] = ex
    slog(:on_ssl_connectfail, lp)
    if ssl
      # shut down the TCP socket - we just failed to do the SSL handshake in time
      ssl.close
    end
    #
    puts ex.backtrace if ossdbg
    $stdout.flush if ossdbg
    raise # Reraise
  end
end
open_tcp_socket() click to toggle source

#open_tcp_socket opens a TCP socket.

# File lib/connection/netio.rb, line 315
def open_tcp_socket()

  ## $stderr.print("h: #{@host}, p: #{@port}\n")

  tcp_socket = nil
  slog(:on_connecting, log_params)
  Timeout::timeout(@connect_timeout, Stomp::Error::SocketOpenTimeout) do
    tcp_socket = TCPSocket.open(@host, @port)
  end
  tcp_socket
end
parse_char(char) click to toggle source

Handle 1.9+ character representation.

# File lib/connection/utils.rb, line 39
def parse_char(char)
  RUBY_VERSION > '1.9' ? char : char.chr
end
refine_params(params) click to toggle source

#refine_params sets up defaults for a Hash initialize.

# File lib/connection/utils.rb, line 163
def refine_params(params)
  params = params.uncamelize_and_symbolize_keys
  default_params = {
    :connect_headers => {},
    :reliable => true,
    # Failover parameters
    :initial_reconnect_delay => 0.01,
    :max_reconnect_delay => 30.0,
    :use_exponential_back_off => true,
    :back_off_multiplier => 2,
    :max_reconnect_attempts => 0,
    :randomize => false,
    :connect_timeout => 0,
    # Parse Timeout
    :parse_timeout => 5,
    :dmh => false,
    # Closed check logic
    :closed_check => true,
    :hbser => false,
    :stompconn => false,
    :max_hbread_fails => 0,
    :max_hbrlck_fails => 0,
    :fast_hbs_adjust => 0.0,
    :connread_timeout => 0,
    :tcp_nodelay => true,
    :start_timeout => 0,
    :sslctx_newparm => nil,
    :ssl_post_conn_check => true,
  }

  res_params = default_params.merge(params)
  if res_params[:dmh]
    res_params = _expand_hosts(res_params)
  end
  return res_params
end
socket() click to toggle source

socket creates and returns a new socket for use by the connection.

# File lib/connection/utils.rb, line 107
def socket()
  @socket_semaphore.synchronize do
    used_socket = @socket
    used_socket = nil if closed?

    while used_socket.nil? || !@failure.nil?
      @failure = nil
      begin
        used_socket = open_socket() # sets @closed = false if OK
        # Open is complete
        connect(used_socket)
        slog(:on_connected, log_params)
        @connection_attempts = 0
      rescue
        @failure = $!
        used_socket = nil
        @closed = true

        raise unless @reliable
        raise if @failure.is_a?(Stomp::Error::LoggerConnectionError)
        # Catch errors which are:
        # a) emitted from corrupted 1.1+ 'connect' (caller programming error)
        # b) should never be retried
        raise if @failure.is_a?(Stomp::Error::ProtocolError11p)

        begin
          unless slog(:on_connectfail,log_params)
            $stderr.print "connect to #{@host} failed: #{$!} will retry(##{@connection_attempts}) in #{@reconnect_delay}\n"
          end
        rescue Exception => aex
          raise if aex.is_a?(Stomp::Error::LoggerConnectionError)
        end
        if max_reconnect_attempts?
          $stderr.print "In socket() Reached MaxReconnectAttempts"
          ### _dump_threads()
          mt = @parameters[:client_main]
          if !mt.nil?
            mt.raise Stomp::Error::MaxReconnectAttempts
            Thread::exit
          end
          raise Stomp::Error::MaxReconnectAttempts
        end
        sleep(@reconnect_delay)
        @connection_attempts += 1

        if @parameters
          change_host()
          increase_reconnect_delay()
        end
      end
    end
    @socket = used_socket
  end
end
transmit(command, headers = {}, body = '') click to toggle source

transmit logically puts a Message on the wire.

# File lib/connection/netio.rb, line 201
def transmit(command, headers = {}, body = '')
  # p [ "XMIT01", command, headers ]
  # The transmit may fail so we may need to retry.
  while true
    begin
      used_socket = socket()
      _transmit(used_socket, command, headers, body)
      return
    rescue Stomp::Error::MaxReconnectAttempts => e
      _ = e
      raise
    rescue
      @failure = $!
      raise unless @reliable
      errstr = "transmit to #{@host} failed: #{$!}\n"
      unless slog(:on_miscerr, log_params, "es_trans: " + errstr)
        $stderr.print errstr
      end
      # !!! This loop initiates a re-connect !!!
      _reconn_prep()
    end
  end
end