Class: Hotot::SynchronousConnection
- Inherits:
-
Object
- Object
- Hotot::SynchronousConnection
- Extended by:
- ClassAttributeAccessors, Configurable
- Defined in:
- lib/hotot/synchronous_connection.rb
Constant Summary collapse
- @@setup =
false
- @@connected =
false
Class Method Summary collapse
-
.connect ⇒ Object
Establish a connection to the underlying exchange.
-
.connected? ⇒ Boolean
Whether the underlying connection has been established.
-
.disconnect ⇒ Object
Disconnect from the underlying exchange.
-
.produce(message) ⇒ Object
Produces a message to the underlying exchange.
-
.reconnect ⇒ Object
Re-connects to the underlying exchange.
- .setup(config = nil, env = (defined?(::Rails) ? Rails.env : nil)) ⇒ Object
-
.setup? ⇒ Boolean
Whether the underlying connection has been set up.
- .subscribe(routing_key, &block) ⇒ Object
Methods included from ClassAttributeAccessors
Class Method Details
.connect ⇒ Object
Establish a connection to the underlying exchange
46 47 48 49 50 51 52 53 54 |
# File 'lib/hotot/synchronous_connection.rb', line 46 def self.connect raise StandardError, "AMQP not setup. Call setup before calling connect" if !self.setup? @@producer.start @@consumer.start # use defualt channel for exchange @@exchange = @@producer.exchange(@@env_config[:exchange], :type => :topic, :durable => true) @@connected = true end |
.connected? ⇒ Boolean
Whether the underlying connection has been established
80 81 82 |
# File 'lib/hotot/synchronous_connection.rb', line 80 def self.connected? @@connected end |
.disconnect ⇒ Object
Disconnect from the underlying exchange
57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/hotot/synchronous_connection.rb', line 57 def self.disconnect begin @@producer.stop @@consumer.stop rescue # if this is being called because the underlying connection went bad # calling stop will raise an error. that's ok.... ensure @@connected = false end end |
.produce(message) ⇒ Object
Produces a message to the underlying exchange
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/hotot/synchronous_connection.rb', line 85 def self.produce(message) if !self.setup? || !self.connected? Hotot.logger.error "AMQP not setup or connected. Call setup and connect before calling produce" else begin @@exchange.publish(message.to_json, :routing_key => message.routing_key, :mandatory => false, :immediate => false, :persistent => true, :content_type => "application/json") rescue Bunny::ServerDownError # the connection went south, try to reconnect and try one more time begin self.reconnect @@exchange.publish(message.to_json, :routing_key => message.routing_key, :mandatory => false, :immediate => false, :persistent => true, :content_type => "application/json") rescue => err Hotot.logger.error "Unexpected error producing AMQP messsage: (#{message.to_json})" Hotot.logger.error "#{err.message}" Hotot.logger.error err.backtrace.join("\n") end rescue => err Hotot.logger.error "Unexpected error producing AMQP messsage: (#{message.to_json})" Hotot.logger.error "#{err.message}" Hotot.logger.error err.backtrace.join("\n") end end end |
.reconnect ⇒ Object
Re-connects to the underlying exchange
70 71 72 73 74 75 76 77 |
# File 'lib/hotot/synchronous_connection.rb', line 70 def self.reconnect self.disconnect @@setup = false @@producer = Bunny.new(@@env_config) @@consumer = Bunny.new(@@env_config) @@setup = true self.connect end |
.setup(config = nil, env = (defined?(::Rails) ? Rails.env : nil)) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/hotot/synchronous_connection.rb', line 15 def self.setup(config = nil, env = (defined?(::Rails) ? Rails.env : nil) ) if !self.setup? @@config = load_config(config) @@env_config = @@config[env] raise StandardError, "Env #{env} not found in config" if @@env_config.nil? # symbolize the keys, which Bunny expects @@env_config.keys.each {|key| @@env_config[(key.to_sym rescue key) || key] = @@env_config.delete(key) } raise StandardError, "'exchange' key not found in config" if !@@env_config.has_key?(:exchange) @@producer = Bunny.new(@@env_config) @@consumer = Bunny.new(@@env_config) @@setup = true end end |
.setup? ⇒ Boolean
Whether the underlying connection has been set up
32 33 34 |
# File 'lib/hotot/synchronous_connection.rb', line 32 def self.setup? @@setup end |
.subscribe(routing_key, &block) ⇒ Object
36 37 38 39 40 41 42 43 |
# File 'lib/hotot/synchronous_connection.rb', line 36 def self.subscribe(routing_key, &block) q = @@consumer.queue.bind(@@env_config[:exchange], :routing_key => routing_key) q.subscribe do |delivery_info, metadata, payload| block.call delivery_info, metadata, payload end end |