Class: AMQP::Hermes::Receiver

Inherits:
Object
  • Object
show all
Includes:
Connectivity
Defined in:
lib/amqp-hermes/receiver.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Connectivity

#channel, #close, #connection, #open?, #open_channel, #open_connection

Constructor Details

#initialize(queue, topic = nil, options = {}) ⇒ Receiver

Returns a new instance of Receiver.



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/amqp-hermes/receiver.rb', line 9

# Builds a receiver for +queue+, declares the topic exchange and starts
# listening right away.
#
# Two call shapes are accepted: +new(queue, topic, options)+ and
# +new(queue, options)+; in the second form the exchange name is taken from
# +options[:topic]+.
#
# @param queue [String] queue name; must be neither nil nor empty
# @param topic [String, Hash, nil] exchange name (defaults to "pub/sub"), or
#   the options hash itself
# @param options [Hash] +:routing_key+ and +:handler+ are consumed here; the
#   remaining entries are passed on to +channel.topic+. The hash handed in by
#   the caller is never modified.
# @raise [RuntimeError] if +queue+ is nil or empty
def initialize(queue, topic = nil, options = {})
  raise "You *MUST* specify a queue" if queue.nil? || queue.empty?
  @queue = queue

  if topic.is_a?(Hash)
    options = topic.dup
    topic = options.delete(:topic)
  else
    # Work on a copy so the deletes below do not leak into the caller's hash.
    options = options.dup
  end

  @routing_key = options.delete(:routing_key) || "#{queue}.*"

  # The routing key has to mention the queue name. Compare literally: the
  # queue name is plain text, not a pattern, so characters such as "(" or "["
  # must not be interpreted as regexp syntax.
  unless @routing_key.to_s.include?(queue)
    @routing_key = "#{queue}.routing_key"
  end

  @handler = options.delete(:handler) || self

  # Default to auto-delete, but respect an explicit +false+ (a plain `||=`
  # would overwrite it).
  options[:auto_delete] = true if options[:auto_delete].nil?

  topic ||= "pub/sub"
  # NOTE(review): channel is used before open_connection is called below;
  # presumably Connectivity#channel connects lazily. Confirm.
  @exchange = channel.topic(topic, options)

  @messages = []
  @_listening = false

  open_connection
  listen
end

Instance Attribute Details

#_listening ⇒ Object

Returns the value of attribute _listening.



7
8
9
# File 'lib/amqp-hermes/receiver.rb', line 7

# Reader for the raw @_listening flag.
#
# @return [Object] whatever is currently stored in @_listening
def _listening
  @_listening
end

#exchange ⇒ Object (readonly)

Returns the value of attribute exchange.



6
7
8
# File 'lib/amqp-hermes/receiver.rb', line 6

# Reader for @exchange.
#
# @return [Object] the value currently stored in @exchange
def exchange
  @exchange
end

#messages ⇒ Object (readonly)

Returns the value of attribute messages.



6
7
8
# File 'lib/amqp-hermes/receiver.rb', line 6

# Reader for @messages.
#
# @return [Object] the value currently stored in @messages; the object
#   itself is returned, not a copy
def messages
  @messages
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



6
7
8
# File 'lib/amqp-hermes/receiver.rb', line 6

# Reader for @queue.
#
# @return [Object] the value currently stored in @queue
def queue
  @queue
end

#routing_key ⇒ Object (readonly)

Returns the value of attribute routing_key.



6
7
8
# File 'lib/amqp-hermes/receiver.rb', line 6

# Reader for @routing_key.
#
# @return [Object] the value currently stored in @routing_key
def routing_key
  @routing_key
end

Instance Method Details

#clear ⇒ Object



62
63
64
# File 'lib/amqp-hermes/receiver.rb', line 62

# Drops all stored messages by pointing @messages at a fresh empty array.
# The previous array object is not emptied, only replaced, so a reference
# obtained earlier still holds its old contents.
#
# @return [Array] the new, empty array
def clear
  @messages = []
end

#inspect ⇒ Object



66
67
68
# File 'lib/amqp-hermes/receiver.rb', line 66

# Compact, human-readable summary of the receiver's state.
#
# @return [String] queue, routing key and exchange followed by the current
#   open?/listening? flags
def inspect
  format(
    '#<Hermes::Receiver @queue="%s" @routing_key="%s" @exchange="%s" open=%s listening=%s>',
    @queue, @routing_key, @exchange, open?, listening?
  )
end

#listen ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/amqp-hermes/receiver.rb', line 39

# Spawns a thread that marks the receiver as listening, binds the queue to
# the exchange under the routing key and subscribes with explicit acks.
# Each delivery is wrapped in an AMQP::Hermes::Message, handed to the
# handler's #receive and then acknowledged.
#
# @return [Thread] the thread running the subscription set-up
def listen
  Thread.new(self, @handler) do |receiver, handler|
    receiver._listening = true

    source = receiver.channel.queue(receiver.queue)
    binding_opts = { :routing_key => receiver.routing_key }
    bound = source.bind(receiver.exchange, binding_opts)

    bound.subscribe(:ack => true) do |headers, payload|
      message = AMQP::Hermes::Message.new(headers, payload)
      handler.receive(message)
      # Acknowledge only after the handler has returned.
      headers.ack
    end
  end
end

#listening? ⇒ Boolean

Returns:

  • (Boolean)


52
53
54
# File 'lib/amqp-hermes/receiver.rb', line 52

# Reports whether the receiver has flagged itself as listening.
#
# @return [Boolean] true only when @_listening equals +true+; nil, false or
#   any other stored value yields false
def listening?
  # `==` already produces the boolean, so no ternary is needed.
  @_listening == true
end

#receive(message) ⇒ Object

Implements the handler interface.



57
58
59
60
# File 'lib/amqp-hermes/receiver.rb', line 57

# Handler-interface entry point: appends an incoming message to @messages.
#
# @param message [AMQP::Hermes::Message] objects of any other type are ignored
# @return [Array, nil] the message list after appending, or nil when
#   +message+ is not an AMQP::Hermes::Message
def receive(message)
  return unless message.is_a?(AMQP::Hermes::Message)

  @messages.push(message)
end