class ActiveSupport::Notifications::Fanout

This is a default queue implementation that ships with Notifications. It just pushes events to all registered log subscribers.

This class is thread safe. All methods are reentrant.

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/active_support/notifications/fanout.rb, line 17
def initialize
  @string_subscribers = Hash.new { |h, k| h[k] = [] }
  @other_subscribers = []
  @listeners_for = Concurrent::Map.new
  super
end

Public Instance Methods

finish(name, id, payload, listeners = listeners_for(name)) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 62
def finish(name, id, payload, listeners = listeners_for(name))
  listeners.each { |s| s.finish(name, id, payload) }
end
listeners_for(name) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 70
def listeners_for(name)
  # this is correctly done double-checked locking (Concurrent::Map's lookups have volatile semantics)
  @listeners_for[name] || synchronize do
    # use synchronisation when accessing @subscribers
    @listeners_for[name] ||=
      @string_subscribers[name] + @other_subscribers.select { |s| s.subscribed_to?(name) }
  end
end
listening?(name) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 79
def listening?(name)
  listeners_for(name).any?
end
publish(name, *args) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 66
def publish(name, *args)
  listeners_for(name).each { |s| s.publish(name, *args) }
end
start(name, id, payload) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 58
def start(name, id, payload)
  listeners_for(name).each { |s| s.start(name, id, payload) }
end
subscribe(pattern = nil, callable = nil, monotonic: false, &block) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 24
def subscribe(pattern = nil, callable = nil, monotonic: false, &block)
  subscriber = Subscribers.new(pattern, callable || block, monotonic)
  synchronize do
    if String === pattern
      @string_subscribers[pattern] << subscriber
      @listeners_for.delete(pattern)
    else
      @other_subscribers << subscriber
      @listeners_for.clear
    end
  end
  subscriber
end
unsubscribe(subscriber_or_name) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 38
def unsubscribe(subscriber_or_name)
  synchronize do
    case subscriber_or_name
    when String
      @string_subscribers[subscriber_or_name].clear
      @listeners_for.delete(subscriber_or_name)
      @other_subscribers.each { |sub| sub.unsubscribe!(subscriber_or_name) }
    else
      pattern = subscriber_or_name.try(:pattern)
      if String === pattern
        @string_subscribers[pattern].delete(subscriber_or_name)
        @listeners_for.delete(pattern)
      else
        @other_subscribers.delete(subscriber_or_name)
        @listeners_for.clear
      end
    end
  end
end
wait() click to toggle source

This is a sync queue, so there is no waiting.

# File lib/active_support/notifications/fanout.rb, line 84
def wait
end