class ElasticAPM::Transport::Connection

@api private

Constants

GZIP_HEADERS
HEADERS

A connection holds an instance `http` of an Http::Connection.

The HTTP::Connection itself is not thread safe.

The connection sends write requests and close requests to `http`, and has to ensure no write requests are sent after closing `http`.

The connection schedules a separate thread to close an `http` connection some time in the future. To avoid the thread interfering with ongoing write requests to `http`, write and close requests have to be synchronized.

Attributes

http[R]

Public Class Methods

new(config, metadata) click to toggle source
# File lib/elastic_apm/transport/connection.rb, line 33
def initialize(config, metadata)
  @config = config
  @headers = build_headers(metadata)
  @metadata = JSON.fast_generate(metadata)
  @url = config.server_url + '/intake/v2/events'
  @ssl_context = build_ssl_context
  @mutex = Mutex.new
end

Public Instance Methods

flush(reason = :force) click to toggle source

rubocop:enable Metrics/MethodLength, Metrics/AbcSize

# File lib/elastic_apm/transport/connection.rb, line 72
def flush(reason = :force)
  # Could happen from the timertask so we need to sync
  @mutex.synchronize do
    return if http.nil?
    http.close(reason)
  end
end
inspect() click to toggle source
Calls superclass method
# File lib/elastic_apm/transport/connection.rb, line 80
def inspect
  format(
    '@%s http connection closed? :%s>',
    super.split.first,
    http.closed?
  )
end
write(str) click to toggle source

rubocop:disable Metrics/MethodLength, Metrics/AbcSize

# File lib/elastic_apm/transport/connection.rb, line 45
def write(str)
  return false if @config.disable_send

  begin
    bytes_written = 0

    # The request might get closed from timertask so let's make sure we
    # hold it open until we've written.
    @mutex.synchronize do
      connect if http.nil? || http.closed?
      bytes_written = http.write(str)
    end

    flush(:api_request_size) if bytes_written >= @config.api_request_size
  rescue IOError => e
    error('Connection error: %s', e.inspect)
    flush(:ioerror)
  rescue Errno::EPIPE => e
    error('Connection error: %s', e.inspect)
    flush(:broken_pipe)
  rescue Exception => e
    error('Connection error: %s', e.inspect)
    flush(:connection_error)
  end
end

Private Instance Methods

build_headers(metadata) click to toggle source
# File lib/elastic_apm/transport/connection.rb, line 110
def build_headers(metadata)
  (
    @config.http_compression? ? GZIP_HEADERS : HEADERS
  ).dup.tap do |headers|
    headers['User-Agent'] = build_user_agent(metadata)

    if (token = @config.secret_token)
      headers['Authorization'] = "Bearer #{token}"
    end
  end
end
build_ssl_context() click to toggle source
# File lib/elastic_apm/transport/connection.rb, line 132
def build_ssl_context # rubocop:disable Metrics/MethodLength
  return unless @config.use_ssl?

  OpenSSL::SSL::SSLContext.new.tap do |context|
    if @config.server_ca_cert
      context.ca_file = @config.server_ca_cert
    else
      context.cert_store =
        OpenSSL::X509::Store.new.tap(&:set_default_paths)
    end

    context.verify_mode =
      if @config.verify_server_cert
        OpenSSL::SSL::VERIFY_PEER
      else
        OpenSSL::SSL::VERIFY_NONE
      end
  end
end
build_user_agent(metadata) click to toggle source
# File lib/elastic_apm/transport/connection.rb, line 122
def build_user_agent(metadata)
  runtime = metadata.dig(:metadata, :service, :runtime)

  [
    "elastic-apm-ruby/#{VERSION}",
    HTTP::Request::USER_AGENT,
    [runtime[:name], runtime[:version]].join('/')
  ].join(' ')
end
connect() click to toggle source
# File lib/elastic_apm/transport/connection.rb, line 90
def connect
  schedule_closing if @config.api_request_time

  @http =
    Http.open(
      @config, @url,
      headers: @headers,
      ssl_context: @ssl_context
    ).tap { |http| http.write(@metadata) }
end
schedule_closing() click to toggle source

rubocop:enable

# File lib/elastic_apm/transport/connection.rb, line 102
def schedule_closing
  @close_task&.cancel
  @close_task =
    Concurrent::ScheduledTask.execute(@config.api_request_time) do
      flush(:timeout)
    end
end