Class: NSQ::QueueSubscriber
- Inherits: Subscriber
  - Object
  - Subscriber
  - NSQ::QueueSubscriber
- Defined in: lib/nsq/queue_subscriber.rb
Overview
An asynchronous subscriber that can be run on multiple threads for reading messages from a subscribed channel.
Instance Attribute Summary
Attributes inherited from Subscriber
#max_in_flight, #name, #selector
Instance Method Summary
- #handle_message(connection, message) ⇒ Object
  :nodoc:
- #initialize(reader, topic, channel, options) ⇒ QueueSubscriber (constructor)
  :nodoc:
- #ready_count ⇒ Object
  :nodoc:
- #run(&block) ⇒ Object
  Processes messages from the subscribed connections.
- #stop ⇒ Object
  Stop this subscriber once all the queued messages have been handled.
Methods inherited from Subscriber
#add_connection, #connection_count, #create_connection_backoff_timer, #create_ready_backoff_timer, #handle_connection, #handle_frame_error, #handle_heartbeat, #handle_io_error, #process_message, #ready_threshold, #remove_connection, #stopped?, #to_s
Constructor Details
#initialize(reader, topic, channel, options) ⇒ QueueSubscriber
:nodoc:
# File 'lib/nsq/queue_subscriber.rb', line 6
def initialize(reader, topic, channel, options) #:nodoc:
  super
  @queue = Queue.new
  @run_mutex = Mutex.new
  @run_count = 0
end
Instance Method Details
#handle_message(connection, message) ⇒ Object
:nodoc:
# File 'lib/nsq/queue_subscriber.rb', line 18
def handle_message(connection, message) #:nodoc:
  @queue << [connection, message]
end
#ready_count ⇒ Object
:nodoc:
# File 'lib/nsq/queue_subscriber.rb', line 13
def ready_count #:nodoc:
  # Return the minimum of Subscriber#ready_count and the amount of space left in the queue
  [super, self.max_in_flight - @queue.size].min
end
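To make the arithmetic concrete, the stand-alone sketch below mirrors the calculation with plain integers. `ready_count_sketch` and its parameter names are hypothetical and not part of the library; `base_ready` stands in for whatever Subscriber#ready_count returns.

```ruby
# Hypothetical stand-in for the calculation in #ready_count (not library code).
# base_ready    - the value the parent Subscriber#ready_count would return (assumed)
# max_in_flight - the subscriber's max_in_flight setting
# queued        - number of messages currently waiting in the internal queue
def ready_count_sketch(base_ready, max_in_flight, queued)
  [base_ready, max_in_flight - queued].min
end

ready_count_sketch(5, 10, 2)  # => 5 (8 slots free, so the parent's value is the limit)
ready_count_sketch(5, 10, 8)  # => 2 (only 2 slots free, so the queue is the limit)
```

The effect is that the subscriber asks for fewer messages as its internal queue fills up.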
#run(&block) ⇒ Object
Processes messages from the subscribed connections. This will not return until #stop has been called in a separate thread. This can be called from multiple threads if you want multiple workers handling the incoming messages.
# File 'lib/nsq/queue_subscriber.rb', line 25
def run(&block)
  @run_mutex.synchronize { @run_count += 1}
  until @stopped
    pair = @queue.pop
    if pair == :stop
      # Give the next thread something to pop
      @queue << :stop
      return
    end
    connection, message = pair
    process_message(connection, message, &block)
  end
ensure
  @run_mutex.synchronize { @run_count -= 1}
end
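The multi-worker behaviour described above relies on each thread handing the `:stop` sentinel on to the next one. The following is a minimal stand-alone sketch of that hand-off using only Ruby's built-in `Queue` and `Thread`; it does not use the nsq gem, and the variable names (`queue`, `processed`, `workers`) are illustrative assumptions.

```ruby
# Stand-alone illustration of the sentinel hand-off; not library code.
queue = Queue.new
processed = Queue.new # thread-safe collector for handled items

workers = 3.times.map do
  Thread.new do
    loop do
      item = queue.pop # blocks until something is available
      if item == :stop
        # Re-enqueue the sentinel so the next blocked worker also wakes up and exits
        queue << :stop
        break
      end
      processed << item
    end
  end
end

10.times { |i| queue << i }
queue << :stop # a single sentinel is enough to stop every worker
workers.each(&:join)

# Drain the collector into an array now that all workers have finished
handled = []
handled << processed.pop until processed.empty?
```

Because the queue is first-in, first-out, every item enqueued before the sentinel is popped before any worker sees `:stop`.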
#stop ⇒ Object
Stop this subscriber once all the queued messages have been handled.
# File 'lib/nsq/queue_subscriber.rb', line 42
def stop
  @stopped = true
  # Give the threads something to pop
  @queue << :stop
  # TODO: Put a max time on this so we don't potentially hang
  sleep 1 while @run_count > 0
  super
end
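The TODO in the source notes that the wait loop has no upper bound. One possible way to bound such a wait is sketched below; this is an assumption about how it could be done, not how the library does it. `wait_until`, its `timeout:` and `interval:` parameters, and the simulated `run_count` are all hypothetical.

```ruby
# Hypothetical helper, not part of the library: poll until the block returns
# true or the timeout (in seconds) elapses. Returns true on success, false on timeout.
def wait_until(timeout:, interval: 0.05)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  until yield
    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    sleep interval
  end
  true
end

# Simulate two running workers that finish shortly after being asked to stop
run_count = 2
mutex = Mutex.new
workers = 2.times.map do
  Thread.new do
    sleep 0.1
    mutex.synchronize { run_count -= 1 }
  end
end

# Wait at most 2 seconds for the simulated workers to drain
drained = wait_until(timeout: 2) { mutex.synchronize { run_count.zero? } }
workers.each(&:join)
```

A monotonic clock is used for the deadline so that system clock adjustments do not shorten or extend the wait.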