class ActiveSupport::Notifications::Fanout
This is a default queue implementation that ships with Notifications. It just pushes events to all registered log subscribers.
This class is thread safe. All methods are reentrant.
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/active_support/notifications/fanout.rb, line 15 def initialize @string_subscribers = Hash.new { |h, k| h[k] = [] } @other_subscribers = [] @listeners_for = Concurrent::Map.new super end
Public Instance Methods
finish(name, id, payload, listeners = listeners_for(name))
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 60 def finish(name, id, payload, listeners = listeners_for(name)) listeners.each { |s| s.finish(name, id, payload) } end
listeners_for(name)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 68 def listeners_for(name) # this is correctly done double-checked locking (Concurrent::Map's lookups have volatile semantics) @listeners_for[name] || synchronize do # use synchronisation when accessing @subscribers @listeners_for[name] ||= @string_subscribers[name] + @other_subscribers.select { |s| s.subscribed_to?(name) } end end
listening?(name)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 77 def listening?(name) listeners_for(name).any? end
publish(name, *args)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 64 def publish(name, *args) listeners_for(name).each { |s| s.publish(name, *args) } end
start(name, id, payload)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 56 def start(name, id, payload) listeners_for(name).each { |s| s.start(name, id, payload) } end
subscribe(pattern = nil, callable = nil, &block)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 22 def subscribe(pattern = nil, callable = nil, &block) subscriber = Subscribers.new(pattern, callable || block) synchronize do if String === pattern @string_subscribers[pattern] << subscriber @listeners_for.delete(pattern) else @other_subscribers << subscriber @listeners_for.clear end end subscriber end
unsubscribe(subscriber_or_name)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 36 def unsubscribe(subscriber_or_name) synchronize do case subscriber_or_name when String @string_subscribers[subscriber_or_name].clear @listeners_for.delete(subscriber_or_name) @other_subscribers.each { |sub| sub.unsubscribe!(subscriber_or_name) } else pattern = subscriber_or_name.try(:pattern) if String === pattern @string_subscribers[pattern].delete(subscriber_or_name) @listeners_for.delete(pattern) else @other_subscribers.delete(subscriber_or_name) @listeners_for.clear end end end end
wait()
click to toggle source
This is a sync queue, so there is no waiting.
# File lib/active_support/notifications/fanout.rb, line 82 def wait end