Class: Synapse::Partitioning::MemoryQueueReader

Inherits:
QueueReader
  • Object
show all
Defined in:
lib/synapse/partitioning/memory_queue_reader.rb

Overview

Queue reader that dequeues messages from an in-memory Ruby queue

Instance Method Summary collapse

Methods inherited from QueueReader

#ack_message

Constructor Details

#initialize(queue, name) ⇒ undefined

Parameters:

  • queue (Queue)
  • name (String)

    Name of the queue being read from



8
9
10
11
# File 'lib/synapse/partitioning/memory_queue_reader.rb', line 8

def initialize(queue, name)
  @queue = queue
  @name = name
end

Instance Method Details

#nack_message(receipt) ⇒ undefined

Parameters:

Returns:

  • (undefined)


26
27
28
# File 'lib/synapse/partitioning/memory_queue_reader.rb', line 26

def nack_message(receipt)
  @queue.push receipt.packed
end

#subscribe {|MessageReceipt| ... } ⇒ undefined

Yields:

Returns:

  • (undefined)


15
16
17
18
19
20
21
22
# File 'lib/synapse/partitioning/memory_queue_reader.rb', line 15

def subscribe(&handler)
  loop do
    packed = @queue.pop

    receipt = MessageReceipt.new 0, packed, @name
    handler.call receipt
  end
end