Module: ActionSubscriber::RabbitConnection

Defined in:
lib/action_subscriber/rabbit_connection.rb

Constant Summary collapse

SUBSCRIBER_CONNECTION_MUTEX =
::Mutex.new
NETWORK_RECOVERY_INTERVAL =
1.freeze

Class Method Summary collapse

Class Method Details

.connection_threadpoolsObject



8
9
10
11
12
13
14
15
16
# File 'lib/action_subscriber/rabbit_connection.rb', line 8

def self.connection_threadpools
  if ::RUBY_PLATFORM == "java"
    subscriber_connections.each_with_object({}) do |(name, connection), hash|
      hash[name] = connection.instance_variable_get("@executor")
    end
  else
    [] # TODO can I get a hold of the thredpool that bunny uses?
  end
end

.setup_connection(name, settings) ⇒ Object



18
19
20
21
22
23
# File 'lib/action_subscriber/rabbit_connection.rb', line 18

def self.setup_connection(name, settings)
  SUBSCRIBER_CONNECTION_MUTEX.synchronize do
    fail ArgumentError, "a #{name} connection already exists" if subscriber_connections[name]
    subscriber_connections[name] = create_connection(settings)
  end
end

.subscriber_connected?Boolean

Returns:

  • (Boolean)


25
26
27
28
29
# File 'lib/action_subscriber/rabbit_connection.rb', line 25

def self.subscriber_connected?
  SUBSCRIBER_CONNECTION_MUTEX.synchronize do
    subscriber_connections.all?{|_name, connection| connection.connected?}
  end
end

.subscriber_disconnect!Object



31
32
33
34
35
36
# File 'lib/action_subscriber/rabbit_connection.rb', line 31

def self.subscriber_disconnect!
  SUBSCRIBER_CONNECTION_MUTEX.synchronize do
    subscriber_connections.each{|_name, connection| connection.close}
    @subscriber_connections = {}
  end
end

.with_connection(name) ⇒ Object



38
39
40
41
42
43
# File 'lib/action_subscriber/rabbit_connection.rb', line 38

def self.with_connection(name)
  SUBSCRIBER_CONNECTION_MUTEX.synchronize do
    fail ArgumentError, "there is no connection named #{name}" unless subscriber_connections[name]
    yield(subscriber_connections[name])
  end
end