Class: Lolitra::RabbitmqBus
- Inherits:
- Object
- Inheritance chain: Object → Lolitra::RabbitmqBus
- Defined in:
- lib/lolitra/rabbitmq_bus.rb
Constant Summary collapse
- SUBSCRIBE_OPTIONS =
{:durable => true}
Instance Attribute Summary collapse
-
#connection ⇒ Object
Returns the value of attribute connection.
-
#exchange ⇒ Object
Returns the value of attribute exchange.
-
#exchange_dead_letter ⇒ Object
Returns the value of attribute exchange_dead_letter.
-
#options ⇒ Object
Returns the value of attribute options.
-
#subscribers ⇒ Object
Returns the value of attribute subscribers.
Instance Method Summary collapse
- #disconnect(&block) ⇒ Object
-
#initialize(hash = {}) ⇒ RabbitmqBus
constructor
A new instance of RabbitmqBus.
- #process_deadletters(handler_class) ⇒ Object
- #publish(message) ⇒ Object
- #purge_deadletters(handler_class) ⇒ Object
- #remove_next_deadletter(handler_class) ⇒ Object
- #subscribe(message_class, handler_class) ⇒ Object
- #unsubscribe(handler_class, &block) ⇒ Object
Constructor Details
#initialize(hash = {}) ⇒ RabbitmqBus
Returns a new instance of RabbitmqBus.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 11
#
# Builds the bus: registers it as Lolitra::MessageHandlerManager.bus, merges
# the caller's :options over the defaults below, and opens the AMQP connection
# inside AMQP::Utilities::EventLoopHelper.run.
#
# hash - connection parameters. :exchange is required. An optional :options
#        entry overrides the defaults; an optional :subscribers entry lists
#        handlers that are registered once both exchanges are declared.
#        Entries whose value is nil/false are dropped and the remainder is
#        handed to AMQP.start.
#
# Raises RuntimeError ("no :exchange specified") when :exchange is missing.
def initialize(hash = {})
  # Work on a copy: extracting :options must not mutate the caller's hash.
  hash = hash.dup
  Lolitra::MessageHandlerManager.bus = self

  self.options = {
    :queue_prefix => "",
    :queue_suffix => "",
    :exchange_dead_suffix => ".dead",
    :exchange_dead_params => {},
    :queue_params => {},
    :queue_dead_suffix => ".dead",
    :queue_dead_params => {},
    :no_consume => false,
  }.merge(hash.delete(:options) || {})

  # Every queue dead-letters to "<exchange><dead suffix>" unless the caller
  # supplied its own "x-dead-letter-exchange" argument (caller values win).
  # A new :queue_params hash is built so a caller-supplied one is not mutated.
  caller_arguments = self.options[:queue_params][:arguments] || {}
  self.options[:queue_params] = self.options[:queue_params].merge(
    :arguments => {
      "x-dead-letter-exchange" => "#{hash[:exchange]}#{@options[:exchange_dead_suffix]}"
    }.merge(caller_arguments)
  )

  @channels = {}
  @params = hash.reject { |_key, value| !value }
  raise "no :exchange specified" unless hash[:exchange]

  AMQP::Utilities::EventLoopHelper.run do
    self.connection = AMQP.start(@params) do |connection|
      Lolitra::logger.info("Connected to rabbitmq.")
      create_channel(connection) do |channel|
        begin
          self.exchange = channel.topic(@params[:exchange], :durable => true)
          self.exchange_dead_letter = channel.topic("#{@params[:exchange]}#{@options[:exchange_dead_suffix]}", :durable => true)
          # Array() makes a missing :subscribers entry an empty list instead
          # of a NoMethodError that would only end up in the log.
          Array(@params[:subscribers]).each do |handler|
            Lolitra::MessageHandlerManager.register_subscriber(handler)
          end
        rescue => e
          Lolitra::log_exception(e)
        end
      end
    end

    self.connection.on_tcp_connection_loss do |connection, _settings|
      # reconnect in 10 seconds, without enforcement
      Lolitra::logger.info("Connection loss. Trying to reconnect in 10 secs if needed.")
      connection.reconnect(false, 10)
    end
  end
end
Instance Attribute Details
#connection ⇒ Object
Returns the value of attribute connection.
3 4 5 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 3
#
# Returns the value of the @connection instance variable (nil until it has
# been assigned).
def connection
  @connection
end
#exchange ⇒ Object
Returns the value of attribute exchange.
4 5 6 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 4
#
# Returns the value of the @exchange instance variable (nil until it has been
# assigned).
def exchange
  @exchange
end
#exchange_dead_letter ⇒ Object
Returns the value of attribute exchange_dead_letter.
5 6 7 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 5
#
# Returns the value of the @exchange_dead_letter instance variable (nil until
# it has been assigned).
def exchange_dead_letter
  @exchange_dead_letter
end
#options ⇒ Object
Returns the value of attribute options.
6 7 8 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 6
#
# Returns the value of the @options instance variable (nil until it has been
# assigned).
def options
  @options
end
#subscribers ⇒ Object
Returns the value of attribute subscribers.
7 8 9 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 7
#
# Returns the value of the @subscribers instance variable (nil until it has
# been assigned).
def subscribers
  @subscribers
end
Instance Method Details
#disconnect(&block) ⇒ Object
59 60 61 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 59
#
# Closes the bus connection.
#
# block - optional block, forwarded unchanged to connection.close.
#
# Returns whatever connection.close returns.
def disconnect(&block)
  self.connection.close(&block)
end
#process_deadletters(handler_class) ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 92
#
# Declares the dead-letter queue of +handler_class+ on a new channel and hands
# it to recursive_pop (presumably to re-process the queued dead letters one by
# one -- that helper is defined elsewhere; confirm there).
#
# handler_class - handler whose dead-letter queue name is derived through
#                 generate_queue_name_dead.
#
# Errors raised inside the channel block are logged with
# Lolitra::log_exception rather than propagated.
#
# Returns true unconditionally.
def process_deadletters(handler_class)
  queue_name_dead = generate_queue_name_dead(handler_class)
  options = SUBSCRIBE_OPTIONS
  create_channel(self.connection) do |channel|
    begin
      # merge returns a new hash, so SUBSCRIBE_OPTIONS itself is left untouched.
      channel.queue(queue_name_dead, options.merge(@options[:queue_dead_params])) do |queue|
        recursive_pop(channel, queue, handler_class)
      end
    rescue => e
      Lolitra::log_exception(e)
    end
  end
  true
end
#publish(message) ⇒ Object
67 68 69 70 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 67
#
# Publishes +message+ on the bus exchange.
#
# message - object responding to #marshall (the payload that is sent); its
#           class supplies the routing key.
#
# The message is stamped with the current time as an integer epoch value.
#
# NOTE(review): the routing-key accessor name was lost from this listing;
# `message_key` is a reconstruction -- confirm against the message class API.
#
# Returns whatever exchange.publish returns.
def publish(message)
  #TODO: if exchange channel is closed doesn't log anything
  self.exchange.publish(
    message.marshall,
    :routing_key => message.class.message_key,
    :timestamp => Time.now.to_i
  )
end
#purge_deadletters(handler_class) ⇒ Object
107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 107
#
# Declares the dead-letter queue of +handler_class+ on a new channel and
# passes it to purge_queue (helper defined elsewhere; presumably empties the
# queue -- confirm there).
#
# handler_class - handler whose dead-letter queue name is derived through
#                 generate_queue_name_dead.
#
# Errors raised inside the channel block are logged with
# Lolitra::log_exception rather than propagated.
#
# Returns true unconditionally.
def purge_deadletters(handler_class)
  queue_name_dead = generate_queue_name_dead(handler_class)
  options = SUBSCRIBE_OPTIONS
  create_channel(self.connection) do |channel|
    begin
      # merge returns a new hash, so SUBSCRIBE_OPTIONS itself is left untouched.
      channel.queue(queue_name_dead, options.merge(@options[:queue_dead_params])) do |queue|
        purge_queue(queue)
      end
    rescue => e
      Lolitra::log_exception(e)
    end
  end
  true
end
#remove_next_deadletter(handler_class) ⇒ Object
122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 122
#
# Declares the dead-letter queue of +handler_class+ on a new channel and pops
# one message from it; the popped message is not used.
#
# handler_class - handler whose dead-letter queue name is derived through
#                 generate_queue_name_dead.
#
# Errors raised inside the channel block are logged with
# Lolitra::log_exception rather than propagated.
#
# Returns true unconditionally.
def remove_next_deadletter(handler_class)
  queue_name_dead = generate_queue_name_dead(handler_class)
  options = SUBSCRIBE_OPTIONS
  create_channel(self.connection) do |channel|
    begin
      # merge returns a new hash, so SUBSCRIBE_OPTIONS itself is left untouched.
      channel.queue(queue_name_dead, options.merge(@options[:queue_dead_params])) do |queue|
        queue.pop
      end
    rescue => e
      Lolitra::log_exception(e)
    end
  end
  true
end
#subscribe(message_class, handler_class) ⇒ Object
63 64 65 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 63
#
# Subscribes +handler_class+ to +message_class+ by delegating to create_queue
# with SUBSCRIBE_OPTIONS.
#
# message_class - class of the messages to receive.
# handler_class - handler that will receive them.
#
# Returns whatever create_queue returns.
def subscribe(message_class, handler_class)
  create_queue(message_class, handler_class, SUBSCRIBE_OPTIONS)
end
#unsubscribe(handler_class, &block) ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/lolitra/rabbitmq_bus.rb', line 72
#
# Deletes the queue of +handler_class+ on a new channel.
#
# handler_class - handler whose queue name is derived through
#                 generate_queue_name.
# block         - optional callback, called with (handler_class, true) from
#                 the block given to queue.delete, or with
#                 (handler_class, false) when an error is raised while
#                 deleting.
#
# Errors are logged with Lolitra::log_exception rather than propagated.
def unsubscribe(handler_class, &block)
  queue_name = generate_queue_name(handler_class)
  begin
    create_channel(self.connection) do |channel|
      channel.queue(queue_name, SUBSCRIBE_OPTIONS) do |queue|
        begin
          queue.delete do
            # The callback is optional: without the guard a missing block
            # raised NoMethodError here and again in the rescue below.
            block.call(handler_class, true) if block
          end
        rescue => e
          Lolitra::log_exception(e)
          block.call(handler_class, false) if block
        end
      end
    end
  rescue => e
    Lolitra::log_exception(e)
  end
end