class ElasticAPM::Transport::Worker

@api private

Attributes

connection[R]
filters[R]
name[R]
queue[R]
serializers[R]

Public Class Methods

new(config, queue, serializers:, filters:, conn_adapter: Connection)
# File lib/elastic_apm/transport/worker.rb, line 15
# Builds a worker around a queue and the collaborators used to turn queued
# resources into JSON and send them.
#
# config       - configuration object; stored and also handed to
#                Metadata.new and to the connection adapter.
# queue        - object the worker pops messages from (see work_forever).
# serializers: - must respond to #serialize; used here for the metadata
#                and later for every processed resource.
# filters:     - stored for use when filtering serialized resources.
# conn_adapter: - class (or any object responding to .new) that is
#                instantiated with (config, metadata) to obtain the
#                connection. Defaults to Connection.
def initialize(config, queue, serializers:, filters:, conn_adapter: Connection)
  @config, @queue = config, queue
  @serializers = serializers
  @filters = filters

  # The metadata is serialized once, up front, and given to the connection.
  @connection = conn_adapter.new(
    config,
    serializers.serialize(Metadata.new(config))
  )
end

Public Instance Methods

process(resource)

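Serializes and filters the resource, then writes the resulting JSON to the connection. Nothing is written when serialization or filtering yields no JSON.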

# File lib/elastic_apm/transport/worker.rb, line 52
# Serializes and filters +resource+ and, when that yields a payload, writes
# it to the connection.
#
# Returns whatever connection.write returns, or nil when
# serialize_and_filter produced nothing (nil/false) and no write happened.
def process(resource)
  payload = serialize_and_filter(resource)
  connection.write(payload) if payload
end
work_forever()

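Pops messages off the queue and processes each one until a StopMessage arrives, at which point the connection is flushed and the loop ends. Any exception that escapes the loop is logged.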

# File lib/elastic_apm/transport/worker.rb, line 35
# Main loop of the worker: keeps popping messages from the queue and hands
# each one to #process.
#
# The loop ends when queue.pop returns nil/false, or when a StopMessage is
# popped -- in that case connection.flush(:halt) is called first.
#
# Any exception raised inside the loop (Exception, not just StandardError)
# is caught here; it is reported via warn and its backtrace via debug, and
# the method then returns instead of re-raising.
def work_forever
  while (message = queue.pop)
    # Same class-match test a `case ... when StopMessage` performs.
    unless StopMessage === message
      process message
      next
    end

    debug 'Stopping worker -- %s', self
    connection.flush(:halt)
    break
  end
rescue Exception => error
  warn 'Worker died with exception: %s', error.inspect
  debug error.backtrace.join("\n")
end

Private Instance Methods

serialize_and_filter(resource)
# File lib/elastic_apm/transport/worker.rb, line 59
# Turns +resource+ into a JSON string ready to be written.
#
# The resource is serialized, the result is passed through
# @filters.apply!, and the serialized object is then encoded with
# JSON.fast_generate.
#
# Returns the JSON string, or nil when either
#   * the filters' result equals Filters::SKIP (the event is dropped), or
#   * anything raises along the way -- the failure is reported through
#     two error log calls (the resource, then whatever had been serialized
#     so far) and swallowed.
def serialize_and_filter(resource)
  payload = serializers.serialize(resource)
  verdict = @filters.apply!(payload)

  if verdict == Filters::SKIP
    nil
  else
    JSON.fast_generate(payload)
  end
rescue Exception
  # payload is still nil here if serialization itself was what failed.
  message = format('Failed converting event to JSON: %s', resource.inspect)
  error message
  error payload.inspect
  nil
end