class ElasticAPM::Transport::Base

Internal API (`@api private`). The class body is wrapped in a `rubocop:disable Metrics/ClassLength` directive.

Constants

WATCHER_EXECUTION_INTERVAL
WATCHER_TIMEOUT_INTERVAL
WORKER_JOIN_TIMEOUT

Attributes

config[R]
filters[R]
queue[R]
stopped[R]
watcher[R]
workers[R]

Public Class Methods

new(config) click to toggle source
# File lib/elastic_apm/transport/base.rb, line 20
# Builds the transport's state from +config+. Nothing is started here;
# the worker slots are allocated empty (nil) and no thread is created.
#
# @param config [Object] must respond to +api_buffer_size+ and
#   +pool_size+; it is also handed to Serializers and Filters.
def initialize(config)
  # Locks guarding watcher creation/shutdown and the worker array.
  @watcher_mutex = Mutex.new
  @worker_mutex = Mutex.new

  # Lifecycle flag, created with AtomicBoolean's default initial value.
  @stopped = Concurrent::AtomicBoolean.new

  @config = config

  # Bounded queue: capacity comes from config.api_buffer_size.
  @queue = SizedQueue.new(config.api_buffer_size)

  @serializers = Serializers.new(config)
  @filters = Filters.new(config)

  # One slot per worker; slots stay nil until a worker is booted.
  @workers = Array.new(config.pool_size)
end

Public Instance Methods

add_filter(key, callback) click to toggle source

rubocop:enable Metrics/MethodLength

# File lib/elastic_apm/transport/base.rb, line 72
# Registers +callback+ under +key+ on this transport's filter set.
#
# @param key [Object] identifier forwarded to the filter set's +add+
# @param callback [Object] filter forwarded to the filter set's +add+
# @return whatever the filter set's +add+ returns
def add_filter(key, callback)
  filters.add(key, callback)
end
start() click to toggle source
# File lib/elastic_apm/transport/base.rb, line 36
# Starts the transport: logs at debug level, makes sure the watcher is
# running for this process, then makes sure the workers are running.
#
# @return the result of +ensure_worker_count+
def start
  debug('%s: Starting Transport', pid_str)
  ensure_watcher_running
  ensure_worker_count
end
stop() click to toggle source
# File lib/elastic_apm/transport/base.rb, line 43
# Stops the transport. The stopped flag is raised first, so +submit+
# rejects new events before the watcher and workers are shut down.
#
# @return the result of +stop_workers+
def stop
  debug('%s: Stopping Transport', pid_str)
  @stopped.make_true
  stop_watcher
  stop_workers
end
submit(resource) click to toggle source

rubocop:disable Metrics/MethodLength

# File lib/elastic_apm/transport/base.rb, line 53
# Enqueues +resource+ for the workers without blocking the caller.
#
# @param resource [Object] the item to push onto the transport queue
# @return [true] when the resource was queued
# @return [false] when the transport has been stopped
# @return [nil] when the queue is full or queuing failed with a
#   StandardError (the failure is logged, not raised)
def submit(resource)
  if @stopped.true?
    warn '%s: Transport stopping, no new events accepted', pid_str
    return false
  end

  ensure_watcher_running

  # Second argument = non-blocking push: a full queue raises ThreadError
  # instead of stalling the calling thread.
  queue.push(resource, true)

  true
rescue ThreadError
  throttled_queue_full_warning
  nil
rescue StandardError => e
  # Best-effort: ordinary failures are logged and swallowed. Only
  # StandardError is rescued so Interrupt, SystemExit, NoMemoryError
  # and similar process-level exceptions still propagate.
  error '%s: Failed adding to the transport queue: %p', pid_str, e.inspect
  nil
end

Private Instance Methods

all_workers_alive?() click to toggle source
# File lib/elastic_apm/transport/base.rb, line 110
# Tells whether every worker slot holds a live thread. An empty slot
# (nil) counts as not alive.
#
# @return [Boolean]
def all_workers_alive?
  workers.all? { |worker| worker&.alive? }
end
boot_worker() click to toggle source
# File lib/elastic_apm/transport/base.rb, line 114
# Spawns a thread that builds a Worker (sharing this transport's config,
# queue, serializers and filters) and runs its +work_forever+ loop.
#
# @return [Thread] the newly started thread
def boot_worker
  debug('%s: Booting worker...', pid_str)

  Thread.new do
    # The worker is constructed inside the new thread, not the caller's.
    worker = Worker.new(
      config, queue,
      serializers: @serializers,
      filters: @filters
    )
    worker.work_forever
  end
end
ensure_watcher_running() click to toggle source
# File lib/elastic_apm/transport/base.rb, line 82
# Makes sure a watcher timer task exists for the current process.
#
# The pid recorded at creation time is compared with the current pid;
# a mismatch (first call, or a changed pid after a fork) creates a new
# watcher that periodically calls +ensure_worker_count+.
#
# @return [Object, nil] the new watcher when one was created, else nil
def ensure_watcher_running
  current_pid = Process.pid

  # Cheap check outside the lock for the common "already running" case.
  return if @pid == current_pid

  @watcher_mutex.synchronize do
    # Re-check under the lock: another thread may have created it already.
    next if @pid == current_pid

    @pid = current_pid
    @watcher = Concurrent::TimerTask.execute(
      execution_interval: WATCHER_EXECUTION_INTERVAL,
      timeout_interval: WATCHER_TIMEOUT_INTERVAL
    ) { ensure_worker_count }
  end
end
ensure_worker_count() click to toggle source
# File lib/elastic_apm/transport/base.rb, line 97
# Replaces every empty or dead worker slot with a freshly booted worker,
# under the worker lock. Does nothing when all workers are alive or the
# transport has been stopped.
#
# @return [Array, nil] the workers array when slots were refreshed,
#   nil when nothing had to be done
def ensure_worker_count
  @worker_mutex.synchronize do
    next if all_workers_alive? || stopped.true?

    # Live threads keep their slot; everything else gets a new worker.
    @workers.map! { |worker| worker&.alive? ? worker : boot_worker }
  end
end
pid_str() click to toggle source
# File lib/elastic_apm/transport/base.rb, line 78
# Log prefix identifying the current process, e.g. "[PID:1234]".
#
# @return [String]
def pid_str
  "[PID:#{Process.pid}]"
end
send_stop_messages() click to toggle source

rubocop:enable Metrics/MethodLength

# File lib/elastic_apm/transport/base.rb, line 149
# Pushes one Worker::StopMessage per configured pool slot onto the queue
# without blocking. If the queue fills up part-way, the remaining
# messages are dropped and a warning is logged instead.
#
# @return [Integer] the pool size when every message was queued;
#   otherwise whatever +warn+ returns
def send_stop_messages
  config.pool_size.times do
    stop_message = Worker::StopMessage.new
    queue.push(stop_message, true) # non-blocking: raises ThreadError when full
  end
rescue ThreadError
  warn('Cannot push stop messages to worker queue as it is full')
end
stop_watcher() click to toggle source
# File lib/elastic_apm/transport/base.rb, line 155
# Shuts the watcher down under the watcher lock. Skipped when there is no
# watcher, or when the recorded pid differs from the current process
# (i.e. the watcher was not created by this process).
#
# @return the result of +watcher.shutdown+, or nil when skipped
def stop_watcher
  @watcher_mutex.synchronize do
    next if watcher.nil?
    next unless @pid == Process.pid

    watcher.shutdown
  end
end
stop_workers() click to toggle source

rubocop:disable Metrics/MethodLength

# File lib/elastic_apm/transport/base.rb, line 127
# Asks the workers to stop, then waits for each worker thread in turn.
# A thread that has not finished within WORKER_JOIN_TIMEOUT is killed.
# The worker list is emptied afterwards.
#
# @return [Array] the (now empty) workers array
def stop_workers
  debug('%s: Stopping workers', pid_str)
  send_stop_messages

  @worker_mutex.synchronize do
    workers.each do |worker|
      # Thread#join returns nil on timeout, so a truthy result means the
      # worker exited on its own; empty slots are skipped.
      next if worker.nil? || worker.join(WORKER_JOIN_TIMEOUT)

      debug(
        '%s: Worker did not stop in %ds, killing...',
        pid_str, WORKER_JOIN_TIMEOUT
      )
      worker.kill
    end

    @workers.clear
  end
end
throttled_queue_full_warning() click to toggle source
# File lib/elastic_apm/transport/base.rb, line 162
# Emits the "queue is full" warning through a Util::Throttle that is
# created once (with argument 5) and reused on every later call.
#
# @return the result of calling the throttle
def throttled_queue_full_warning
  @queue_full_log ||= Util::Throttle.new(5) do
    warn(
      '%s: Queue is full (%i items), skipping…',
      pid_str, config.api_buffer_size
    )
  end

  @queue_full_log.call
end