Class: NSQ::Reader
Overview
Maintains a collection of subscribers to topics and channels.
Instance Attribute Summary collapse
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#selector ⇒ Object
readonly
Returns the value of attribute selector.
Instance Method Summary collapse
-
#add_timeout(interval, &block) ⇒ Object
Call the given block from within the #run thread when the given interval has passed.
-
#initialize(options = {}) ⇒ Reader
constructor
Create a new NSQ Reader.
-
#run ⇒ Object
Processes all the messages from the subscribed connections.
- #running? ⇒ Boolean
-
#stop ⇒ Object
Stop this reader which will gracefully exit the run method after all current messages are processed.
- #stopped? ⇒ Boolean
-
#subscribe(topic, channel, options = {}, &block) ⇒ Object
Subscribes to a given topic and channel.
-
#unsubscribe(topic, channel) ⇒ Object
Unsubscribe a given topic and channel.
Methods included from Logger
Methods included from Util
#assert_topic_and_channel_valid, #valid_channel_name?, #valid_topic_name?
Constructor Details
#initialize(options = {}) ⇒ Reader
Create a new NSQ Reader
Options (Refer to NSQ::Subscriber::new for additional options which will be passed on to each subscriber):
:nsqd_tcp_addresses [String or Array of Strings]
Array of nsqd servers to connect to with port numbers
['server1:4150', 'server2:4150']
:logger [Logger]
The Logger class
:logger_level [Symbol]
The Logger Level [:info, :debug, :warn, :error]
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/nsq/reader.rb', line 26 def initialize(={}) @options = @nsqd_tcp_addresses = Array([:nsqd_tcp_addresses]) logger = [:logger] if [:logger] logger.level = [:logger_level] if [:logger_level] @selector = ::NIO::Selector.new @timer = Timer.new(@selector) @subscribers = ThreadSafe::Cache.new @stop = Atomic.new(false) @running = Atomic.new(false) raise 'Must pass either option :nsqd_tcp_addresses' if @nsqd_tcp_addresses.empty? end |
Instance Attribute Details
#options ⇒ Object (readonly)
Returns the value of attribute options.
11 12 13 |
# File 'lib/nsq/reader.rb', line 11 def @options end |
#selector ⇒ Object (readonly)
Returns the value of attribute selector.
11 12 13 |
# File 'lib/nsq/reader.rb', line 11 def selector @selector end |
Instance Method Details
#add_timeout(interval, &block) ⇒ Object
Call the given block from within the #run thread when the given interval has passed.
108 109 110 |
# File 'lib/nsq/reader.rb', line 108 def add_timeout(interval, &block) @timer.add(interval, &block) end |
#run ⇒ Object
Processes all the messages from the subscribed connections. This will not return until #stop has been called in a separate thread.
87 88 89 90 91 92 93 94 |
# File 'lib/nsq/reader.rb', line 87 def run @running.value = true # we can't run from multiple threads so this is fine until stopped? @selector.select(@timer.next_interval) { |m| m.value.call } end ensure @running.value = false end |
#running? ⇒ Boolean
81 82 83 |
# File 'lib/nsq/reader.rb', line 81 def running? @running.value end |
#stop ⇒ Object
Stop this reader which will gracefully exit the run method after all current messages are processed.
97 98 99 100 101 102 103 104 105 |
# File 'lib/nsq/reader.rb', line 97 def stop logger.debug("#{self}: Reader stopping...") @stop.try_update { |m| m = true } @running.try_update { |m| m = false } @selector.wakeup @subscribers.each_value(&:stop) rescue Atomic::ConcurrentUpdateError retry end |
#stopped? ⇒ Boolean
77 78 79 |
# File 'lib/nsq/reader.rb', line 77 def stopped? @stop.value end |
#subscribe(topic, channel, options = {}, &block) ⇒ Object
Subscribes to a given topic and channel.
If a block is passed, then within NSQ::Reader#run that block will be run synchronously whenever a message is received for this channel.
If a block is not passed, then the QueueSubscriber that is returned from this method should have it’s QueueSubscriber#run method executed within one or more separate threads for processing the messages.
Refer to Subscriber::new for the options that can be passed to this method.
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/nsq/reader.rb', line 52 def subscribe(topic, channel, ={}, &block) assert_topic_and_channel_valid(topic, channel) subscriber = nil name = "#{topic}:#{channel}" raise "Already subscribed to #{name}" if @subscribers[name] subscriber = @subscribers[name] = Subscriber.new(self, topic, channel, , &block) @nsqd_tcp_addresses.each do |addr| address, port = addr.split(':') subscriber.add_connection(address, port.to_i) end subscriber end |
#unsubscribe(topic, channel) ⇒ Object
Unsubscribe a given topic and channel.
69 70 71 72 73 74 75 |
# File 'lib/nsq/reader.rb', line 69 def unsubscribe(topic, channel) name = "#{topic}:#{channel}" subscriber = @subscribers[name] return unless subscriber subscriber.stop @subscribers.delete(name) end |