class Prometheus::Client::DataStores::DirectFileStore::MetricStore

Attributes

metric_name[R]
store_settings[R]

Public Class Methods

new(metric_name:, store_settings:, metric_settings:) click to toggle source
# File lib/prometheus/client/data_stores/direct_file_store.rb, line 73
def initialize(metric_name:, store_settings:, metric_settings:)
  @metric_name = metric_name
  @store_settings = store_settings
  @values_aggregation_mode = metric_settings[:aggregation]

  @lock = Monitor.new
end

Public Instance Methods

all_values() click to toggle source
# File lib/prometheus/client/data_stores/direct_file_store.rb, line 116
def all_values
  stores_data = Hash.new{ |hash, key| hash[key] = [] }

  # There's no need to call `synchronize` here. We're opening a second handle to
  # the file, and `flock`ing it, which prevents inconsistent reads
  stores_for_metric.each do |file_path|
    begin
      store = FileMappedDict.new(file_path, true)
      store.all_values.each do |(labelset_qs, v)|
        # Labels come as a query string, and CGI::parse returns arrays for each key
        # "foo=bar&x=y" => { "foo" => ["bar"], "x" => ["y"] }
        # Turn the keys back into symbols, and remove the arrays
        label_set = CGI::parse(labelset_qs).map do |k, vs|
          [k.to_sym, vs.first]
        end.to_h

        stores_data[label_set] << v
      end
    ensure
      store.close if store
    end
  end

  # Aggregate all the different values for each label_set
  aggregate_hash = Hash.new { |hash, key| hash[key] = 0.0 }
  stores_data.each_with_object(aggregate_hash) do |(label_set, values), acc|
    acc[label_set] = aggregate_values(values)
  end
end
get(labels:) click to toggle source
# File lib/prometheus/client/data_stores/direct_file_store.rb, line 110
def get(labels:)
  in_process_sync do
    internal_store.read_value(store_key(labels))
  end
end
increment(labels:, by: 1) click to toggle source
# File lib/prometheus/client/data_stores/direct_file_store.rb, line 102
def increment(labels:, by: 1)
  key = store_key(labels)
  in_process_sync do
    value = internal_store.read_value(key)
    internal_store.write_value(key, value + by.to_f)
  end
end
set(labels:, val:) click to toggle source
# File lib/prometheus/client/data_stores/direct_file_store.rb, line 96
def set(labels:, val:)
  in_process_sync do
    internal_store.write_value(store_key(labels), val.to_f)
  end
end
synchronize() { || ... } click to toggle source

Synchronize is used to do a multi-process Mutex, when incrementing multiple values at once, so that the other process, reading the file for export, doesn't get incomplete increments.

`in_process_sync`, instead, is just used so that two threads don't increment the same value and get a context switch between read and write leading to an inconsistency

# File lib/prometheus/client/data_stores/direct_file_store.rb, line 88
def synchronize
  in_process_sync do
    internal_store.with_file_lock do
      yield
    end
  end
end

Private Instance Methods

aggregate_values(values) click to toggle source
# File lib/prometheus/client/data_stores/direct_file_store.rb, line 183
def aggregate_values(values)
  if @values_aggregation_mode == SUM
    values.inject { |sum, element| sum + element }
  elsif @values_aggregation_mode == MAX
    values.max
  elsif @values_aggregation_mode == MIN
    values.min
  elsif @values_aggregation_mode == ALL
    values.first
  else
    raise InvalidStoreSettingsError,
          "Invalid Aggregation Mode: #{ @values_aggregation_mode }"
  end
end
filemap_filename() click to toggle source

Filename for this metric's PStore (one per process)

# File lib/prometheus/client/data_stores/direct_file_store.rb, line 170
def filemap_filename
  filename = "metric_#{ metric_name }___#{ process_id }.bin"
  File.join(@store_settings[:dir], filename)
end
in_process_sync() { || ... } click to toggle source
# File lib/prometheus/client/data_stores/direct_file_store.rb, line 148
def in_process_sync
  @lock.synchronize { yield }
end
internal_store() click to toggle source
# File lib/prometheus/client/data_stores/direct_file_store.rb, line 160
def internal_store
  if @store_opened_by_pid != process_id
    @store_opened_by_pid = process_id
    @internal_store = FileMappedDict.new(filemap_filename)
  else
    @internal_store
  end
end
process_id() click to toggle source
# File lib/prometheus/client/data_stores/direct_file_store.rb, line 179
def process_id
  Process.pid
end
store_key(labels) click to toggle source
# File lib/prometheus/client/data_stores/direct_file_store.rb, line 152
def store_key(labels)
  if @values_aggregation_mode == ALL
    labels[:pid] = process_id
  end

  labels.to_a.sort.map{|k,v| "#{CGI::escape(k.to_s)}=#{CGI::escape(v.to_s)}"}.join('&')
end
stores_for_metric() click to toggle source
# File lib/prometheus/client/data_stores/direct_file_store.rb, line 175
def stores_for_metric
  Dir.glob(File.join(@store_settings[:dir], "metric_#{ metric_name }___*"))
end