Class: Smith::Messaging::Receiver

Inherits:
Object
  • Object
show all
Includes:
Logger, Util
Defined in:
lib/smith/messaging/receiver.rb

Instance Method Summary collapse

Methods included from Util

#number_of_consumers, #number_of_messages

Methods included from Logger

included

Constructor Details

#initialize(queue_def, opts = {}, &blk) ⇒ Receiver

Returns a new instance of Receiver.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/smith/messaging/receiver.rb', line 13

def initialize(queue_def, opts={}, &blk)

  # This is for backward compatibility.
  @queue_def = queue_def.is_a?(QueueDefinition) ? queue_def : QueueDefinition.new(queue_def, opts)

  @acl_type_cache = AclTypeCache.instance

  @foo_options = {
    :auto_ack => option_or_default(@queue_def.options, :auto_ack, true),
    :threading => option_or_default(@queue_def.options, :threading, false)}

  @payload_type = Array(option_or_default(@queue_def.options, :type, []))

  prefetch = option_or_default(@queue_def.options, :prefetch, Smith.config.agent.prefetch)

  @options = AmqpOptions.new(@queue_def.options)
  @options.routing_key = @queue_def.normalise

  @message_counter = MessageCounter.new(@queue_def.denormalise)

  @channel_completion = EM::Completion.new
  @queue_completion = EM::Completion.new
  @exchange_completion = EM::Completion.new
  @requeue_options_completion = EM::Completion.new

  @reply_queues = {}

  open_channel(:prefetch => prefetch) do |channel|
    @channel_completion.succeed(channel)
    channel.direct(@queue_def.normalise, @options.exchange) do |exchange|
      @exchange_completion.succeed(exchange)
    end
  end

  open_channel(:prefetch => prefetch) do |channel|
    channel.queue(@queue_def.normalise, @options.queue) do |queue|
      @exchange_completion.completion do |exchange|
        queue.bind(exchange, :routing_key => @queue_def.normalise)
        @queue_completion.succeed(queue)
        @requeue_options_completion.succeed(:exchange => exchange, :queue => queue)
      end
    end
  end

  blk.call(self) if blk
end

Instance Method Details

#ack(multiple = false) ⇒ Object



60
61
62
# File 'lib/smith/messaging/receiver.rb', line 60

def ack(multiple=false)
  @channel_completion.completion {|channel| channel.ack(multiple) }
end

#delete(&blk) ⇒ Object



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/smith/messaging/receiver.rb', line 162

def delete(&blk)
  @exchange_completion.completion do |exchange|
    @queue_completion.completion do |queue|
      @channel_completion.completion do |channel|
        queue.unbind(exchange) do
          queue.delete do
            exchange.delete do
              channel.close(&blk)
            end
          end
        end
      end
    end
  end
end

#on_error(chain = false, &blk) ⇒ Object

Define a channel error handler.



132
133
134
135
136
137
# File 'lib/smith/messaging/receiver.rb', line 132

def on_error(chain=false, &blk)
  # TODO Check that this chains callbacks
  @channel_completion.completion do |channel|
    channel.on_error(&blk)
  end
end

#on_requeue(&blk) ⇒ Object



192
193
194
195
196
# File 'lib/smith/messaging/receiver.rb', line 192

def on_requeue(&blk)
  @requeue_options_completion.completion do |requeue_options|
    requeue_options.merge!(:on_requeue => blk)
  end
end

#on_requeue_limit(&blk) ⇒ Object



198
199
200
201
202
# File 'lib/smith/messaging/receiver.rb', line 198

def on_requeue_limit(&blk)
  @requeue_options_completion.completion do |requeue_options|
    requeue_options.merge!(:on_requeue_limit => blk)
  end
end

#pop(&blk) ⇒ Object

pops a message off the queue and passes the headers and payload into the block. pop will automatically acknowledge the message unless the options sets :ack to false.



116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/smith/messaging/receiver.rb', line 116

def pop(&blk)
  opts = @options.pop
  @queue_completion.completion do |queue|
    @requeue_options_completion.completion do |requeue_options|
      queue.pop(opts) do |, payload|
        if payload
          on_message(, payload, requeue_options, &blk)
        else
          blk.call(nil,nil)
        end
      end
    end
  end
end

#queue_nameObject



158
159
160
# File 'lib/smith/messaging/receiver.rb', line 158

def queue_name
  @queue_def.denormalise
end

#requeue_parameters(opts = {}) ⇒ Object



186
187
188
189
190
# File 'lib/smith/messaging/receiver.rb', line 186

def requeue_parameters(opts={})
  @requeue_options_completion.completion do |requeue_options|
    requeue_options.merge!(opts)
  end
end

#setup_reply_queue(reply_queue_name, &blk) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/smith/messaging/receiver.rb', line 64

def setup_reply_queue(reply_queue_name, &blk)
  if @reply_queues[reply_queue_name]
    blk.call(@reply_queues[reply_queue_name])
  else
    @exchange_completion.completion do |exchange|
      logger.debug { "Attaching to reply queue: #{reply_queue_name}" }

      queue_def = QueueDefinition.new(reply_queue_name, :auto_delete => true, :immediate => true, :mandatory => true, :durable => false)

      Smith::Messaging::Sender.new(queue_def) do |sender|
        @reply_queues[reply_queue_name] = sender
        blk.call(sender)
      end
    end
  end
end

#status(&blk) ⇒ Object



178
179
180
181
182
183
184
# File 'lib/smith/messaging/receiver.rb', line 178

def status(&blk)
  @queue_completion.completion do |queue|
    queue.status do |num_messages, num_consumers|
      blk.call(num_messages, num_consumers)
    end
  end
end

#subscribe(handler = nil, &blk) ⇒ Object

Subscribes to a queue and passes the headers and payload into the block. subscribe will automatically acknowledge the message unless the options sets :ack to false.



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/smith/messaging/receiver.rb', line 84

def subscribe(handler=nil, &blk)

  blk = handler || blk

  @queue_completion.completion do |queue|
    @requeue_options_completion.completion do |requeue_options|
      if !queue.subscribed?
        opts = @options.subscribe
        logger.debug { "Subscribing to: [queue]:#{@queue_def.denormalise} [options]:#{opts}" }
        queue.subscribe(opts) do |,payload|
          if payload
            on_message(, payload, requeue_options, &blk)
          else
            logger.verbose { "Received null message on: #{@queue_def.denormalise} [options]:#{opts}" }
          end
        end
      else
        logger.error { "Queue is already subscribed too. Not listening on: #{@queue_def.denormalise}" }
      end
    end
  end
end

#unsubscribe(&blk) ⇒ Object



107
108
109
110
111
# File 'lib/smith/messaging/receiver.rb', line 107

def unsubscribe(&blk)
  @queue_completion.completion do |queue|
    queue.unsubscribe(&blk)
  end
end