Class: Brown::Receiver

Inherits:
Object
  • Object
show all
Includes:
Logger, Util
Defined in:
lib/brown/receiver.rb

Instance Method Summary collapse

Methods included from Util

#number_of_consumers, #number_of_messages

Methods included from Logger

#backtrace, #log_level, #logger

Methods included from AmqpErrors

#error_lookup, #error_message, #errors

Constructor Details

#initialize(queue_def, opts = {}, &blk) ⇒ Receiver

Returns a new instance of Receiver.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/brown/receiver.rb', line 10

# Builds a receiver for a queue and starts opening its channel and queue.
#
# @param queue_def [Brown::QueueDefinition, Object] used as-is when it is
#   already a Brown::QueueDefinition; otherwise wrapped in a new one
#   together with +opts+
# @param opts [Hash] stored as @options and also handed to the requeue
#   Brown::Sender
# @yield [receiver] called with this receiver at the end of construction
#   when a block is supplied
def initialize(queue_def, opts={}, &blk)
  @queue_def =
    if queue_def.is_a?(Brown::QueueDefinition)
      queue_def
    else
      Brown::QueueDefinition.new(queue_def, opts)
    end

  @acl_type_cache  = Brown::ACLLookup
  @options         = opts
  @requeue_options = {}
  @requeue_queue   = Brown::Sender.new(@queue_def, opts)

  # Both settings are read from the queue definition's options, falling back
  # to the literal passed as the third argument (presumably the default --
  # option_or_default is defined elsewhere).
  @payload_type  = Array(option_or_default(@queue_def.options, :type, []))
  prefetch_count = option_or_default(@queue_def.options, :prefetch, 1)

  # Other methods register callbacks on these completions; they are marked
  # successful inside the open_channel block below.
  @channel_completion = EM::Completion.new
  @queue_completion   = EM::Completion.new

  open_channel(:prefetch => prefetch_count) do |ch|
    logger.debug { "channel open for receiver on #{@queue_def.denormalise}" }
    ch.on_error { |_channel, close| logger.fatal { "Channel error: #{close.inspect}" } }

    ch.queue(@queue_def.normalise) do |declared_queue|
      logger.debug { "Registered queue #{@queue_def.denormalise} on channel" }
      @queue_completion.succeed(declared_queue)
    end

    @channel_completion.succeed(ch)
  end

  blk.call(self) unless blk.nil?
end

Instance Method Details

#ack(multiple = false) ⇒ Object



44
45
46
# File 'lib/brown/receiver.rb', line 44

# Calls +ack+ on the channel through the channel completion.
#
# @param multiple [Boolean] forwarded unchanged to the channel's +ack+
def ack(multiple=false)
  @channel_completion.completion do |ch|
    ch.ack(multiple)
  end
end

#delete(&blk) ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/brown/receiver.rb', line 110

# Tears the receiver down step by step, each step running in the callback of
# the previous one: unbind the queue from the exchange, delete the queue,
# delete the exchange, then close the channel.
#
# @yield passed to the channel's +close+ as its block
def delete(&blk)
  @queue_completion.completion do |q|
    @channel_completion.completion do |ch|
      # Steps are declared last-to-first so each can name its successor.
      close_channel = proc { ch.close(&blk) }
      drop_exchange = proc { exchange.delete(&close_channel) }
      drop_queue    = proc { q.delete(&drop_exchange) }

      q.unbind(exchange, &drop_queue)
    end
  end
end

#on_error(chain = false, &blk) ⇒ Object

Define a channel error handler.



99
100
101
102
103
104
# File 'lib/brown/receiver.rb', line 99

# Define a channel error handler.
#
# @param chain [Boolean] accepted but not used by this method
# @yield registered as the channel's +on_error+ block
def on_error(chain=false, &blk)
  # TODO Check that this chains callbacks
  @channel_completion.completion { |ch| ch.on_error(&blk) }
end

#on_requeue(&blk) ⇒ Object



75
76
77
# File 'lib/brown/receiver.rb', line 75

# Stores the given block in the requeue options under :on_requeue.
#
# @yield the block to store
# @return [Proc, nil] the stored block
def on_requeue(&blk)
  @requeue_options.store(:on_requeue, blk)
end

#on_requeue_limit(&blk) ⇒ Object



79
80
81
# File 'lib/brown/receiver.rb', line 79

# Stores the given block in the requeue options under :on_requeue_limit.
#
# @yield the block to store
# @return [Proc, nil] the stored block
def on_requeue_limit(&blk)
  @requeue_options.store(:on_requeue_limit, blk)
end

#pop(&blk) ⇒ Object

Pops a message off the queue and passes the headers and payload into the block. pop will automatically acknowledge the message unless the options set :ack to false.



86
87
88
89
90
91
92
93
94
95
96
# File 'lib/brown/receiver.rb', line 86

# Pops a single message off the queue.
#
# When the queue returns a payload, the metadata and payload are handed to
# +on_message+ together with the caller's block. When there is no payload
# the block is called with (nil, nil).
#
# @yield [metadata, payload] both nil when no message was available
def pop(&blk)
  @queue_completion.completion do |queue|
    queue.pop({}) do |metadata, payload|
      if payload
        on_message(metadata, payload, &blk)
      elsif blk
        # Nothing was popped: signal that with a pair of nils, but only when
        # the caller actually supplied a block.
        blk.call(nil, nil)
      end
    end
  end
end

#queue_name ⇒ Object



106
107
108
# File 'lib/brown/receiver.rb', line 106

# Returns whatever the queue definition's +denormalise+ returns.
#
# @return [Object] the denormalised queue definition
def queue_name
  definition = @queue_def
  definition.denormalise
end

#requeue_parameters(opts) ⇒ Object



71
72
73
# File 'lib/brown/receiver.rb', line 71

# Merges +opts+ into the stored requeue options in place; keys already
# present are overwritten by the values in +opts+.
#
# @param opts [Hash] options to merge in
# @return [Hash] the updated requeue options hash
def requeue_parameters(opts)
  @requeue_options.update(opts)
end

#status(&blk) ⇒ Object



124
125
126
127
128
129
130
# File 'lib/brown/receiver.rb', line 124

# Asks the queue for its status and relays the two values it reports to the
# caller's block.
#
# @yield [num_messages, num_consumers] the values reported by the queue
def status(&blk)
  @queue_completion.completion do |q|
    q.status { |messages, consumers| blk.call(messages, consumers) }
  end
end

#subscribe(opts = {}, &blk) ⇒ Object

Subscribes to a queue and passes the headers and payload into the block. subscribe will automatically acknowledge the message unless the options set :ack to false.



51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/brown/receiver.rb', line 51

# Subscribes to the queue and routes each delivery to +on_message+.
#
# Deliveries with a payload are handed to +on_message+ with the caller's
# block; deliveries without one are only logged at debug level.
#
# @param opts [Hash] subscription options forwarded to the queue. :ack => true
#   is merged in last, so it replaces any :ack value supplied by the caller.
# @yield forwarded to +on_message+ for every delivery that has a payload
def subscribe(opts = {}, &blk)
  @queue_completion.completion do |queue|
    logger.debug { "Subscribing to: [queue]:#{@queue_def.denormalise} [options]:#{@queue_def.options}" }
    queue.subscribe(opts.merge(:ack => true)) do |metadata, payload|
      logger.debug { "Received a message on #{@queue_def.denormalise}: #{metadata.to_hash.inspect}" }
      if payload
        on_message(metadata, payload, &blk)
      else
        logger.debug { "Received null message on: #{@queue_def.denormalise} [options]:#{@queue_def.options}" }
      end
    end
  end
end

#unsubscribe(&blk) ⇒ Object



65
66
67
68
69
# File 'lib/brown/receiver.rb', line 65

# Calls +unsubscribe+ on the queue, forwarding the caller's block.
#
# @yield passed to the queue's +unsubscribe+ as its block
def unsubscribe(&blk)
  @queue_completion.completion { |q| q.unsubscribe(&blk) }
end