class ActiveSupport::Notifications::Fanout

This is a default queue implementation that ships with Notifications. It just pushes events to all registered log subscribers.

This class is thread safe. All methods are reentrant.

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/active_support/notifications/fanout.rb, line 16
def initialize
  @string_subscribers = Hash.new { |h, k| h[k] = [] }
  @other_subscribers = []
  @listeners_for = Concurrent::Map.new
  super
end

Public Instance Methods

finish(name, id, payload, listeners = listeners_for(name)) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 61
def finish(name, id, payload, listeners = listeners_for(name))
  listeners.each { |s| s.finish(name, id, payload) }
end
listeners_for(name) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 69
def listeners_for(name)
  # this is correctly done double-checked locking (Concurrent::Map's lookups have volatile semantics)
  @listeners_for[name] || synchronize do
    # use synchronisation when accessing @subscribers
    @listeners_for[name] ||=
      @string_subscribers[name] + @other_subscribers.select { |s| s.subscribed_to?(name) }
  end
end
listening?(name) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 78
def listening?(name)
  listeners_for(name).any?
end
publish(name, *args) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 65
def publish(name, *args)
  listeners_for(name).each { |s| s.publish(name, *args) }
end
start(name, id, payload) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 57
def start(name, id, payload)
  listeners_for(name).each { |s| s.start(name, id, payload) }
end
subscribe(pattern = nil, callable = nil, &block) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 23
def subscribe(pattern = nil, callable = nil, &block)
  subscriber = Subscribers.new(pattern, callable || block)
  synchronize do
    if String === pattern
      @string_subscribers[pattern] << subscriber
      @listeners_for.delete(pattern)
    else
      @other_subscribers << subscriber
      @listeners_for.clear
    end
  end
  subscriber
end
unsubscribe(subscriber_or_name) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 37
def unsubscribe(subscriber_or_name)
  synchronize do
    case subscriber_or_name
    when String
      @string_subscribers[subscriber_or_name].clear
      @listeners_for.delete(subscriber_or_name)
      @other_subscribers.each { |sub| sub.unsubscribe!(subscriber_or_name) }
    else
      pattern = subscriber_or_name.try(:pattern)
      if String === pattern
        @string_subscribers[pattern].delete(subscriber_or_name)
        @listeners_for.delete(pattern)
      else
        @other_subscribers.delete(subscriber_or_name)
        @listeners_for.clear
      end
    end
  end
end
wait() click to toggle source

This is a sync queue, so there is no waiting.

# File lib/active_support/notifications/fanout.rb, line 83
def wait
end