Class: NSQ::Subscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/nsq/subscriber.rb

Direct Known Subclasses

QueueSubscriber

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(reader, topic, channel, options, &block) ⇒ Subscriber

Creates a new subscriber which maintains connections to all the nsqd instances which publish the given topic. This is never called directly; it is invoked when Reader#subscribe is called.

Options:

:max_tries [Integer]
  The max number of attempts to process a given message at which point it will no longer be requeued.
  Defaults to nil which means it will be requeued forever if it continues to fail.

:max_in_flight [Integer]
  The number used to determine the RDY count sent for each connection.
  Defaults to 1

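:requeue_delay (seconds) [Integer]
  The delay that is sent along with the requeue when a message fails. The value is given in
  seconds and is converted to milliseconds (multiplied by 1000) when the subscriber is created.
  Defaults to 90 seconds (90,000 msec)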

:ready_backoff_timer [Hash of BackoffTimer options]
  Options passed to a BackoffTimer for increasing the interval between ready counts when
  messages are failing.
    Options:
      :min_interval (seconds) [Float]
        The minimum interval that the BackoffTimer will return.
        Defaults to 0

      :max_interval (seconds) [Float]
        The maximum interval that the BackoffTimer will return.
        Defaults to 120

      :ratio [Float]
        Defaults to 0.25

      :short_length [Float]
        Defaults to 10

      :long_length [Float]
        Defaults to 250

:connection_backoff_timer [Hash of BackoffTimer options]
  Options passed to a BackoffTimer for increasing the interval between connection attempts
  when a connection to nsqd is failing.
    Options (Refer to :ready_backoff_timer above for the meaning of these options):
      :min_interval (seconds) [Float]
        Defaults to 0

      :max_interval (seconds) [Float]
        Defaults to 30


53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/nsq/subscriber.rb', line 53

# Sets up a subscriber for +topic+/+channel+ on behalf of +reader+.
#
# The effective settings are reader.options with +options+ merged on top, so
# keys given in +options+ win. The handler +block+ is stored for later use.
#
# Raises a RuntimeError when the resulting max_in_flight is outside 1..2499.
def initialize(reader, topic, channel, options, &block)
  settings = reader.options.merge(options)

  @name = "#{reader.name}:#{topic}:#{channel}"
  @reader = reader
  @selector = reader.selector
  @topic = topic
  @channel = channel
  @block = block
  @max_tries = settings[:max_tries]
  @max_in_flight = (settings[:max_in_flight] || 1).to_i
  # The :requeue_delay option is multiplied by 1000 before being stored.
  @requeue_delay = (settings[:requeue_delay] || 90).to_i * 1000
  @connection_hash = {}

  # Backoff settings used when building the READY backoff timer.
  ready_cfg = settings[:ready_backoff_timer] || {}
  # Backoff settings used when building the connection backoff timer.
  connect_cfg = settings[:connection_backoff_timer] || {}

  @ready_min_interval = ready_cfg[:min_interval] || 0
  @ready_max_interval = ready_cfg[:max_interval] || 120
  @ready_ratio = ready_cfg[:ratio] || 0.25
  @ready_short_length = ready_cfg[:short_length] || 10
  @ready_long_length = ready_cfg[:long_length] || 250

  @connection_min_interval = connect_cfg[:min_interval] || 0
  @connection_max_interval = connect_cfg[:max_interval] || 30
  @connection_ratio = connect_cfg[:ratio] || 0.25
  @connection_short_length = connect_cfg[:short_length] || 10
  @connection_long_length = connect_cfg[:long_length] || 250

  return if @max_in_flight.between?(1, 2499)

  raise "Invalid value for max_in_flight, must be between 0 and 2500: #{@max_in_flight}"
end

Instance Attribute Details

#max_in_flightObject

Returns the value of attribute max_in_flight.



4
5
6
# File 'lib/nsq/subscriber.rb', line 4

# Returns the current value of @max_in_flight.
def max_in_flight
  @max_in_flight
end

#nameObject (readonly)

Returns the value of attribute name.



3
4
5
# File 'lib/nsq/subscriber.rb', line 3

# Returns the value of @name (read-only attribute).
def name
  @name
end

#selectorObject (readonly)

Returns the value of attribute selector.



3
4
5
# File 'lib/nsq/subscriber.rb', line 3

# Returns the value of @selector (read-only attribute).
def selector
  @selector
end

Instance Method Details

#add_connection(host, port) ⇒ Object

:nodoc:



108
109
110
# File 'lib/nsq/subscriber.rb', line 108

# Builds a Connection for the given host/port pair and stores it in
# @connection_hash under the [host, port] key, replacing any entry already
# stored under that key. Returns the new Connection.
def add_connection(host, port) #:nodoc:
  endpoint = [host, port]
  @connection_hash.store(endpoint, Connection.new(@reader, self, host, port))
end

#connection_countObject

:nodoc:



104
105
106
# File 'lib/nsq/subscriber.rb', line 104

# Number of entries currently held in @connection_hash.
def connection_count #:nodoc:
  @connection_hash.length
end

#create_connection_backoff_timerObject

:nodoc:



88
89
90
# File 'lib/nsq/subscriber.rb', line 88

# Returns a new BackoffTimer built from the connection_* settings, passed in
# the order: min interval, max interval, ratio, short length, long length.
def create_connection_backoff_timer #:nodoc:
  timer_args = [
    @connection_min_interval,
    @connection_max_interval,
    @connection_ratio,
    @connection_short_length,
    @connection_long_length
  ]
  BackoffTimer.new(*timer_args)
end

#create_ready_backoff_timerObject

:nodoc:



84
85
86
# File 'lib/nsq/subscriber.rb', line 84

# Returns a new BackoffTimer built from the ready_* settings, passed in the
# order: min interval, max interval, ratio, short length, long length.
def create_ready_backoff_timer #:nodoc:
  timer_args = [
    @ready_min_interval,
    @ready_max_interval,
    @ready_ratio,
    @ready_short_length,
    @ready_long_length
  ]
  BackoffTimer.new(*timer_args)
end

#handle_connection(connection) ⇒ Object

:nodoc:



132
133
134
# File 'lib/nsq/subscriber.rb', line 132

# Connection callback: calls connection.send_init with this subscriber's topic
# and channel plus the reader's short and long ids, and returns its result.
def handle_connection(connection) #:nodoc:
  short_id = @reader.short_id
  long_id = @reader.long_id
  connection.send_init(@topic, @channel, short_id, long_id)
end

#handle_frame_error(connection, error_message) ⇒ Object

:nodoc:



156
157
158
159
# File 'lib/nsq/subscriber.rb', line 156

# Error-frame callback: logs the inspected +error_message+ at error level via
# NSQ.logger, then resets +connection+. Returns the result of connection.reset.
def handle_frame_error(connection, error_message) #:nodoc:
  logger = NSQ.logger
  logger.error("Received error from nsqd: #{error_message.inspect}")
  connection.reset
end

#handle_heartbeat(connection) ⇒ Object

:nodoc:



136
137
# File 'lib/nsq/subscriber.rb', line 136

# Heartbeat callback. Currently a no-op: the body is empty and +connection+ is
# not used, so the method returns nil.
def handle_heartbeat(connection) #:nodoc:
end

#handle_io_error(connection, exception) ⇒ Object

:nodoc:



161
162
163
164
# File 'lib/nsq/subscriber.rb', line 161

# I/O error callback: logs the exception message together with at most the
# first two backtrace lines at error level, then resets +connection+.
# Returns the result of connection.reset.
def handle_io_error(connection, exception) #:nodoc:
  # Exception#backtrace is nil for an exception that was never raised; treat
  # that as an empty trace so logging cannot fail and skip the reset below.
  trace = Array(exception.backtrace).first(2).join("\n\t")
  NSQ.logger.error("Socket error: #{exception.message}\n\t#{trace}")
  connection.reset
end

#handle_message(connection, message) ⇒ Object

:nodoc:



139
140
141
# File 'lib/nsq/subscriber.rb', line 139

# Message callback: forwards +connection+ and +message+ to #process_message,
# passing the stored @block as the handler block.
def handle_message(connection, message) #:nodoc:
  process_message(connection, message, &@block)
end

#process_message(connection, message, &block) ⇒ Object

:nodoc:



143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/nsq/subscriber.rb', line 143

# Yields +message+ to the given block and reports the outcome to +connection+:
#
# * block returns normally -> connection.send_finish(message.id, true)
# * block raises and @max_tries is set and message.attempts has reached it ->
#   a warning is logged and connection.send_finish(message.id, false) is sent
# * block raises otherwise -> connection.send_requeue is sent with a delay of
#   message.attempts * @requeue_delay
#
# The failure is always logged at error level with its backtrace.
def process_message(connection, message, &block) #:nodoc:
  yield message
  connection.send_finish(message.id, true)
rescue Exception => e
  # NOTE(review): rescuing Exception (not just StandardError) is kept from the
  # original so that any failure in the handler still results in a requeue.
  NSQ.logger.error("#{connection.name}: Exception during handle_message: #{e.message}\n\t#{Array(e.backtrace).join("\n\t")}")
  # The attempt count and body live on the message; the bare names +attempts+
  # and +body+ are not defined in this scope and raised NameError here.
  # Assumes the message object responds to #attempts and #body.
  attempts = message.attempts
  if @max_tries && attempts >= @max_tries
    # Logger-style API: the level method is #warn (there is no #warning).
    NSQ.logger.warn("#{connection.name}: Giving up on message after #{@max_tries} tries: #{message.body.inspect}")
    connection.send_finish(message.id, false)
  else
    connection.send_requeue(message.id, attempts * @requeue_delay)
  end
end

#ready_countObject

The actual value for the READY message



98
99
100
101
102
# File 'lib/nsq/subscriber.rb', line 98

# The value to use for the READY message: @max_in_flight divided by the number
# of connections in @connection_hash, rounded up.
#
# With no connections registered the divisor is taken as 1, so the method
# returns @max_in_flight instead of raising ZeroDivisionError.
def ready_count #:nodoc:
  # TODO: Should we take into account the last_ready_count minus the number of messages sent since then?
  # Never divide by zero: an empty connection table is treated as one connection.
  connections = [@connection_hash.size, 1].max
  # Rounding up!
  (@max_in_flight + connections - 1) / connections
end

#ready_thresholdObject

Threshold for a connection where it’s time to send a new READY message



93
94
95
# File 'lib/nsq/subscriber.rb', line 93

# Threshold for a connection where it is time to send a new READY message:
# @max_in_flight divided by the number of connections in @connection_hash,
# then by 4 (integer division at each step).
#
# With no connections registered the divisor is taken as 1 instead of raising
# ZeroDivisionError.
def ready_threshold #:nodoc:
  connections = [@connection_hash.size, 1].max
  @max_in_flight / connections / 4
end

#remove_connection(host, port) ⇒ Object

:nodoc:



112
113
114
115
116
# File 'lib/nsq/subscriber.rb', line 112

# Deletes the entry stored under [host, port] from @connection_hash and closes
# it. Returns nil when no entry was stored for that pair, otherwise the result
# of the connection's #close.
def remove_connection(host, port) #:nodoc:
  removed = @connection_hash.delete([host, port])
  removed.close if removed
end

#stopObject

Stop this subscriber



119
120
121
122
123
124
125
# File 'lib/nsq/subscriber.rb', line 119

# Stop this subscriber: sets the stopped flag, closes every connection held in
# @connection_hash, then empties the hash. Returns the (now empty) hash.
def stop
  @stopped = true
  @connection_hash.each_value(&:close)
  @connection_hash.clear
end

#stopped?Boolean

Return true if this subscriber has been stopped

Returns:

  • (Boolean)


128
129
130
# File 'lib/nsq/subscriber.rb', line 128

# Returns true if this subscriber has been stopped, false otherwise.
#
# @stopped is only assigned by #stop, so it is nil until then; the value is
# normalized so the predicate always returns a real Boolean.
def stopped?
  @stopped ? true : false
end

#to_sObject

:nodoc:



166
167
168
# File 'lib/nsq/subscriber.rb', line 166

# String form of the subscriber: returns the value of @name.
def to_s #:nodoc:
  @name
end