class ElasticAPM::Transport::Connection::Http

@api private

Public Class Methods

new(config) click to toggle source
# File lib/elastic_apm/transport/connection/http.rb, line 16
# Prepares a connection that has not been opened yet.
#
# Stores the configuration, creates the flag that tracks whether the
# connection has been closed, and creates the read/write pipe pair.
#
# @param config [#http_compression?] configuration; its compression setting
#   is forwarded to ProxyPipe.pipe
def initialize(config)
  @config = config
  @closed = Concurrent::AtomicBoolean.new

  compress = @config.http_compression?
  reader, writer = ProxyPipe.pipe(compress: compress)
  @rd = reader
  @wr = writer
end
open(config, url, headers: {}, ssl_context: nil) click to toggle source
# File lib/elastic_apm/transport/connection/http.rb, line 27
# Builds a connection for +config+ and immediately opens it against +url+.
#
# @param config [Object] configuration handed to the constructor
# @param url [Object] target passed on to the instance-level #open
# @param headers [Hash] request headers passed on to #open
# @param ssl_context [Object, nil] SSL context passed on to #open
# @return [Object] the newly created connection (not the result of #open)
def self.open(config, url, headers: {}, ssl_context: nil)
  connection = new(config)
  connection.open(url, headers: headers, ssl_context: ssl_context)
  connection
end

Public Instance Methods

close(reason) click to toggle source
# File lib/elastic_apm/transport/connection/http.rb, line 38
# Closes the connection once; later (or concurrent) calls are no-ops.
#
# Closes the write end of the pipe with +reason+, then waits up to 5 seconds
# for the request thread to finish. If it has not finished by then, an error
# is logged and the thread is killed.
#
# @param reason [Object] passed to the pipe's #close and included in the
#   debug log line
# @return [Object, nil] nil when already closed, when no request was started,
#   or when the request finished in time; otherwise the result of killing
#   the request thread
def close(reason)
  # AtomicBoolean#make_true returns true only for the call that actually
  # flipped the flag, so check-and-set happens in one atomic step and exactly
  # one caller proceeds past this line.
  return unless @closed.make_true

  debug '%s: Closing request with reason %s', thread_str, reason

  @wr&.close(reason)
  # join returns nil when the timeout elapses before the thread finishes.
  return if @request.nil? || @request.join(5)

  error(
    '%s: APM Server not responding in time, terminating request',
    thread_str
  )
  @request.kill
end
closed?() click to toggle source
# File lib/elastic_apm/transport/connection/http.rb, line 54
# Tells whether this connection has been closed.
#
# @return [Boolean] the current value of the @closed flag
def closed?
  @closed.value
end
inspect() click to toggle source
Calls superclass method
# File lib/elastic_apm/transport/connection/http.rb, line 58
# Short textual representation: the first whitespace-separated token of the
# superclass's inspect output, followed by the closed state.
#
# @return [String]
def inspect
  prefix = super.split.first
  format('%s closed: %s>', prefix, closed?)
end
open(url, headers: {}, ssl_context: nil) click to toggle source
# File lib/elastic_apm/transport/connection/http.rb, line 23
# Starts the request for +url+ and remembers it in @request.
#
# @param url [Object] target forwarded to open_request_in_thread
# @param headers [Hash] request headers
# @param ssl_context [Object, nil] SSL context forwarded with the request
# @return [Object] whatever open_request_in_thread returned
def open(url, headers: {}, ssl_context: nil)
  started = open_request_in_thread(url, headers, ssl_context)
  @request = started
end
write(str) click to toggle source
# File lib/elastic_apm/transport/connection/http.rb, line 33
# Writes +str+ to the write end of the pipe.
#
# @param str [Object] data handed to the pipe's #write
# @return [Object] the pipe's bytes_sent value read after the write
def write(str)
  @wr.tap { |pipe| pipe.write(str) }.bytes_sent
end

Private Instance Methods

build_client(headers) click to toggle source

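Builds the HTTP client for the given headers, routed through the configured proxy when both a proxy address and a proxy port are set.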

# File lib/elastic_apm/transport/connection/http.rb, line 87
# Builds the HTTP client used for the request.
#
# When the configuration has both a proxy address and a proxy port, the
# client is routed through that proxy (with the configured username,
# password and proxy headers); otherwise the plain client is returned.
#
# @param headers [Hash] headers passed to HTTP.headers
# @return [Object] the client, proxied or not
def build_client(headers)
  plain = HTTP.headers(headers)
  settings = @config

  if settings.proxy_address && settings.proxy_port
    plain.via(
      settings.proxy_address,
      settings.proxy_port,
      settings.proxy_username,
      settings.proxy_password,
      settings.proxy_headers
    )
  else
    plain
  end
end
open_request_in_thread(url, headers, ssl_context) click to toggle source

Starts the POST request to the APM Server on a new thread and returns that thread.

# File lib/elastic_apm/transport/connection/http.rb, line 73
# Builds the client and runs the POST request on a new thread.
#
# Any exception raised by the request is logged as an error inside the
# thread instead of propagating out of it.
#
# @param url [Object] target forwarded to #post
# @param headers [Hash] headers used to build the client
# @param ssl_context [Object, nil] SSL context forwarded to #post
# @return [Thread] the thread running the request
def open_request_in_thread(url, headers, ssl_context)
  client = build_client(headers)

  debug '%s: Opening new request', thread_str
  Thread.new do
    begin
      post(client, url, ssl_context)
    # NOTE(review): rescues Exception rather than StandardError, presumably
    # so that nothing escapes the background thread; confirm this is intended.
    rescue Exception => e
      # %p already formats its argument with #inspect, so pass the exception
      # itself; passing e.inspect would log a quoted, escaped string.
      error "Couldn't establish connection to APM Server:\n%p", e
    end
  end
end
post(client, url, ssl_context) click to toggle source
# File lib/elastic_apm/transport/connection/http.rb, line 100
# Sends the POST request with the read end of the pipe as its body, flushes
# the response and logs the outcome.
#
# A 202 status is logged at debug level; any other response is logged as an
# error together with its body. Nothing is logged when there is no response.
#
# @param client [#post] HTTP client to send the request with
# @param url [Object] request target
# @param ssl_context [Object, nil] SSL context passed to the client
# @return [Object, nil] the logger call's result, or nil without a response
def post(client, url, ssl_context)
  pending = client.post(url, body: @rd, ssl_context: ssl_context)
  response = pending.flush
  return if response.nil?

  if response.status == 202
    debug 'APM Server responded with status 202'
  else
    error "APM Server responded with an error:\n%p", response.body.to_s
  end
end
thread_str() click to toggle source
# File lib/elastic_apm/transport/connection/http.rb, line 68
# Label identifying the calling thread, used as a prefix in log messages.
#
# @return [String] "[THREAD:<object_id of the current thread>]"
def thread_str
  current_id = Thread.current.object_id
  format('[THREAD:%s]', current_id)
end