Class: Hotot::SynchronousConnection

Inherits:
Object
  • Object
show all
Extended by:
ClassAttributeAccessors, Configurable
Defined in:
lib/hotot/synchronous_connection.rb

Constant Summary collapse

@@setup =
false
@@connected =
false

Class Method Summary collapse

Methods included from ClassAttributeAccessors

cattr_reader

Class Method Details

.connect ⇒ Object

Establish a connection to the underlying exchange

Raises:

  • (StandardError)


46
47
48
49
50
51
52
53
54
# File 'lib/hotot/synchronous_connection.rb', line 46

# Starts the producer and consumer connections and declares the configured
# exchange as a durable topic exchange.
#
# Raises StandardError when setup has not been completed beforehand.
def self.connect
  unless self.setup?
    raise StandardError, "AMQP not setup. Call setup before calling connect"
  end

  @@producer.start
  @@consumer.start

  # use the default channel for the exchange
  exchange_name = @@env_config[:exchange]
  @@exchange = @@producer.exchange(exchange_name, :type => :topic, :durable => true)
  @@connected = true
end

.connected? ⇒ Boolean

Whether the underlying connection has been established

Returns:

  • (Boolean)


80
81
82
# File 'lib/hotot/synchronous_connection.rb', line 80

# Reports the @@connected flag. The flag is set to true at the end of a
# successful connect and reset to false by disconnect.
def self.connected?
  @@connected
end

.disconnect ⇒ Object

Disconnect from the underlying exchange



57
58
59
60
61
62
63
64
65
66
67
# File 'lib/hotot/synchronous_connection.rb', line 57

# Stops the producer and consumer connections on a best-effort basis and
# marks the class as disconnected.
#
# Each connection is stopped independently, so a failure while stopping the
# producer no longer prevents the consumer from being stopped.
# The connected flag is always reset, whether or not stopping succeeded.
def self.disconnect
  begin
    @@producer.stop
  rescue StandardError
    # if this is being called because the underlying connection went bad
    # calling stop will raise an error. that's ok....
  end

  begin
    @@consumer.stop
  rescue StandardError
    # same as above: a dead connection may refuse to stop cleanly
  end
ensure
  @@connected = false
end

.produce(message) ⇒ Object

Produces a message to the underlying exchange



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/hotot/synchronous_connection.rb', line 85

# Publishes a message to the exchange as persistent JSON, routed by the
# message's routing key.
#
# message - object responding to to_json and routing_key
#
# When setup or connect has not been performed, an error is logged and
# nothing is published. On Bunny::ServerDownError the connection is rebuilt
# through reconnect and the publish is attempted one more time. Any other
# StandardError (including one raised during the retry) is logged through
# Hotot.logger instead of being raised to the caller.
def self.produce(message)
  unless self.setup? && self.connected?
    return Hotot.logger.error("AMQP not setup or connected. Call setup and connect before calling produce")
  end

  # Defined once and invoked lazily, so the payload and routing key are still
  # computed inside the rescue scopes and @@exchange is re-read after a reconnect.
  deliver = lambda do
    @@exchange.publish(message.to_json, :routing_key => message.routing_key, :mandatory => false, :immediate => false, :persistent => true, :content_type => "application/json")
  end

  # Single definition of the failure report used by both rescue paths.
  report = lambda do |failure|
    Hotot.logger.error "Unexpected error producing AMQP messsage: (#{message.to_json})"
    Hotot.logger.error "#{failure.message}"
    Hotot.logger.error failure.backtrace.join("\n")
  end

  begin
    deliver.call
  rescue Bunny::ServerDownError
    # the connection went south, try to reconnect and try one more time
    begin
      self.reconnect
      deliver.call
    rescue => retry_failure
      report.call(retry_failure)
    end
  rescue => failure
    report.call(failure)
  end
end

.reconnect ⇒ Object

Re-connects to the underlying exchange



70
71
72
73
74
75
76
77
# File 'lib/hotot/synchronous_connection.rb', line 70

# Drops the current connections, builds fresh producer and consumer Bunny
# instances from the stored environment config, and connects again.
def self.reconnect
  self.disconnect
  # The setup flag is false while the instances are being replaced and is
  # only restored once both have been created.
  @@setup = false
  @@producer = Bunny.new(@@env_config)
  @@consumer = Bunny.new(@@env_config)
  @@setup = true
  # connect raises unless the setup flag is true, so it must come last.
  self.connect
end

.setup(config = nil, env = (defined?(::Rails) ? Rails.env : nil)) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/hotot/synchronous_connection.rb', line 15

# Loads the AMQP configuration and builds the producer and consumer Bunny
# instances. Does nothing when setup has already completed.
#
# config - handed to load_config unchanged
# env    - key selecting the environment section of the loaded config;
#          defaults to Rails.env when Rails is defined, otherwise nil
#
# Raises StandardError when the environment section is missing, or when it
# has no :exchange entry after its keys have been symbolized.
def self.setup(config = nil, env = (defined?(::Rails) ? Rails.env : nil) )
  return if self.setup?

  @@config = load_config(config)
  @@env_config = @@config[env]
  if @@env_config.nil?
    raise StandardError, "Env #{env} not found in config"
  end

  # Bunny expects symbol keys, so every entry is re-keyed in place; a key
  # that cannot be converted is kept as it is.
  @@env_config.keys.each do |original_key|
    symbolized = begin
      original_key.to_sym
    rescue StandardError
      original_key
    end
    @@env_config[symbolized || original_key] = @@env_config.delete(original_key)
  end
  unless @@env_config.has_key?(:exchange)
    raise StandardError, "'exchange' key not found in config"
  end

  @@producer = Bunny.new(@@env_config)
  @@consumer = Bunny.new(@@env_config)
  @@setup = true
end

.setup? ⇒ Boolean

Whether the underlying connection has been set up

Returns:

  • (Boolean)


32
33
34
# File 'lib/hotot/synchronous_connection.rb', line 32

# Reports the @@setup flag. The flag becomes true once setup (or reconnect)
# has created the producer and consumer instances.
def self.setup?
  @@setup
end

.subscribe(routing_key, &block) ⇒ Object



36
37
38
39
40
41
42
43
# File 'lib/hotot/synchronous_connection.rb', line 36

# Binds a queue obtained from the consumer connection to the configured
# exchange and passes every delivery for routing_key to the given block.
#
# routing_key - routing key used for the queue binding
# block       - called with (delivery_info, metadata, payload) per delivery
#
# Returns whatever the queue's subscribe call returns.
def self.subscribe(routing_key, &block)
  q = @@consumer.queue.bind(@@env_config[:exchange], :routing_key => routing_key)

  # The subscription yields three values per delivery; the middle one (the
  # message metadata) was missing from the parameter list and is restored here.
  q.subscribe do |delivery_info, metadata, payload|
    block.call delivery_info, metadata, payload
  end
end