Class: Smith::Messaging::Receiver

Inherits:
Object
  • Object
show all
Includes:
Logger, Util
Defined in:
lib/smith/messaging/receiver.rb

Instance Method Summary collapse

Methods included from Util

#number_of_consumers, #number_of_messages

Methods included from Logger

included

Constructor Details

#initialize(queue_def, opts = {}, &blk) ⇒ Receiver

Returns a new instance of Receiver.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/smith/messaging/receiver.rb', line 13

# Builds a receiver for the queue described by +queue_def+. It records the
# receiver options, creates the completions the other methods wait on, then
# opens the channels that declare the exchange and the queue and binds the
# queue to the exchange.
#
# @param queue_def [QueueDefinition, Object] a QueueDefinition, or a value
#   that is passed together with +opts+ to QueueDefinition.new.
# @param opts [Hash] options. :error_queue and :fanout are removed from this
#   hash as they are read; :fanout_persistence and :fanout_queue_suffix are
#   removed as well when :fanout is truthy.
# @yield [receiver] this receiver, when a block is given.
def initialize(queue_def, opts={}, &blk)

  # This is for backward compatibility.
  # Anything that is not already a QueueDefinition is wrapped in one.
  @queue_def = queue_def.is_a?(QueueDefinition) ? queue_def : QueueDefinition.new(queue_def, opts)

  @acl_type_cache = AclTypeCache.instance

  # :error_queue is taken from opts (false when the key is absent). :auto_ack
  # and :threading are looked up in the queue definition's options through
  # option_or_default with true and false as the supplied defaults.
  # NOTE(review): option_or_default is not shown here; presumably it returns
  # the option value or the given default -- confirm.
  @foo_options = {
    :error_queue => opts.delete(:error_queue) { false },
    :auto_ack => option_or_default(@queue_def.options, :auto_ack, true),
    :threading => option_or_default(@queue_def.options, :threading, false)}

  # nil unless opts[:fanout] is truthy; otherwise the persistence flag (true
  # when :fanout_persistence is absent) and the queue suffix.
  fanout_options = if opts.delete(:fanout)
    {:persistence => opts.delete(:fanout_persistence) { true }, :queue_suffix => opts.delete(:fanout_queue_suffix)}
  end

  # Always an Array, whatever shape the :type option has.
  @payload_type = Array(option_or_default(@queue_def.options, :type, []))

  prefetch = option_or_default(@queue_def.options, :prefetch, Smith.config.agent.prefetch)

  @options = AmqpOptions.new(@queue_def.options)
  @options.routing_key = @queue_def.normalise

  @message_counter = MessageCounter.new(@queue_def.denormalise)

  # Completions the other methods wait on. They are succeeded inside the
  # open_channel callbacks below.
  @channel_completion = EM::Completion.new
  @queue_completion = EM::Completion.new
  @exchange_completion = EM::Completion.new
  @requeue_options_completion = EM::Completion.new

  @reply_queues = {}

  # First channel: publish it through @channel_completion and declare the
  # exchange on it -- a fanout exchange when fanout options were given, a
  # direct exchange otherwise.
  open_channel(:prefetch => prefetch) do |channel|
    @channel_completion.succeed(channel)
    exchange_type = (fanout_options) ? :fanout : :direct
    channel.send(exchange_type, @queue_def.normalise, @options.exchange) do |exchange|
      @exchange_completion.succeed(exchange)
    end
  end

  # Second open_channel call: declare the queue and, once the exchange is
  # available, bind the queue to it using the normalised name as routing key.
  # NOTE(review): the queue is declared on the channel from this second call
  # while @channel_completion holds the channel from the first -- confirm
  # that this is intended.
  open_channel(:prefetch => prefetch) do |channel|
    channel.queue(*fanout_queue_and_opts(fanout_options)) do |queue|
      @exchange_completion.completion do |exchange|
        queue.bind(exchange, :routing_key => @queue_def.normalise)
        @queue_completion.succeed(queue)
        # The requeue options start out as the exchange/queue pair; the
        # on_requeue* and requeue_parameters methods merge into this hash.
        @requeue_options_completion.succeed(:exchange => exchange, :queue => queue)
      end
    end
  end

  # Hand the receiver to the caller's block, if there is one.
  # NOTE(review): this runs right after the open_channel calls; whether the
  # channels are already open at this point depends on open_channel -- confirm.
  blk.call(self) if blk
end

Instance Method Details

#ack(multiple = false) ⇒ Object



66
67
68
# File 'lib/smith/messaging/receiver.rb', line 66

# Calls +ack+ on the receiver's channel once the channel is available.
#
# @param multiple [Boolean] passed through unchanged to the channel's +ack+.
def ack(multiple=false)
  @channel_completion.completion do |chan|
    chan.ack(multiple)
  end
end

#delete(&blk) ⇒ Object



168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/smith/messaging/receiver.rb', line 168

# Tears the receiver down once the exchange, queue and channel are all
# available. The steps are chained through callbacks in this order: unbind
# the queue from the exchange, delete the queue, delete the exchange, close
# the channel.
#
# @yield passed to the channel's +close+ as its block.
def delete(&blk)
  @exchange_completion.completion do |xchg|
    @queue_completion.completion do |q|
      @channel_completion.completion do |chan|
        # Built innermost-first so each step can name the one that follows it.
        close_channel = proc { chan.close(&blk) }
        drop_exchange = proc { xchg.delete(&close_channel) }
        drop_queue = proc { q.delete(&drop_exchange) }

        q.unbind(xchg, &drop_queue)
      end
    end
  end
end

#on_error(chain = false, &blk) ⇒ Object

Define a channel error handler.



138
139
140
141
142
143
# File 'lib/smith/messaging/receiver.rb', line 138

# Registers +blk+ as the error handler on the receiver's channel once the
# channel is available.
#
# @param chain [Boolean] accepted but not used by this method.
# @yield passed to the channel's +on_error+ as its block.
def on_error(chain=false, &blk)
  # TODO Check that this chains callbacks
  @channel_completion.completion { |chan| chan.on_error(&blk) }
end

#on_requeue(&blk) ⇒ Object



198
199
200
201
202
# File 'lib/smith/messaging/receiver.rb', line 198

# Stores +blk+ under :on_requeue in the requeue options once those options
# are available.
#
# @yield the block stored as the :on_requeue entry.
def on_requeue(&blk)
  @requeue_options_completion.completion do |settings|
    settings.update(:on_requeue => blk)
  end
end

#on_requeue_limit(&blk) ⇒ Object



204
205
206
207
208
# File 'lib/smith/messaging/receiver.rb', line 204

# Stores +blk+ under :on_requeue_limit in the requeue options once those
# options are available.
#
# @yield the block stored as the :on_requeue_limit entry.
def on_requeue_limit(&blk)
  @requeue_options_completion.completion do |settings|
    settings.update(:on_requeue_limit => blk)
  end
end

#pop(&blk) ⇒ Object

Pops a message off the queue and passes the headers and payload into the block. pop will automatically acknowledge the message unless the options set :ack to false.



122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/smith/messaging/receiver.rb', line 122

# Pops a single message off the queue.
#
# Waits for the queue and the requeue options to become available, then pops
# using the pop options taken from @options. When the pop delivers a payload,
# the metadata, payload, requeue options and +blk+ are handed to on_message;
# when it delivers no payload, +blk+ is called with (nil, nil).
#
# @yield for an empty pop the block receives (nil, nil). What it receives for
#   a real message is decided by on_message, which is not shown here.
def pop(&blk)
  opts = @options.pop
  @queue_completion.completion do |queue|
    @requeue_options_completion.completion do |requeue_options|
      # The pop callback receives the message metadata and its payload.
      queue.pop(opts) do |metadata, payload|
        if payload
          on_message(metadata, payload, requeue_options, &blk)
        else
          # Nothing was delivered: signal that to the caller.
          blk.call(nil,nil)
        end
      end
    end
  end
end

#queue_name ⇒ Object



164
165
166
# File 'lib/smith/messaging/receiver.rb', line 164

# Returns the denormalised name taken from the receiver's queue definition.
def queue_name
  definition = @queue_def
  definition.denormalise
end

#requeue_parameters(opts = {}) ⇒ Object



192
193
194
195
196
# File 'lib/smith/messaging/receiver.rb', line 192

# Merges +opts+ into the requeue options once those options are available.
# Existing entries with the same key are overwritten.
#
# @param opts [Hash] entries to merge into the requeue options.
def requeue_parameters(opts={})
  @requeue_options_completion.completion do |settings|
    settings.update(opts)
  end
end

#setup_reply_queue(reply_queue_name, &blk) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/smith/messaging/receiver.rb', line 70

# Yields a sender for the named reply queue.
#
# A sender already stored in @reply_queues under +reply_queue_name+ is
# yielded straight away. Otherwise, once the exchange is available, a new
# Smith::Messaging::Sender is built for the reply queue, stored in
# @reply_queues and then yielded.
#
# @param reply_queue_name [Object] key of the reply queue; also the name
#   given to the new QueueDefinition.
# @yield [sender] the stored or newly created sender.
def setup_reply_queue(reply_queue_name, &blk)
  # Fast path: a sender for this queue already exists.
  return blk.call(@reply_queues[reply_queue_name]) if @reply_queues[reply_queue_name]

  @exchange_completion.completion do |_exchange|
    logger.debug { "Attaching to reply queue: #{reply_queue_name}" }

    reply_queue_def = QueueDefinition.new(
      reply_queue_name,
      :auto_delete => true,
      :immediate => true,
      :mandatory => true,
      :durable => false
    )

    Smith::Messaging::Sender.new(reply_queue_def) do |new_sender|
      @reply_queues[reply_queue_name] = new_sender
      blk.call(new_sender)
    end
  end
end

#status(&blk) ⇒ Object



184
185
186
187
188
189
190
# File 'lib/smith/messaging/receiver.rb', line 184

# Asks the queue for its status once the queue is available and passes the
# two values it reports on to +blk+.
#
# @yield [num_messages, num_consumers] the two values from the queue's
#   status callback, in the order received.
def status(&blk)
  @queue_completion.completion do |q|
    q.status { |message_count, consumer_count| blk.call(message_count, consumer_count) }
  end
end

#subscribe(handler = nil, &blk) ⇒ Object

Subscribes to a queue and passes the headers and payload into the block. subscribe will automatically acknowledge the message unless the options set :ack to false.



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/smith/messaging/receiver.rb', line 90

# Subscribes to the queue and routes every delivered message to the handler.
#
# Waits for the queue and the requeue options to become available. If the
# queue already reports itself as subscribed, an error is logged and nothing
# else happens. Otherwise the queue is subscribed with the subscribe options
# taken from @options; each delivery that has a payload is handed to
# on_message together with the requeue options and the handler, and a
# delivery without a payload is only logged.
#
# @param handler [#call, nil] optional callable; when given it takes
#   precedence over the block.
# @yield used as the handler when +handler+ is nil. What it receives is
#   decided by on_message, which is not shown here.
def subscribe(handler=nil, &blk)
  blk = handler || blk

  @queue_completion.completion do |queue|
    @requeue_options_completion.completion do |requeue_options|
      if queue.subscribed?
        logger.error { "Queue is already subscribed too. Not listening on: #{@queue_def.denormalise}" }
      else
        opts = @options.subscribe
        logger.debug { "Subscribing to: [queue]:#{@queue_def.denormalise} [options]:#{opts}" }
        # The subscribe callback receives the message metadata and its payload.
        queue.subscribe(opts) do |metadata, payload|
          if payload
            on_message(metadata, payload, requeue_options, &blk)
          else
            logger.verbose { "Received null message on: #{@queue_def.denormalise} [options]:#{opts}" }
          end
        end
      end
    end
  end
end

#unsubscribe(&blk) ⇒ Object



113
114
115
116
117
# File 'lib/smith/messaging/receiver.rb', line 113

# Unsubscribes from the queue once the queue is available.
#
# @yield passed to the queue's +unsubscribe+ as its block.
def unsubscribe(&blk)
  @queue_completion.completion { |q| q.unsubscribe(&blk) }
end