class ActiveSupport::Notifications::Fanout
This is a default queue implementation that ships with Notifications. It just pushes events to all registered log subscribers.
This class is thread safe. All methods are reentrant.
Public Class Methods
Calls superclass method
# File activesupport/lib/active_support/notifications/fanout.rb, line 54
#
# Creates the empty subscriber registries and lookup caches, then calls the
# superclass initializer.
def initialize
  # Subscribers keyed by exact event name; reading an unseen key stores and
  # returns a fresh empty Array for it.
  @string_subscribers = Concurrent::Map.new { |h, k| h.compute_if_absent(k) { [] } }
  # Subscribers registered with a Regexp or nil pattern.
  @other_subscribers = []
  # Cache of event name => combined listener list.
  @all_listeners_for = Concurrent::Map.new
  # NOTE(review): the two maps below are not read by any method visible in
  # this excerpt; presumably per-name caches used elsewhere in the class.
  @groups_for = Concurrent::Map.new
  @silenceable_groups_for = Concurrent::Map.new
  super
end
Public Instance Methods
# File activesupport/lib/active_support/notifications/fanout.rb, line 301
#
# Returns every listener registered for +name+, whether or not it is
# currently silenced. The result is cached per name in @all_listeners_for;
# on a cache miss it is built inside +synchronize+ from the exact-name
# subscribers followed by each entry of @other_subscribers whose
# +subscribed_to?(name)+ is truthy.
def all_listeners_for(name)
  # this is correctly done double-checked locking (Concurrent::Map's lookups have volatile semantics)
  @all_listeners_for[name] || synchronize do
    # use synchronisation when accessing @subscribers
    @all_listeners_for[name] ||=
      @string_subscribers[name] + @other_subscribers.select { |s| s.subscribed_to?(name) }
  end
end
# File activesupport/lib/active_support/notifications/fanout.rb, line 276
#
# Builds a new Handle for the event described by +name+, +id+ and +payload+,
# passing this object as the handle's first argument. Returns the Handle
# without starting it.
def build_handle(name, id, payload)
  Handle.new(self, name, id, payload)
end
# File activesupport/lib/active_support/notifications/fanout.rb, line 287
#
# Finishes the most recently started event: pops the top handle off the
# stack stored under :_fanout_handle_stack in IsolatedExecutionState and
# calls +finish_with_values(name, id, payload)+ on it, returning that call's
# result.
#
# +listeners+ is accepted but not used by this body.
#
# NOTE(review): assumes the stack already exists and is non-empty (i.e. a
# matching #start ran first); otherwise the +pop+ / +finish_with_values+
# call raises NoMethodError on nil.
def finish(name, id, payload, listeners = nil)
  handle_stack = IsolatedExecutionState[:_fanout_handle_stack]
  handle = handle_stack.pop
  handle.finish_with_values(name, id, payload)
end
# File activesupport/lib/active_support/notifications/fanout.rb, line 310
#
# Returns the listeners for +name+ that are not currently silenced: the
# result of +all_listeners_for(name)+ minus every listener whose
# +silenced?(name)+ is truthy. A new Array is returned on each call.
def listeners_for(name)
  all_listeners_for(name).reject { |s| s.silenced?(name) }
end
# File activesupport/lib/active_support/notifications/fanout.rb, line 314
#
# Returns true when at least one listener from +all_listeners_for(name)+ is
# not silenced for +name+, false otherwise (including when there are no
# listeners at all).
def listening?(name)
  all_listeners_for(name).any? { |s| !s.silenced?(name) }
end
# File activesupport/lib/active_support/notifications/fanout.rb, line 293
#
# Delivers an event to every non-silenced listener for +name+ by calling
# +publish(name, *args)+ on each one. Iteration goes through
# +iterate_guarding_exceptions+, which is defined outside this excerpt
# (presumably it keeps one failing listener from blocking the others —
# confirm against its definition).
def publish(name, *args)
  iterate_guarding_exceptions(listeners_for(name)) { |s| s.publish(name, *args) }
end
# File activesupport/lib/active_support/notifications/fanout.rb, line 297
#
# Delivers an already-built +event+ to every non-silenced listener for
# +event.name+ by calling +publish_event(event)+ on each one. Iteration goes
# through +iterate_guarding_exceptions+, which is defined outside this
# excerpt.
def publish_event(event)
  iterate_guarding_exceptions(listeners_for(event.name)) { |s| s.publish_event(event) }
end
# File activesupport/lib/active_support/notifications/fanout.rb, line 280
#
# Starts an event: builds a handle via +build_handle(name, id, payload)+,
# pushes it onto the stack stored under :_fanout_handle_stack in
# IsolatedExecutionState (creating the stack on first use), and then calls
# +start+ on the handle. Returns whatever the handle's +start+ returns.
def start(name, id, payload)
  handle_stack = (IsolatedExecutionState[:_fanout_handle_stack] ||= [])
  handle = build_handle(name, id, payload)
  # Push before starting so the handle is on the stack even if +start+ raises.
  handle_stack << handle
  handle.start
end
# File activesupport/lib/active_support/notifications/fanout.rb, line 68
#
# Registers a new subscriber and returns it.
#
# +pattern+ selects which events are received: a String is stored under that
# exact name; a Regexp or nil goes into the list of non-exact subscribers.
# The listener is +callable+ if given, otherwise the block. +monotonic+ is
# forwarded unchanged to Subscribers.new.
#
# Raises ArgumentError when +pattern+ is anything other than a String,
# Regexp or nil.
def subscribe(pattern = nil, callable = nil, monotonic: false, &block)
  subscriber = Subscribers.new(pattern, callable || block, monotonic)
  synchronize do
    case pattern
    when String
      @string_subscribers[pattern] << subscriber
      # Only the cache entry for this exact name is affected.
      clear_cache(pattern)
    when NilClass, Regexp
      @other_subscribers << subscriber
      # A non-exact pattern may match any name, so the cache is cleared with no key.
      clear_cache
    else
      raise ArgumentError, "pattern must be specified as a String, Regexp or empty"
    end
  end
  subscriber
end
# File activesupport/lib/active_support/notifications/fanout.rb, line 85
#
# Removes subscriptions, inside +synchronize+.
#
# * Given a String event name: empties the exact-name subscriber list for
#   that name, clears that name's cache entry, and calls +unsubscribe!(name)+
#   on every non-exact subscriber.
# * Given anything else (treated as a subscriber object): reads its
#   +pattern+ via +try+. If the pattern is a String the subscriber is deleted
#   from that name's list; otherwise it is deleted from the non-exact list.
#   The matching cache is cleared in both cases.
def unsubscribe(subscriber_or_name)
  synchronize do
    case subscriber_or_name
    when String
      @string_subscribers[subscriber_or_name].clear
      clear_cache(subscriber_or_name)
      @other_subscribers.each { |sub| sub.unsubscribe!(subscriber_or_name) }
    else
      # +try+ returns nil for objects without a +pattern+ method, which
      # routes them to the non-exact branch below.
      pattern = subscriber_or_name.try(:pattern)
      if String === pattern
        @string_subscribers[pattern].delete(subscriber_or_name)
        clear_cache(pattern)
      else
        @other_subscribers.delete(subscriber_or_name)
        clear_cache
      end
    end
  end
end
This is a sync queue, so there is no waiting.
# File activesupport/lib/active_support/notifications/fanout.rb, line 319
#
# No-op that returns nil: the body is empty, so there is nothing to wait for.
def wait
end