class ElasticAPM::Transport::Connection
@api private
Constants
- GZIP_HEADERS
- HEADERS
A connection holds an instance `http` of an Http::Connection.
The HTTP::Connection itself is not thread safe.
The connection sends write requests and close requests to `http`, and has to ensure no write requests are sent after closing `http`.
The connection schedules a separate thread to close an `http` connection some time in the future. To avoid the thread interfering with ongoing write requests to `http`, write and close requests have to be synchronized.
Attributes
http[R]
Public Class Methods
new(config, metadata)
click to toggle source
# File lib/elastic_apm/transport/connection.rb, line 33 def initialize(config, metadata) @config = config @headers = build_headers(metadata) @metadata = JSON.fast_generate(metadata) @url = config.server_url + '/intake/v2/events' @ssl_context = build_ssl_context @mutex = Mutex.new end
Public Instance Methods
flush(reason = :force)
click to toggle source
rubocop:enable Metrics/MethodLength, Metrics/AbcSize
# File lib/elastic_apm/transport/connection.rb, line 72 def flush(reason = :force) # Could happen from the timertask so we need to sync @mutex.synchronize do return if http.nil? http.close(reason) end end
inspect()
click to toggle source
Calls superclass method
# File lib/elastic_apm/transport/connection.rb, line 80 def inspect format( '@%s http connection closed? :%s>', super.split.first, http.closed? ) end
write(str)
click to toggle source
rubocop:disable Metrics/MethodLength, Metrics/AbcSize
# File lib/elastic_apm/transport/connection.rb, line 45 def write(str) return false if @config.disable_send begin bytes_written = 0 # The request might get closed from timertask so let's make sure we # hold it open until we've written. @mutex.synchronize do connect if http.nil? || http.closed? bytes_written = http.write(str) end flush(:api_request_size) if bytes_written >= @config.api_request_size rescue IOError => e error('Connection error: %s', e.inspect) flush(:ioerror) rescue Errno::EPIPE => e error('Connection error: %s', e.inspect) flush(:broken_pipe) rescue Exception => e error('Connection error: %s', e.inspect) flush(:connection_error) end end
Private Instance Methods
build_headers(metadata)
click to toggle source
# File lib/elastic_apm/transport/connection.rb, line 110 def build_headers(metadata) ( @config.http_compression? ? GZIP_HEADERS : HEADERS ).dup.tap do |headers| headers['User-Agent'] = build_user_agent(metadata) if (token = @config.secret_token) headers['Authorization'] = "Bearer #{token}" end end end
build_ssl_context()
click to toggle source
# File lib/elastic_apm/transport/connection.rb, line 132 def build_ssl_context # rubocop:disable Metrics/MethodLength return unless @config.use_ssl? OpenSSL::SSL::SSLContext.new.tap do |context| if @config.server_ca_cert context.ca_file = @config.server_ca_cert else context.cert_store = OpenSSL::X509::Store.new.tap(&:set_default_paths) end context.verify_mode = if @config.verify_server_cert OpenSSL::SSL::VERIFY_PEER else OpenSSL::SSL::VERIFY_NONE end end end
build_user_agent(metadata)
click to toggle source
# File lib/elastic_apm/transport/connection.rb, line 122 def build_user_agent(metadata) runtime = metadata.dig(:metadata, :service, :runtime) [ "elastic-apm-ruby/#{VERSION}", HTTP::Request::USER_AGENT, [runtime[:name], runtime[:version]].join('/') ].join(' ') end
connect()
click to toggle source
# File lib/elastic_apm/transport/connection.rb, line 90 def connect schedule_closing if @config.api_request_time @http = Http.open( @config, @url, headers: @headers, ssl_context: @ssl_context ).tap { |http| http.write(@metadata) } end
schedule_closing()
click to toggle source
rubocop:enable
# File lib/elastic_apm/transport/connection.rb, line 102 def schedule_closing @close_task&.cancel @close_task = Concurrent::ScheduledTask.execute(@config.api_request_time) do flush(:timeout) end end