Class: MessageQueue::Adapters::Memory::Connection::Consumer

Inherits:
Consumer
  • Object
show all
Defined in:
lib/message_queue/adapters/memory/consumer.rb

Instance Attribute Summary collapse

Attributes inherited from Consumer

#connection, #options

Instance Method Summary collapse

Methods inherited from Consumer

#load_object

Methods included from OptionsHelper

#compute_values, #deep_clone

Constructor Details

#initialize(*args) ⇒ Consumer

Returns a new instance of Consumer.



4
5
6
7
# File 'lib/message_queue/adapters/memory/consumer.rb', line 4

# Builds the consumer, handing every positional argument to the parent
# constructor, then starts with an empty in-memory message buffer.
#
# @param args [Array] arguments forwarded unchanged to the superclass
def initialize(*args)
  super(*args)
  # Buffer that #update appends to while no handler block is set.
  @queue = []
end

Instance Attribute Details

#block ⇒ Object (readonly)

Returns the value of attribute block.



2
3
4
# File 'lib/message_queue/adapters/memory/consumer.rb', line 2

# Reader for the handler stored by #subscribe.
#
# @return [Proc, nil] the block given to #subscribe; nil when none has been
#   stored (no subscription yet, subscribed without a block, or cleared by
#   #unsubscribe)
def block
  @block
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



2
3
4
# File 'lib/message_queue/adapters/memory/consumer.rb', line 2

# Reader for the in-memory message buffer.
#
# @return [Array] the array created in the constructor; #update appends
#   messages to it whenever no handler block is set
def queue
  @queue
end

Instance Method Details

#subscribe(options = {}, &block) ⇒ Object



9
10
11
12
13
# File 'lib/message_queue/adapters/memory/consumer.rb', line 9

# Registers this consumer as an observer of the given producer and stores
# the handler block for later use by #update.
#
# @param options [Hash] must contain :producer, an object responding to
#   +add_observer+
# @yield [message] handler stored in @block (may be omitted, leaving it nil)
# @return [Proc, nil] the stored block
# @raise [KeyError] if +options+ has no :producer key
def subscribe(options = {}, &block)
  # Register first, then record the handler, in that order.
  options.fetch(:producer).add_observer(self)
  @block = block
end

#unsubscribe(options = {}) ⇒ Object



15
16
17
18
19
# File 'lib/message_queue/adapters/memory/consumer.rb', line 15

# Removes this consumer from the given producer's observers and clears the
# stored handler block.
#
# @param options [Hash] must contain :producer, an object responding to
#   +delete_observer+
# @return [nil]
# @raise [KeyError] if +options+ has no :producer key
def unsubscribe(options = {})
  options.fetch(:producer).delete_observer(self)
  # With no handler left, #update falls back to appending to the queue.
  @block = nil
end

#update(object, options) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/message_queue/adapters/memory/consumer.rb', line 21

# Observer callback: wraps an incoming payload in a MessageQueue::Message and
# either hands it to the subscribed block or buffers it in the queue.
#
# The raw object is passed through +load_object+ exactly once. The previous
# version loaded it, then loaded the already-loaded result a second time when
# building the message, which breaks with any loader that is not idempotent.
#
# @param object [Object] the raw payload as delivered by the producer
# @param options [Hash] message metadata; :message_id, :type, :timestamp and
#   :routing_key are read (missing keys yield nil)
# @return [Object] the block's return value when a block is set, otherwise
#   the queue with the new message appended
def update(object, options)
  payload = load_object(object)
  message = MessageQueue::Message.new(:message_id => options[:message_id],
                                      :type => options[:type],
                                      :timestamp => options[:timestamp],
                                      :routing_key => options[:routing_key],
                                      :payload => payload)
  if block
    block.call(message)
  else
    # No handler subscribed: keep the message for later inspection.
    queue << message
  end
end