Class: NSQ::QueueSubscriber

Inherits:
Subscriber show all
Defined in:
lib/nsq/queue_subscriber.rb

Overview

An asynchronous subscriber that can be run on multiple threads for reading messages from a subscribed channel.

Instance Attribute Summary

Attributes inherited from Subscriber

#max_in_flight, #name, #selector

Instance Method Summary collapse

Methods inherited from Subscriber

#add_connection, #connection_count, #create_connection_backoff_timer, #create_ready_backoff_timer, #handle_connection, #handle_frame_error, #handle_heartbeat, #handle_io_error, #process_message, #ready_threshold, #remove_connection, #stopped?, #to_s

Constructor Details

#initialize(reader, topic, channel, options) ⇒ QueueSubscriber

:nodoc:



6
7
8
9
10
11
# File 'lib/nsq/queue_subscriber.rb', line 6

def initialize(reader, topic, channel, options) #:nodoc:
  super
  @queue     = Queue.new
  @run_mutex = Mutex.new
  @run_count = 0
end

Instance Method Details

#handle_message(connection, message) ⇒ Object

:nodoc:



18
19
20
# File 'lib/nsq/queue_subscriber.rb', line 18

def handle_message(connection, message) #:nodoc:
  @queue << [connection, message]
end

#ready_countObject

:nodoc:



13
14
15
16
# File 'lib/nsq/queue_subscriber.rb', line 13

def ready_count #:nodoc:
  # Return the minimum of Subscriber#ready_count and the amount of space left in the queue
  [super, self.max_in_flight - @queue.size].min
end

#run(&block) ⇒ Object

Processes messages from the subscribed connections. This will not return until #stop has been called in a separate thread. This can be called from multiple threads if you want multiple workers handling the incoming messages.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/nsq/queue_subscriber.rb', line 25

def run(&block)
  @run_mutex.synchronize { @run_count += 1}
  until @stopped
    pair = @queue.pop
    if pair == :stop
      # Give the next thread something to pop
      @queue << :stop
      return
    end
    connection, message = pair
    process_message(connection, message, &block)
  end
ensure
  @run_mutex.synchronize { @run_count -= 1}
end

#stopObject

Stop this subscriber once all the queued messages have been handled.



42
43
44
45
46
47
48
49
# File 'lib/nsq/queue_subscriber.rb', line 42

def stop
  @stopped = true
  # Give the threads something to pop
  @queue << :stop
  # TODO: Put a max time on this so we don't potentially hang
  sleep 1 while @run_count > 0
  super
end