class Quantile::Estimator

Estimate quantile values efficiently where both the rank and the inaccuracy allowance are known a priori. This is accomplished via Graham Cormode and S. Muthukrishnan's Effective Computation of Biased Quantiles over Data Streams in ICDE’05.

@note {Estimator} is not concurrency safe.

@see www.cs.rutgers.edu/~muthu/bquant.pdf Effective Computation of

Biased Quantiles over Data Streams

Constants

BUFFER_SIZE

Attributes

invariants[RW]

Get the quantile targets.

Public Class Methods

new(*invariants) click to toggle source

Create a streaming quantile estimator.

@param invariants [Quantile] The quantile estimation targets that are provided a priori. @return [Estimator] An initialized {Estimator} for the given targets.

# File lib/quantile/estimator.rb, line 34
def initialize(*invariants)
  if invariants.empty?
    invariants = [Quantile.new(0.5, 0.05), Quantile.new(0.90, 0.01), Quantile.new(0.99, 0.001)]
  end

  @invariants = invariants
  @buffer = []
  @head = nil

  @observations, @sum = 0, 0
end

Public Instance Methods

observations() click to toggle source

Get the number of observed values.

# File lib/quantile/estimator.rb, line 54
def observations
  flush
  @observations
end
observe(value) click to toggle source

Observe a sample value with this {Estimator}.

@param value [Numeric] The value to catalog for later analysis.

# File lib/quantile/estimator.rb, line 64
def observe(value)
  @buffer << value
  if @buffer.size == BUFFER_SIZE
    flush
  end
  @observations += 1
  @sum += value
end
query(rank) click to toggle source

Get a quantile value for a given rank.

@param rank [Float] The target quantile to retrieve. It must be one of

the invariants provided in the constructor.

@return [Numeric, nil] The quantile value for the rank or nil if no

observations are present.
# File lib/quantile/estimator.rb, line 89
def query(rank)
  flush

  current = @head
  return unless current

  mid_rank = (rank * @observations).floor
  max_rank = mid_rank + (invariant(mid_rank, @observations) / 2).floor

  rank = 0.0
  while current.successor
    rank += current.rank
    if rank + current.successor.rank + current.successor.delta > max_rank
      return current.value
    end

    current = current.successor
  end

  return current.value
end
sum() click to toggle source

Returns the sum of all observed values.

# File lib/quantile/estimator.rb, line 76
def sum
  @sum
end

Private Instance Methods

compress() click to toggle source
# File lib/quantile/estimator.rb, line 166
def compress
  rank = 0.0
  current = @head

  while current && current.successor
    if current.rank + current.successor.rank + current.successor.delta <= invariant(rank, @observations)
      removed = current.successor

      current.value = removed.value
      current.rank += removed.rank
      current.delta = removed.delta
      current.successor = removed.successor
    end

    rank += current.rank
    current = current.successor
  end
end
flush() click to toggle source
# File lib/quantile/estimator.rb, line 117
def flush
  return if @buffer.empty?
  @buffer.sort!
  replace_batch
  @buffer.clear
  compress
end
invariant(rank, n) click to toggle source
# File lib/quantile/estimator.rb, line 153
def invariant(rank, n)
  min = n + 1

  @invariants.each do |i|
    delta = i.delta(rank, n)
    if delta < min
      min = delta
    end
  end

  return min.floor
end
record(value, rank, delta, successor) click to toggle source
# File lib/quantile/estimator.rb, line 149
def record(value, rank, delta, successor)
  return Sample.new(value, rank, delta, successor)
end
replace_batch() click to toggle source
# File lib/quantile/estimator.rb, line 125
def replace_batch
  @head ||= record(@buffer.shift, 1, 0, nil)

  rank = 0.0
  current = @head

  @buffer.each do |s|
    if s < @head.value
      @head = record(s, 1, 0, @head)
    end

    while current.successor && current.successor.value < s
      rank += current.rank
      current = current.successor
    end

    unless current.successor
      current.successor = record(s, 1, 0, nil)
    end

    current.successor = record(s, 1, invariant(rank, @observations)-1, current.successor)
  end
end