class ActiveSupport::Notifications::Fanout

This is a default queue implementation that ships with Notifications. It just pushes events to all registered log subscribers.

This class is thread-safe. All methods are reentrant.

Public Class Methods

Calls superclass method
# File activesupport/lib/active_support/notifications/fanout.rb, line 54
def initialize
  # Subscriber lists keyed by exact name. The default block makes a lookup
  # for an unseen key store and return a fresh empty Array.
  @string_subscribers = Concurrent::Map.new do |map, key|
    map.compute_if_absent(key) { [] }
  end
  # Flat list of subscribers that are not keyed by name.
  @other_subscribers = []

  # Name-keyed lookup tables, initially empty.
  @all_listeners_for = Concurrent::Map.new
  @groups_for = Concurrent::Map.new
  @silenceable_groups_for = Concurrent::Map.new

  super
end

Public Instance Methods

# File activesupport/lib/active_support/notifications/fanout.rb, line 301
# Returns every subscriber interested in +name+: those stored under that
# exact name followed by those in @other_subscribers whose
# +subscribed_to?(name)+ is truthy. The result is memoized per name in
# @all_listeners_for.
def all_listeners_for(name)
  # Fast path without taking the lock. This is correctly done double-checked
  # locking (Concurrent::Map's lookups have volatile semantics).
  cached = @all_listeners_for[name]
  return cached if cached

  # Slow path: hold the lock while reading the subscriber collections, and
  # re-check the cache in case it was filled in the meantime.
  synchronize do
    @all_listeners_for[name] ||= begin
      by_name = @string_subscribers[name]
      by_pattern = @other_subscribers.select { |candidate| candidate.subscribed_to?(name) }
      by_name + by_pattern
    end
  end
end
# File activesupport/lib/active_support/notifications/fanout.rb, line 276
# Returns a new Handle constructed with this object plus the given +name+,
# +id+ and +payload+.
def build_handle(name, id, payload)
  Handle.new(self, name, id, payload)
end
# File activesupport/lib/active_support/notifications/fanout.rb, line 287
# Pops the most recently pushed handle off the stack stored under
# +:_fanout_handle_stack+ in IsolatedExecutionState and finishes it with the
# given values, returning whatever +finish_with_values+ returns.
#
# +listeners+ is accepted but not used by this method.
# NOTE(review): assumes a stack was stored there beforehand (as #start does);
# confirm callers always pair the two.
def finish(name, id, payload, listeners = nil)
  current = IsolatedExecutionState[:_fanout_handle_stack].pop
  current.finish_with_values(name, id, payload)
end
# File activesupport/lib/active_support/notifications/fanout.rb, line 310
# Returns the subscribers from +all_listeners_for(name)+ that are not
# silenced for +name+.
def listeners_for(name)
  all_listeners_for(name).select { |listener| !listener.silenced?(name) }
end
# File activesupport/lib/active_support/notifications/fanout.rb, line 314
# True when at least one subscriber from +all_listeners_for(name)+ is not
# silenced for +name+; false otherwise, including when there are none.
def listening?(name)
  !all_listeners_for(name).all? { |listener| listener.silenced?(name) }
end
# File activesupport/lib/active_support/notifications/fanout.rb, line 293
# Calls +publish(name, *args)+ on every subscriber returned by
# +listeners_for(name)+, iterating through +iterate_guarding_exceptions+.
def publish(name, *args)
  recipients = listeners_for(name)
  iterate_guarding_exceptions(recipients) do |listener|
    listener.publish(name, *args)
  end
end
# File activesupport/lib/active_support/notifications/fanout.rb, line 297
# Calls +publish_event(event)+ on every subscriber returned by
# +listeners_for(event.name)+, iterating through
# +iterate_guarding_exceptions+.
def publish_event(event)
  recipients = listeners_for(event.name)
  iterate_guarding_exceptions(recipients) do |listener|
    listener.publish_event(event)
  end
end
# File activesupport/lib/active_support/notifications/fanout.rb, line 280
# Builds a handle via +build_handle+, pushes it onto the stack stored under
# +:_fanout_handle_stack+ in IsolatedExecutionState (creating the stack on
# first use), then starts the handle and returns the result of +handle.start+.
def start(name, id, payload)
  pending = (IsolatedExecutionState[:_fanout_handle_stack] ||= [])
  build_handle(name, id, payload).tap { |created| pending << created }.start
end
# File activesupport/lib/active_support/notifications/fanout.rb, line 68
# Registers a new subscriber and returns it.
#
# The subscriber is built with +Subscribers.new+ from +pattern+, the listener
# (+callable+, or the given block when +callable+ is nil/false) and
# +monotonic+. Under the lock it is then stored according to +pattern+:
#
# * String: appended to that name's list; the cache for that name is cleared.
# * nil or Regexp: appended to @other_subscribers; the whole cache is cleared.
#
# Raises ArgumentError for any other kind of +pattern+. The subscriber is
# constructed before the pattern is checked.
def subscribe(pattern = nil, callable = nil, monotonic: false, &block)
  listener = callable || block
  subscriber = Subscribers.new(pattern, listener, monotonic)

  synchronize do
    if String === pattern
      @string_subscribers[pattern] << subscriber
      clear_cache(pattern)
    elsif NilClass === pattern || Regexp === pattern
      @other_subscribers << subscriber
      clear_cache
    else
      raise ArgumentError, "pattern must be specified as a String, Regexp or empty"
    end
  end

  subscriber
end
# File activesupport/lib/active_support/notifications/fanout.rb, line 85
# Removes subscriptions, holding the lock for the whole operation.
#
# * Given a String: empties that name's subscriber list, clears the cache
#   for that name, and calls +unsubscribe!(name)+ on every entry in
#   @other_subscribers.
# * Given anything else: reads its +pattern+ via +try+. If the pattern is a
#   String the object is deleted from that name's list and that name's cache
#   is cleared; otherwise it is deleted from @other_subscribers and the whole
#   cache is cleared.
def unsubscribe(subscriber_or_name)
  synchronize do
    if String === subscriber_or_name
      event_name = subscriber_or_name
      @string_subscribers[event_name].clear
      clear_cache(event_name)
      @other_subscribers.each { |other| other.unsubscribe!(event_name) }
    else
      subscriber = subscriber_or_name
      key = subscriber.try(:pattern)
      if String === key
        @string_subscribers[key].delete(subscriber)
        clear_cache(key)
      else
        @other_subscribers.delete(subscriber)
        clear_cache
      end
    end
  end
end

This is a synchronous queue, so there is no waiting.

# File activesupport/lib/active_support/notifications/fanout.rb, line 319
# No-op: the body is empty, so this returns nil immediately.
def wait
end