Class: NSQ::Reader

Inherits:
Object
  • Object
show all
Includes:
Logger, Util
Defined in:
lib/nsq/reader.rb

Overview

Maintains a collection of subscribers to topics and channels.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logger

#logger, #logger=

Methods included from Util

#assert_topic_and_channel_valid, #valid_channel_name?, #valid_topic_name?

Constructor Details

#initialize(options = {}) ⇒ Reader

Create a new NSQ Reader

Options (Refer to NSQ::Subscriber::new for additional options which will be passed on to each subscriber):

:nsqd_tcp_addresses [String or Array of Strings]
  Array of nsqd servers to connect to with port numbers
  ['server1:4150', 'server2:4150']

:logger [Logger]
  The Logger class

:logger_level [Symbol]
  The Logger Level [:info, :debug, :warn, :error]


26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/nsq/reader.rb', line 26

def initialize(options={})
  @options            = options
  @nsqd_tcp_addresses = Array(options[:nsqd_tcp_addresses])

  logger          = options[:logger]       if options[:logger]
  logger.level    = options[:logger_level] if options[:logger_level]

  @selector    = ::NIO::Selector.new
  @timer       = Timer.new(@selector)
  @subscribers = ThreadSafe::Cache.new
  @stop        = Atomic.new(false)
  @running     = Atomic.new(false)

  raise 'Must pass either option :nsqd_tcp_addresses' if @nsqd_tcp_addresses.empty?
end

Instance Attribute Details

#optionsObject (readonly)

Returns the value of attribute options.



11
12
13
# File 'lib/nsq/reader.rb', line 11

def options
  @options
end

#selectorObject (readonly)

Returns the value of attribute selector.



11
12
13
# File 'lib/nsq/reader.rb', line 11

def selector
  @selector
end

Instance Method Details

#add_timeout(interval, &block) ⇒ Object

Call the given block from within the #run thread when the given interval has passed.



108
109
110
# File 'lib/nsq/reader.rb', line 108

def add_timeout(interval, &block)
  @timer.add(interval, &block)
end

#runObject

Processes all the messages from the subscribed connections. This will not return until #stop has been called in a separate thread.



87
88
89
90
91
92
93
94
# File 'lib/nsq/reader.rb', line 87

def run
  @running.value = true # we can't run from multiple threads so this is fine
  until stopped?
    @selector.select(@timer.next_interval) { |m| m.value.call }
  end
ensure
  @running.value = false
end

#running?Boolean

Returns:

  • (Boolean)


81
82
83
# File 'lib/nsq/reader.rb', line 81

def running?
  @running.value
end

#stopObject

Stop this reader which will gracefully exit the run method after all current messages are processed.



97
98
99
100
101
102
103
104
105
# File 'lib/nsq/reader.rb', line 97

def stop
  logger.debug("#{self}: Reader stopping...")
  @stop.try_update { |m| m = true }
  @running.try_update { |m| m = false }
  @selector.wakeup
  @subscribers.each_value(&:stop)
rescue Atomic::ConcurrentUpdateError
  retry
end

#stopped?Boolean

Returns:

  • (Boolean)


77
78
79
# File 'lib/nsq/reader.rb', line 77

def stopped?
  @stop.value
end

#subscribe(topic, channel, options = {}, &block) ⇒ Object

Subscribes to a given topic and channel.

If a block is passed, then within NSQ::Reader#run that block will be run synchronously whenever a message is received for this channel.

If a block is not passed, then the QueueSubscriber that is returned from this method should have it’s QueueSubscriber#run method executed within one or more separate threads for processing the messages.

Refer to Subscriber::new for the options that can be passed to this method.



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/nsq/reader.rb', line 52

def subscribe(topic, channel, options={}, &block)
  assert_topic_and_channel_valid(topic, channel)
  subscriber = nil
  name = "#{topic}:#{channel}"

  raise "Already subscribed to #{name}" if @subscribers[name]
  subscriber = @subscribers[name] = Subscriber.new(self, topic, channel, options, &block)

  @nsqd_tcp_addresses.each do |addr|
    address, port = addr.split(':')
    subscriber.add_connection(address, port.to_i)
  end

  subscriber
end

#unsubscribe(topic, channel) ⇒ Object

Unsubscribe a given topic and channel.



69
70
71
72
73
74
75
# File 'lib/nsq/reader.rb', line 69

def unsubscribe(topic, channel)
  name = "#{topic}:#{channel}"
  subscriber = @subscribers[name]
  return unless subscriber
  subscriber.stop
  @subscribers.delete(name)
end