Class: NSQ::Subscriber
- Inherits:
-
Object
- Object
- NSQ::Subscriber
- Defined in:
- lib/nsq/subscriber.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#max_in_flight ⇒ Object
Returns the value of attribute max_in_flight.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#selector ⇒ Object
readonly
Returns the value of attribute selector.
Instance Method Summary collapse
-
#add_connection(host, port) ⇒ Object
:nodoc:.
-
#connection_count ⇒ Object
:nodoc:.
-
#create_connection_backoff_timer ⇒ Object
:nodoc:.
-
#create_ready_backoff_timer ⇒ Object
:nodoc:.
-
#handle_connection(connection) ⇒ Object
:nodoc:.
-
#handle_frame_error(connection, error_message) ⇒ Object
:nodoc:.
-
#handle_heartbeat(connection) ⇒ Object
:nodoc:.
-
#handle_io_error(connection, exception) ⇒ Object
:nodoc:.
-
#handle_message(connection, message) ⇒ Object
:nodoc:.
-
#initialize(reader, topic, channel, options, &block) ⇒ Subscriber
constructor
Creates a new subscriber that maintains connections to all the nsqd instances which publish the given topic.
-
#process_message(connection, message, &block) ⇒ Object
:nodoc:.
-
#ready_count ⇒ Object
The actual value for the READY message.
-
#ready_threshold ⇒ Object
Threshold for a connection where it’s time to send a new READY message.
-
#remove_connection(host, port) ⇒ Object
:nodoc:.
-
#stop ⇒ Object
Stop this subscriber.
-
#stopped? ⇒ Boolean
Return true if this subscriber has been stopped.
-
#to_s ⇒ Object
:nodoc:.
Constructor Details
#initialize(reader, topic, channel, options, &block) ⇒ Subscriber
Creates a new subscriber that maintains connections to all the nsqd instances which publish the given topic. This is never called directly; it is invoked when Reader#subscribe is called.
Options:
:max_tries [Integer]
The max number of attempts to process a given message at which point it will no longer be requeued.
Defaults to nil which means it will be requeued forever if it continues to fail.
:max_in_flight [Integer]
The number used to determine the RDY count sent for each connection.
Defaults to 1
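:requeue_delay (seconds) [Integer]
The base delay that is sent along with the requeue when a message fails. The constructor multiplies this value by 1000 to obtain milliseconds, and the delay actually sent is the base delay multiplied by the message's attempt count.
Defaults to 90 seconds (i.e. 90,000 msec)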
:ready_backoff_timer [Hash of BackoffTimer options]
Options passed to a BackoffTimer for increasing the interval between ready counts when
messages are failing.
Options:
:min_interval (seconds) [Float]
The minimum interval that the BackoffTimer will return.
Defaults to 0
:max_interval (seconds) [Float]
The maximum interval that the BackoffTimer will return.
Defaults to 120
:ratio [Float]
Defaults to 0.25
:short_length [Float]
Defaults to 10
:long_length [Float]
Defaults to 250
:connection_backoff_timer [Hash of BackoffTimer options]
Options passed to a BackoffTimer for increasing the interval between connection attempts
when a connection to nsqd is failing.
Options (Refer to :ready_backoff_timer above for the meaning of these options):
:min_interval (seconds) [Float]
Defaults to 0
:max_interval (seconds) [Float]
Defaults to 30
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/nsq/subscriber.rb', line 53
#
# Creates a subscriber for +topic+/+channel+ on behalf of +reader+.
#
# NOTE(review): the extracted listing was flattened onto a single line and had
# lost every occurrence of the `options` identifier. The parameter name comes
# from the documented signature; `reader.options` and the two local hash names
# below are reconstructions -- confirm against the real source file.
#
# reader  - supplies #name, #selector and a base options hash that is merged
#           with +options+ (values in +options+ win on conflict).
# topic   - topic name; stored and used to build the subscriber name.
# channel - channel name; stored and used to build the subscriber name.
# options - Hash of settings (:max_tries, :max_in_flight, :requeue_delay,
#           :ready_backoff_timer, :connection_backoff_timer).
#           :requeue_delay defaults to 90 and is multiplied by 1000 before
#           being stored, so it is effectively given in seconds.
# block   - handler stored in @block for later message processing.
#
# Raises RuntimeError unless the resulting max_in_flight is within 1..2499.
def initialize(reader, topic, channel, options, &block)
  options = reader.options.merge(options)

  @name     = "#{reader.name}:#{topic}:#{channel}"
  @reader   = reader
  @selector = reader.selector
  @topic    = topic
  @channel  = channel
  @block    = block

  @max_tries       = options[:max_tries]
  @max_in_flight   = (options[:max_in_flight] || 1).to_i
  @requeue_delay   = (options[:requeue_delay] || 90).to_i * 1000
  @connection_hash = {}

  ready_options      = options[:ready_backoff_timer] || {}
  connection_options = options[:connection_backoff_timer] || {}

  @ready_min_interval = ready_options[:min_interval] || 0
  @ready_max_interval = ready_options[:max_interval] || 120
  @ready_ratio        = ready_options[:ratio] || 0.25
  @ready_short_length = ready_options[:short_length] || 10
  @ready_long_length  = ready_options[:long_length] || 250

  @connection_min_interval = connection_options[:min_interval] || 0
  @connection_max_interval = connection_options[:max_interval] || 30
  @connection_ratio        = connection_options[:ratio] || 0.25
  @connection_short_length = connection_options[:short_length] || 10
  @connection_long_length  = connection_options[:long_length] || 250

  raise "Invalid value for max_in_flight, must be between 0 and 2500: #{@max_in_flight}" unless @max_in_flight.between?(1,2499)
end
Instance Attribute Details
#max_in_flight ⇒ Object
Returns the value of attribute max_in_flight.
4 5 6 |
# File 'lib/nsq/subscriber.rb', line 4
#
# Returns the value of attribute max_in_flight.
def max_in_flight
  @max_in_flight
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
3 4 5 |
# File 'lib/nsq/subscriber.rb', line 3
#
# Returns the value of attribute name.
def name
  @name
end
#selector ⇒ Object (readonly)
Returns the value of attribute selector.
3 4 5 |
# File 'lib/nsq/subscriber.rb', line 3
#
# Returns the value of attribute selector.
def selector
  @selector
end
Instance Method Details
#add_connection(host, port) ⇒ Object
:nodoc:
108 109 110 |
# File 'lib/nsq/subscriber.rb', line 108
#
# Builds a Connection for host/port and stores it in @connection_hash under
# the [host, port] key. Returns the new connection.
#
# NOTE(review): an entry already stored under the same key is replaced
# without being closed -- confirm callers never add the same endpoint twice.
def add_connection(host, port) #:nodoc:
  @connection_hash[[host, port]] = Connection.new(@reader, self, host, port)
end
#connection_count ⇒ Object
:nodoc:
104 105 106 |
# File 'lib/nsq/subscriber.rb', line 104
#
# Returns the number of entries currently held in @connection_hash.
def connection_count #:nodoc:
  @connection_hash.size
end
#create_connection_backoff_timer ⇒ Object
:nodoc:
88 89 90 |
# File 'lib/nsq/subscriber.rb', line 88
#
# Returns a new BackoffTimer built from the @connection_* settings, passed in
# the order: min interval, max interval, ratio, short length, long length.
def create_connection_backoff_timer #:nodoc:
  BackoffTimer.new(
    @connection_min_interval,
    @connection_max_interval,
    @connection_ratio,
    @connection_short_length,
    @connection_long_length
  )
end
#create_ready_backoff_timer ⇒ Object
:nodoc:
84 85 86 |
# File 'lib/nsq/subscriber.rb', line 84
#
# Returns a new BackoffTimer built from the @ready_* settings, passed in the
# order: min interval, max interval, ratio, short length, long length.
def create_ready_backoff_timer #:nodoc:
  BackoffTimer.new(
    @ready_min_interval,
    @ready_max_interval,
    @ready_ratio,
    @ready_short_length,
    @ready_long_length
  )
end
#handle_connection(connection) ⇒ Object
:nodoc:
132 133 134 |
# File 'lib/nsq/subscriber.rb', line 132
#
# Sends the init sequence on +connection+ with this subscriber's topic and
# channel plus the reader's short and long ids.
def handle_connection(connection) #:nodoc:
  connection.send_init(@topic, @channel, @reader.short_id, @reader.long_id)
end
#handle_frame_error(connection, error_message) ⇒ Object
:nodoc:
156 157 158 159 |
# File 'lib/nsq/subscriber.rb', line 156
#
# Logs an error frame received from nsqd and resets the connection.
#
# NOTE(review): the extracted listing had lost the `error_message` identifier;
# it is restored here from the documented signature.
def handle_frame_error(connection, error_message) #:nodoc:
  NSQ.logger.error("Received error from nsqd: #{error_message.inspect}")
  connection.reset
end
#handle_heartbeat(connection) ⇒ Object
:nodoc:
136 137 |
# File 'lib/nsq/subscriber.rb', line 136
#
# Heartbeat callback; intentionally a no-op in this class.
def handle_heartbeat(connection) #:nodoc:
end
#handle_io_error(connection, exception) ⇒ Object
:nodoc:
161 162 163 164 |
# File 'lib/nsq/subscriber.rb', line 161
#
# Logs a socket-level failure (message plus at most the first two backtrace
# frames) and resets the connection.
#
# NOTE(review): the extracted listing had lost the method name after
# `exception.`; `message` is restored as the standard Exception accessor.
def handle_io_error(connection, exception) #:nodoc:
  # Exception#backtrace is nil for an exception that was never raised; do not
  # let the error handler itself blow up in that case.
  frames = Array(exception.backtrace).first(2)
  NSQ.logger.error("Socket error: #{exception.message}\n\t#{frames.join("\n\t")}")
  connection.reset
end
#handle_message(connection, message) ⇒ Object
:nodoc:
139 140 141 |
# File 'lib/nsq/subscriber.rb', line 139
#
# Message callback: forwards the message to #process_message together with the
# handler block stored in @block.
#
# NOTE(review): the extracted listing had lost the method name, the `message`
# parameter and the callee name; they are restored from the documented
# signatures of #handle_message and #process_message.
def handle_message(connection, message) #:nodoc:
  process_message(connection, message, &@block)
end
#process_message(connection, message, &block) ⇒ Object
:nodoc:
143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/nsq/subscriber.rb', line 143
#
# Yields +message+ to the handler and finishes it on success. If anything is
# raised, the failure is logged and the message is either requeued with a
# delay of attempts * @requeue_delay, or, once @max_tries is set and reached,
# finished as failed.
#
# NOTE(review): the extracted listing had lost the `message` identifier (and
# the method name). `yield message`, `message.id`, `message.attempts`,
# `message.body` and `e.message` are reconstructions -- confirm against the
# real source file.
def process_message(connection, message, &block) #:nodoc:
  yield message
  connection.send_finish(message.id, true)
rescue Exception => e # deliberately broad: any failure must finish or requeue the message
  NSQ.logger.error("#{connection.name}: Exception during handle_message: #{e.message}\n\t#{Array(e.backtrace).join("\n\t")}")
  if @max_tries && message.attempts >= @max_tries
    giving_up = "#{connection.name}: Giving up on message after #{@max_tries} tries: #{message.body.inspect}"
    logger = NSQ.logger
    # The standard library Logger exposes #warn, not #warning; keep calling
    # #warning when the configured logger has it, otherwise fall back to #warn
    # so a NoMethodError here cannot prevent the message from being finished.
    if logger.respond_to?(:warning)
      logger.warning(giving_up)
    else
      logger.warn(giving_up)
    end
    connection.send_finish(message.id, false)
  else
    connection.send_requeue(message.id, message.attempts * @requeue_delay)
  end
end
#ready_count ⇒ Object
The actual value for the READY message
98 99 100 101 102 |
# File 'lib/nsq/subscriber.rb', line 98
#
# The actual value for the READY message: @max_in_flight divided across the
# current connections, rounded up. Returns 0 when there are no connections
# instead of raising ZeroDivisionError.
def ready_count #:nodoc:
  # TODO: Should we take into account the last_ready_count minus the number of messages sent since then?
  connections = @connection_hash.size
  return 0 if connections.zero?

  # Rounding up!
  (@max_in_flight + connections - 1) / connections
end
#ready_threshold ⇒ Object
Threshold for a connection where it’s time to send a new READY message
93 94 95 |
# File 'lib/nsq/subscriber.rb', line 93
#
# Threshold for a connection where it's time to send a new READY message:
# a quarter of the per-connection share of @max_in_flight (integer division).
# Returns 0 when there are no connections instead of raising
# ZeroDivisionError.
def ready_threshold #:nodoc:
  connections = @connection_hash.size
  return 0 if connections.zero?

  @max_in_flight / connections / 4
end
#remove_connection(host, port) ⇒ Object
:nodoc:
112 113 114 115 116 |
# File 'lib/nsq/subscriber.rb', line 112
#
# Removes the connection stored under [host, port] and closes it. Does nothing
# (and returns nil) when no such connection is registered.
def remove_connection(host, port) #:nodoc:
  connection = @connection_hash.delete([host, port])
  return unless connection

  connection.close
end
#stop ⇒ Object
Stop this subscriber
119 120 121 122 123 124 125 |
# File 'lib/nsq/subscriber.rb', line 119
#
# Stop this subscriber: marks it as stopped, closes every registered
# connection and then empties @connection_hash.
def stop
  @stopped = true
  @connection_hash.each_value(&:close)
  @connection_hash.clear
end
#stopped? ⇒ Boolean
Return true if this subscriber has been stopped
128 129 130 |
# File 'lib/nsq/subscriber.rb', line 128
#
# Return true if this subscriber has been stopped, false otherwise.
# @stopped is only assigned by #stop, so it is nil until then; normalise it so
# the method always returns a real Boolean as documented.
def stopped?
  @stopped ? true : false
end
#to_s ⇒ Object
:nodoc:
166 167 168 |
# File 'lib/nsq/subscriber.rb', line 166 def to_s #:nodoc: @name end |