Class: Smith::Messaging::Receiver

Inherits:
Object
  • Object
show all
Includes:
Logger, Util
Defined in:
lib/smith/messaging/receiver.rb

Instance Method Summary collapse

Methods included from Util

#number_of_consumers, #number_of_messages

Methods included from Logger

included

Constructor Details

#initialize(queue_def, opts = {}, &blk) ⇒ Receiver

Returns a new instance of Receiver.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/smith/messaging/receiver.rb', line 13

# Builds a receiver for the given queue and starts the asynchronous set-up
# of its channel, exchange and queue.
#
# Nothing here blocks: the broker objects are published through the
# completion objects created below, and the other instance methods wait on
# those completions before touching the channel, exchange or queue.
#
# @param queue_def [QueueDefinition, Object] a QueueDefinition, or anything
#   QueueDefinition.new accepts as its first argument.
# @param opts [Hash] options used when queue_def is not already a
#   QueueDefinition. The :fanout, :fanout_persistence and
#   :fanout_queue_suffix keys are removed from this hash (see below).
# @yield [receiver] called synchronously with self at the end of
#   construction, if a block is given.
def initialize(queue_def, opts={}, &blk)

  # This is for backward compatibility.
  @queue_def = queue_def.is_a?(QueueDefinition) ? queue_def : QueueDefinition.new(queue_def, opts)

  @acl_type_cache = AclTypeCache.instance

  # Defaults: :auto_ack => true, :threading => false.
  # NOTE(review): option_or_default is defined elsewhere; presumably it
  # returns the named option when present and the default otherwise — confirm.
  @foo_options = {
    :auto_ack => option_or_default(@queue_def.options, :auto_ack, true),
    :threading => option_or_default(@queue_def.options, :threading, false)}

  # nil unless opts carries a truthy :fanout. The delete calls mutate the
  # caller's opts hash; for a Hash, the block supplies the value used when
  # :fanout_persistence is absent (true).
  fanout_options = if opts.delete(:fanout)
    {:persistence => opts.delete(:fanout_persistence) { true }, :queue_suffix => opts.delete(:fanout_queue_suffix)}
  end

  # Array() lets :type be given as a single value or as a list.
  @payload_type = Array(option_or_default(@queue_def.options, :type, []))

  prefetch = option_or_default(@queue_def.options, :prefetch, Smith.config.agent.prefetch)

  @options = AmqpOptions.new(@queue_def.options)
  @options.routing_key = @queue_def.normalise

  @message_counter = MessageCounter.new(@queue_def.denormalise)

  # One completion per broker object; each is succeeded from the
  # open_channel callbacks below.
  @channel_completion = EM::Completion.new
  @queue_completion = EM::Completion.new
  @exchange_completion = EM::Completion.new
  @requeue_options_completion = EM::Completion.new

  # Senders created by setup_reply_queue, keyed by reply queue name.
  @reply_queues = {}

  # First channel: published via @channel_completion and used to declare
  # the exchange (fanout when fanout_options is set, direct otherwise).
  open_channel(:prefetch => prefetch) do |channel|
    @channel_completion.succeed(channel)
    exchange_type = (fanout_options) ? :fanout : :direct
    channel.send(exchange_type, @queue_def.normalise, @options.exchange) do |exchange|
      @exchange_completion.succeed(exchange)
    end
  end

  # Second channel: declares the queue and, once the exchange is available,
  # binds the queue to it and publishes the queue and the requeue options.
  # NOTE(review): this channel is not the one stored in @channel_completion,
  # so ack/on_error/delete act on the first channel — confirm this is intended.
  open_channel(:prefetch => prefetch) do |channel|
    channel.queue(*fanout_queue_and_opts(fanout_options)) do |queue|
      @exchange_completion.completion do |exchange|
        queue.bind(exchange, :routing_key => @queue_def.normalise)
        @queue_completion.succeed(queue)
        @requeue_options_completion.succeed(:exchange => exchange, :queue => queue)
      end
    end
  end

  # Runs immediately; none of the completions above is succeeded before
  # the open_channel callbacks have run.
  blk.call(self) if blk
end

Instance Method Details

#ack(multiple = false) ⇒ Object



65
66
67
# File 'lib/smith/messaging/receiver.rb', line 65

# Sends an acknowledgement on the receiver's channel once that channel has
# been published through @channel_completion.
#
# @param multiple [Boolean] passed through unchanged to the channel's ack.
def ack(multiple=false)
  @channel_completion.completion do |ch|
    ch.ack(multiple)
  end
end

#delete(&blk) ⇒ Object



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/smith/messaging/receiver.rb', line 167

# Tears down the broker objects owned by this receiver.
#
# Waits for the exchange, queue and channel completions, then runs the
# steps strictly in sequence, each one started from the previous step's
# callback: unbind the queue from the exchange, delete the queue, delete
# the exchange, close the channel.
#
# @yield passed to the channel's close call.
def delete(&blk)
  @exchange_completion.completion do |xchg|
    @queue_completion.completion do |q|
      @channel_completion.completion do |ch|
        q.unbind(xchg) do
          q.delete { xchg.delete { ch.close(&blk) } }
        end
      end
    end
  end
end

#on_error(chain = false, &blk) ⇒ Object

Define a channel error handler.



137
138
139
140
141
142
# File 'lib/smith/messaging/receiver.rb', line 137

# Registers the given block as the error handler of the receiver's channel,
# once that channel is available.
#
# @param chain [Boolean] accepted but not referenced by this method.
# @yield handed to the channel's on_error.
def on_error(chain=false, &blk)
  # TODO Check that this chains callbacks
  @channel_completion.completion { |ch| ch.on_error(&blk) }
end

#on_requeue(&blk) ⇒ Object



197
198
199
200
201
# File 'lib/smith/messaging/receiver.rb', line 197

# Stores the given block under :on_requeue in the requeue options, once
# those options have been published by the constructor.
#
# @yield the callback to store; how it is invoked is decided by code that
#   is not visible here.
def on_requeue(&blk)
  @requeue_options_completion.completion { |settings| settings.merge!(:on_requeue => blk) }
end

#on_requeue_limit(&blk) ⇒ Object



203
204
205
206
207
# File 'lib/smith/messaging/receiver.rb', line 203

# Stores the given block under :on_requeue_limit in the requeue options,
# once those options have been published by the constructor.
#
# @yield the callback to store; how it is invoked is decided by code that
#   is not visible here.
def on_requeue_limit(&blk)
  @requeue_options_completion.completion { |settings| settings.merge!(:on_requeue_limit => blk) }
end

#pop(&blk) ⇒ Object

Pops a message off the queue and passes the headers and payload into the block. `pop` automatically acknowledges the message unless the options set :ack to false.



121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/smith/messaging/receiver.rb', line 121

# Pops a single message off the queue.
#
# Waits for the queue and the requeue options to be available, then asks
# the queue for one message using the pop options from @options. When a
# payload is delivered, the message metadata, the payload, the requeue
# options and the caller's block are handed to on_message. When no payload
# is delivered, the block is called with (nil, nil).
#
# @yield [metadata, payload] invoked directly with (nil, nil) when there is
#   no payload; otherwise the block is forwarded to on_message (defined
#   elsewhere in this class), which decides how it is called.
def pop(&blk)
  opts = @options.pop
  @queue_completion.completion do |queue|
    @requeue_options_completion.completion do |requeue_options|
      # The first block parameter had been lost, which left the method
      # syntactically invalid; it is the per-message metadata.
      queue.pop(opts) do |metadata, payload|
        if payload
          on_message(metadata, payload, requeue_options, &blk)
        else
          blk.call(nil, nil)
        end
      end
    end
  end
end

#queue_name ⇒ Object



163
164
165
# File 'lib/smith/messaging/receiver.rb', line 163

# Returns the denormalised form of the queue definition this receiver was
# built with (the result of @queue_def.denormalise).
def queue_name
  @queue_def.denormalise
end

#requeue_parameters(opts = {}) ⇒ Object



191
192
193
194
195
# File 'lib/smith/messaging/receiver.rb', line 191

# Merges the given options into the requeue options, once those options
# have been published by the constructor. Existing keys are overwritten by
# the values in opts.
#
# @param opts [Hash] entries to merge in; defaults to an empty hash.
def requeue_parameters(opts={})
  @requeue_options_completion.completion { |settings| settings.merge!(opts) }
end

#setup_reply_queue(reply_queue_name, &blk) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/smith/messaging/receiver.rb', line 69

# Yields a sender attached to the named reply queue, creating it on first
# use.
#
# A sender already held in @reply_queues for this name is yielded straight
# away. Otherwise the method waits for the exchange completion, builds a
# QueueDefinition for the reply queue, creates a Smith::Messaging::Sender
# for it, stores that sender in @reply_queues and yields it.
#
# @param reply_queue_name [Object] key of the reply queue; also used as the
#   first argument to QueueDefinition.new.
# @yield [sender] the cached or newly created sender.
def setup_reply_queue(reply_queue_name, &blk)
  cached_sender = @reply_queues[reply_queue_name]
  return blk.call(cached_sender) if cached_sender

  # The exchange itself is not used here; the completion only delays the
  # set-up until the exchange has been published.
  @exchange_completion.completion do |_exchange|
    logger.debug { "Attaching to reply queue: #{reply_queue_name}" }

    definition = QueueDefinition.new(reply_queue_name, :auto_delete => true, :immediate => true, :mandatory => true, :durable => false)

    Smith::Messaging::Sender.new(definition) do |new_sender|
      @reply_queues[reply_queue_name] = new_sender
      blk.call(new_sender)
    end
  end
end

#status(&blk) ⇒ Object



183
184
185
186
187
188
189
# File 'lib/smith/messaging/receiver.rb', line 183

# Asks the queue for its status, once the queue is available, and passes
# the two values it reports to the block.
#
# @yield [num_messages, num_consumers] the two values yielded by the
#   queue's status call, in that order.
def status(&blk)
  @queue_completion.completion do |q|
    q.status { |message_count, consumer_count| blk.call(message_count, consumer_count) }
  end
end

#subscribe(handler = nil, &blk) ⇒ Object

Subscribes to a queue and passes the headers and payload into the block. `subscribe` automatically acknowledges the message unless the options set :ack to false.



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/smith/messaging/receiver.rb', line 89

# Subscribes to the queue and dispatches each delivered message.
#
# Waits for the queue and the requeue options to be available. If the queue
# is already subscribed, an error is logged and nothing else happens.
# Otherwise the queue is subscribed with the subscribe options from
# @options; every delivery that carries a payload is handed to on_message
# together with the requeue options and the callback, and deliveries
# without a payload are only logged.
#
# @param handler [#to_proc, nil] callback to use instead of the block; when
#   both are given, handler wins.
# @yield forwarded to on_message (defined elsewhere in this class), which
#   decides how the callback is invoked.
def subscribe(handler=nil, &blk)
  blk = handler || blk

  @queue_completion.completion do |queue|
    @requeue_options_completion.completion do |requeue_options|
      if queue.subscribed?
        logger.error { "Queue is already subscribed too. Not listening on: #{@queue_def.denormalise}" }
      else
        opts = @options.subscribe
        logger.debug { "Subscribing to: [queue]:#{@queue_def.denormalise} [options]:#{opts}" }
        # The first block parameter had been lost, which left the method
        # syntactically invalid; it is the per-message metadata.
        queue.subscribe(opts) do |metadata, payload|
          if payload
            on_message(metadata, payload, requeue_options, &blk)
          else
            logger.verbose { "Received null message on: #{@queue_def.denormalise} [options]:#{opts}" }
          end
        end
      end
    end
  end
end

#unsubscribe(&blk) ⇒ Object



112
113
114
115
116
# File 'lib/smith/messaging/receiver.rb', line 112

# Cancels the queue subscription once the queue is available.
#
# @yield passed through to the queue's unsubscribe call.
def unsubscribe(&blk)
  @queue_completion.completion { |q| q.unsubscribe(&blk) }
end