Class: Lolitra::RabbitmqBus

Inherits:
Object
  • Object
show all
Defined in:
lib/lolitra/rabbitmq_bus.rb

Constant Summary collapse

SUBSCRIBE_OPTIONS =
{:durable => true}

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(hash = {}) ⇒ RabbitmqBus

Returns a new instance of RabbitmqBus.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/lolitra/rabbitmq_bus.rb', line 11

# Builds the bus, registers it as the MessageHandlerManager bus and opens the
# AMQP connection.
#
# Keys of +hash+ read by this method:
#   :exchange    - name of the topic exchange (required)
#   :subscribers - handlers passed to MessageHandlerManager.register_subscriber
#                  once both exchanges have been declared (optional)
#   :options     - overrides for the defaults listed below (optional)
# Every truthy entry except :options is handed to AMQP.start.
#
# @param hash [Hash] connection parameters; the caller's hash is not modified
# @raise [RuntimeError] when :exchange is missing
def initialize(hash = {})
  # Validate before any side effect so a failed construction does not leave a
  # half-built bus registered as the global bus.
  raise "no :exchange specified" unless hash[:exchange]

  # Work on a copy: :options is removed below and must not disappear from the
  # hash the caller still owns.
  hash = hash.dup

  Lolitra::MessageHandlerManager.bus = self

  self.options = {
    :queue_prefix => "",
    :queue_suffix => "",
    :exchange_dead_suffix => ".dead",
    :exchange_dead_params => {},
    :queue_params => {},
    :queue_dead_suffix => ".dead",
    :queue_dead_params => {},
    :no_consume => false,
  }.merge(hash.delete(:options) || {})

  # Default the dead-letter exchange while letting caller-supplied arguments
  # win. Built with merge so the caller's nested :queue_params hash is left
  # untouched.
  queue_params = self.options[:queue_params]
  dead_letter_defaults = {
    "x-dead-letter-exchange" => "#{hash[:exchange]}#{self.options[:exchange_dead_suffix]}"
  }
  self.options[:queue_params] = queue_params.merge(
    :arguments => dead_letter_defaults.merge(queue_params[:arguments] || {})
  )

  @channels = {}
  # Drop nil/false entries before handing the parameters to AMQP.start.
  @params = hash.reject { |_key, value| !value }

  AMQP::Utilities::EventLoopHelper.run do
    self.connection = AMQP.start(@params) do |connection|
      Lolitra::logger.info("Connected to rabbitmq.")
      create_channel(connection) do |channel|
        begin
          self.exchange = channel.topic(@params[:exchange], :durable => true)
          self.exchange_dead_letter = channel.topic("#{@params[:exchange]}#{@options[:exchange_dead_suffix]}", :durable => true)

          # :subscribers is optional; a missing list means nothing to register.
          (@params[:subscribers] || []).each do |handler|
            Lolitra::MessageHandlerManager.register_subscriber(handler)
          end
        rescue => e
          Lolitra::log_exception(e)
        end
      end
    end
    self.connection.on_tcp_connection_loss do |connection, _settings|
      # reconnect in 10 seconds, without enforcement
      Lolitra::logger.info("Connection loss. Trying to reconnect in 10 secs if needed.")
      connection.reconnect(false, 10)
    end
  end
end

Instance Attribute Details

#connection ⇒ Object

Returns the value of attribute connection.



3
4
5
# File 'lib/lolitra/rabbitmq_bus.rb', line 3

# Reader for @connection. The constructor assigns this attribute the value
# returned by AMQP.start.
def connection
  @connection
end

#exchange ⇒ Object

Returns the value of attribute exchange.



4
5
6
# File 'lib/lolitra/rabbitmq_bus.rb', line 4

# Reader for @exchange. The constructor assigns this attribute a durable topic
# exchange (channel.topic) named after the :exchange parameter; publish sends
# messages through it.
def exchange
  @exchange
end

#exchange_dead_letter ⇒ Object

Returns the value of attribute exchange_dead_letter.



5
6
7
# File 'lib/lolitra/rabbitmq_bus.rb', line 5

# Reader for @exchange_dead_letter. The constructor assigns this attribute a
# durable topic exchange whose name is the :exchange parameter followed by
# options[:exchange_dead_suffix] (".dead" by default).
def exchange_dead_letter
  @exchange_dead_letter
end

#options ⇒ Object

Returns the value of attribute options.



6
7
8
# File 'lib/lolitra/rabbitmq_bus.rb', line 6

# Reader for @options. The constructor assigns this attribute the built-in
# defaults merged with the :options hash supplied by the caller.
def options
  @options
end

#subscribers ⇒ Object

Returns the value of attribute subscribers.



7
8
9
# File 'lib/lolitra/rabbitmq_bus.rb', line 7

# Reader for @subscribers.
# NOTE(review): none of the methods shown on this page assign @subscribers
# (the constructor iterates @params[:subscribers] instead), so this may always
# return nil — confirm against the full class.
def subscribers
  @subscribers
end

Instance Method Details

#disconnect(&block) ⇒ Object



59
60
61
# File 'lib/lolitra/rabbitmq_bus.rb', line 59

# Closes the bus connection.
#
# @yield forwarded unchanged to the connection's +close+ call
# @return [Object] whatever the connection's +close+ returns
def disconnect(&block)
  connection.close(&block)
end

#process_deadletters(handler_class) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/lolitra/rabbitmq_bus.rb', line 92

# Opens a channel, declares the dead-letter queue of +handler_class+ and hands
# it to recursive_pop.
#
# The queue is declared with SUBSCRIBE_OPTIONS merged with
# @options[:queue_dead_params]. Exceptions raised while declaring the queue are
# logged through Lolitra::log_exception and not re-raised.
#
# @param handler_class [Class] handler whose dead-letter queue is processed
# @return [true] always
def process_deadletters(handler_class)
  dead_queue_name = generate_queue_name_dead(handler_class)
  create_channel(connection) do |ch|
    begin
      queue_opts = SUBSCRIBE_OPTIONS.merge(@options[:queue_dead_params])
      ch.queue(dead_queue_name, queue_opts) { |dead_queue| recursive_pop(ch, dead_queue, handler_class) }
    rescue => error
      Lolitra::log_exception(error)
    end
  end
  true
end

#publish(message) ⇒ Object



67
68
69
70
# File 'lib/lolitra/rabbitmq_bus.rb', line 67

# Publishes +message+ on the bus exchange.
#
# The body is message.marshall, the routing key comes from the message class'
# message_key, and the timestamp is the current time in whole seconds.
#
# @param message [#marshall] message whose class responds to +message_key+
# @return [Object] whatever the exchange's +publish+ returns
def publish(message)
  #TODO: if exchange channel is closed doesn't log anything
  target = exchange
  payload = message.marshall
  routing_key = message.class.message_key
  target.publish(payload, :routing_key => routing_key, :timestamp => Time.now.to_i)
end

#purge_deadletters(handler_class) ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/lolitra/rabbitmq_bus.rb', line 107

# Opens a channel, declares the dead-letter queue of +handler_class+ and passes
# it to purge_queue.
#
# The queue is declared with SUBSCRIBE_OPTIONS merged with
# @options[:queue_dead_params]. Exceptions raised while declaring the queue are
# logged through Lolitra::log_exception and not re-raised.
#
# @param handler_class [Class] handler whose dead-letter queue is purged
# @return [true] always
def purge_deadletters(handler_class)
  dead_queue_name = generate_queue_name_dead(handler_class)
  create_channel(connection) do |ch|
    begin
      queue_opts = SUBSCRIBE_OPTIONS.merge(@options[:queue_dead_params])
      ch.queue(dead_queue_name, queue_opts) { |dead_queue| purge_queue(dead_queue) }
    rescue => error
      Lolitra::log_exception(error)
    end
  end
  true
end

#remove_next_deadletter(handler_class) ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/lolitra/rabbitmq_bus.rb', line 122

# Opens a channel, declares the dead-letter queue of +handler_class+ and calls
# +pop+ on it once, without a block.
#
# The queue is declared with SUBSCRIBE_OPTIONS merged with
# @options[:queue_dead_params]. Exceptions raised while declaring the queue are
# logged through Lolitra::log_exception and not re-raised.
#
# @param handler_class [Class] handler whose dead-letter queue is popped
# @return [true] always
def remove_next_deadletter(handler_class)
  dead_queue_name = generate_queue_name_dead(handler_class)
  create_channel(connection) do |ch|
    begin
      queue_opts = SUBSCRIBE_OPTIONS.merge(@options[:queue_dead_params])
      ch.queue(dead_queue_name, queue_opts) { |dead_queue| dead_queue.pop }
    rescue => error
      Lolitra::log_exception(error)
    end
  end
  true
end

#subscribe(message_class, handler_class) ⇒ Object



63
64
65
# File 'lib/lolitra/rabbitmq_bus.rb', line 63

# Subscribes +handler_class+ to +message_class+ by delegating to create_queue
# with SUBSCRIBE_OPTIONS ({:durable => true}).
# NOTE(review): create_queue is not shown on this page; presumably it declares
# the handler's queue and binds it to the exchange — confirm.
def subscribe(message_class, handler_class)
  create_queue(message_class, handler_class, SUBSCRIBE_OPTIONS)
end

#unsubscribe(handler_class, &block) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/lolitra/rabbitmq_bus.rb', line 72

# Deletes the queue of +handler_class+ and reports the outcome to the optional
# callback.
#
# The queue name comes from generate_queue_name and the queue is declared with
# SUBSCRIBE_OPTIONS before being deleted. Exceptions are logged through
# Lolitra::log_exception and never re-raised.
#
# @param handler_class [Class] handler whose queue is deleted
# @yield [handler_class, success] optional; +success+ is true when
#   queue.delete runs its completion block, false when an exception was
#   rescued while deleting
# @return [Object] whatever create_channel returns, or the result of
#   Lolitra::log_exception when opening the channel raised
def unsubscribe(handler_class, &block)
  queue_name = generate_queue_name(handler_class)
  begin
    create_channel(self.connection) do |channel|
      channel.queue(queue_name, SUBSCRIBE_OPTIONS) do |queue|
        begin
          queue.delete do
            # The callback is optional; calling nil here used to raise
            # NoMethodError and turn a successful delete into logged errors.
            block.call(handler_class, true) if block
          end
        rescue => e
          Lolitra::log_exception(e)
          block.call(handler_class, false) if block
        end
      end
    end
  rescue => e
    Lolitra::log_exception(e)
  end
end