class ActiveSupport::Notifications::Fanout
This is a default queue implementation that ships with Notifications. It just pushes events to all registered log subscribers.
This class is thread safe. All methods are reentrant.
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/active_support/notifications/fanout.rb, line 14 def initialize @subscribers = [] @listeners_for = Concurrent::Map.new super end
Public Instance Methods
finish(name, id, payload, listeners = listeners_for(name))
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 46 def finish(name, id, payload, listeners = listeners_for(name)) listeners.each { |s| s.finish(name, id, payload) } end
listeners_for(name)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 54 def listeners_for(name) # this is correctly done double-checked locking (Concurrent::Map's lookups have volatile semantics) @listeners_for[name] || synchronize do # use synchronisation when accessing @subscribers @listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) } end end
listening?(name)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 62 def listening?(name) listeners_for(name).any? end
publish(name, *args)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 50 def publish(name, *args) listeners_for(name).each { |s| s.publish(name, *args) } end
start(name, id, payload)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 42 def start(name, id, payload) listeners_for(name).each { |s| s.start(name, id, payload) } end
subscribe(pattern = nil, block = Proc.new)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 20 def subscribe(pattern = nil, block = Proc.new) subscriber = Subscribers.new pattern, block synchronize do @subscribers << subscriber @listeners_for.clear end subscriber end
unsubscribe(subscriber_or_name)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 29 def unsubscribe(subscriber_or_name) synchronize do case subscriber_or_name when String @subscribers.reject! { |s| s.matches?(subscriber_or_name) } else @subscribers.delete(subscriber_or_name) end @listeners_for.clear end end
wait()
click to toggle source
This is a sync queue, so there is no waiting.
# File lib/active_support/notifications/fanout.rb, line 67 def wait end