Class: MessageQueue::Adapters::Memory::Connection::Consumer
Defined in: lib/message_queue/adapters/memory/consumer.rb
Instance Attribute Summary
- #block ⇒ Object (readonly)
  Returns the value of attribute block.
- #queue ⇒ Object (readonly)
  Returns the value of attribute queue.
Attributes inherited from Consumer
Instance Method Summary
- #initialize(*args) ⇒ Consumer (constructor)
  A new instance of Consumer.
- #subscribe(options = {}, &block) ⇒ Object
- #unsubscribe(options = {}) ⇒ Object
- #update(object, options) ⇒ Object
Methods inherited from Consumer
Methods included from OptionsHelper
Constructor Details
#initialize(*args) ⇒ Consumer
Returns a new instance of Consumer.
    # File 'lib/message_queue/adapters/memory/consumer.rb', line 4
    def initialize(*args)
      super
      @queue = []
    end
Instance Attribute Details
#block ⇒ Object (readonly)
Returns the value of attribute block.
    # File 'lib/message_queue/adapters/memory/consumer.rb', line 2
    def block
      @block
    end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
    # File 'lib/message_queue/adapters/memory/consumer.rb', line 2
    def queue
      @queue
    end
Instance Method Details
#subscribe(options = {}, &block) ⇒ Object
    # File 'lib/message_queue/adapters/memory/consumer.rb', line 9
    def subscribe(options = {}, &block)
      producer = options.fetch(:producer)
      producer.add_observer(self)
      @block = block
    end
#unsubscribe(options = {}) ⇒ Object
    # File 'lib/message_queue/adapters/memory/consumer.rb', line 15
    def unsubscribe(options = {})
      producer = options.fetch(:producer)
      producer.delete_observer(self)
      @block = nil
    end
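The registration flow in #subscribe and #unsubscribe can be illustrated with a minimal, self-contained sketch. SketchProducer and SketchConsumer are hypothetical stand-ins written for this example, not the library's classes. The only things taken from the listings above are the :producer option, the add_observer and delete_observer calls, and the way @block is set and cleared.

```ruby
# Hypothetical stand-in for a producer: it only tracks its observers.
class SketchProducer
  attr_reader :observers

  def initialize
    @observers = []
  end

  def add_observer(observer)
    @observers << observer
  end

  def delete_observer(observer)
    @observers.delete(observer)
  end
end

# Hypothetical stand-in that mirrors the subscribe/unsubscribe bodies above.
class SketchConsumer
  attr_reader :block

  def subscribe(options = {}, &block)
    # Hash#fetch raises KeyError when :producer is not supplied.
    producer = options.fetch(:producer)
    producer.add_observer(self)
    @block = block
  end

  def unsubscribe(options = {})
    producer = options.fetch(:producer)
    producer.delete_observer(self)
    @block = nil
  end
end

producer = SketchProducer.new
consumer = SketchConsumer.new

consumer.subscribe(:producer => producer) { |message| puts message }
registered = producer.observers.include?(consumer)

consumer.unsubscribe(:producer => producer)
removed = !producer.observers.include?(consumer)

# Omitting :producer fails fast instead of silently doing nothing.
missing_key_error = begin
  consumer.subscribe
  nil
rescue KeyError => e
  e
end
```

Because both methods use fetch rather than [], the :producer option is effectively required: a call without it raises KeyError before any state changes.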
#update(object, options) ⇒ Object
    # File 'lib/message_queue/adapters/memory/consumer.rb', line 21
    def update(object, options)
      object = load_object(object)
      message = MessageQueue::Message.new(:message_id => options[:message_id],
                                          :type => options[:type],
                                          :timestamp => options[:timestamp],
                                          :routing_key => options[:routing_key],
                                          :payload => load_object(object))
      if block
        block.call(message)
      else
        queue << message
      end
    end
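The dispatch rule in #update (hand the message to the block when one is set, otherwise append it to the queue) can be sketched in isolation. SketchMessage and SketchDispatcher are hypothetical names introduced for this example. The sketch leaves out load_object and MessageQueue::Message and shows only the block-or-queue branching.

```ruby
# Hypothetical message type; the real code builds a MessageQueue::Message.
SketchMessage = Struct.new(:payload, :routing_key)

# Hypothetical stand-in that mirrors only the branching in #update.
class SketchDispatcher
  attr_reader :queue
  attr_accessor :block

  def initialize
    @queue = []
    @block = nil
  end

  def update(payload, options)
    message = SketchMessage.new(payload, options[:routing_key])
    if block
      block.call(message) # a handler is set: deliver immediately
    else
      queue << message    # no handler yet: buffer in memory
    end
  end
end

dispatcher = SketchDispatcher.new

# No block yet, so the first message is buffered.
dispatcher.update("first", :routing_key => "demo")

# Once a block is set, later messages bypass the queue.
received = []
dispatcher.block = proc { |message| received << message.payload }
dispatcher.update("second", :routing_key => "demo")
```

In this sketch, as in the listing above, buffered messages are not replayed when a block is set later. They stay in queue until something reads them.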