Class: NSQ::Subscriber
Instance Attribute Summary collapse
-
#max_in_flight ⇒ Object
Returns the value of attribute max_in_flight.
-
#selector ⇒ Object
readonly
Returns the value of attribute selector.
Instance Method Summary collapse
-
#add_connection(host, port) ⇒ Object
:nodoc:.
-
#connection_count ⇒ Object
:nodoc:.
-
#create_connection_backoff_timer ⇒ Object
:nodoc:.
-
#create_ready_backoff_timer ⇒ Object
:nodoc:.
-
#handle_connection(connection) ⇒ Object
:nodoc:.
-
#handle_frame_error(connection, error_message) ⇒ Object
:nodoc:.
-
#handle_heartbeat(connection) ⇒ Object
:nodoc:.
-
#handle_io_error(connection, exception) ⇒ Object
:nodoc:.
-
#handle_message(connection, message) ⇒ Object
:nodoc:.
-
#initialize(reader, topic, channel, options, &block) ⇒ Subscriber
constructor
Creates a new subscriber which maintains connections to all the nsqd instances which publish the given topic.
-
#process_message(connection, message, &block) ⇒ Object
:nodoc:.
-
#ready_count ⇒ Object
The actual value for the READY message.
-
#ready_threshold ⇒ Object
Threshold for a connection where it’s time to send a new READY message.
-
#remove_connection(host, port) ⇒ Object
:nodoc:.
-
#stop ⇒ Object
Stop this subscriber.
-
#stopped? ⇒ Boolean
Return true if this subscriber has been stopped.
Methods included from Logger
Constructor Details
#initialize(reader, topic, channel, options, &block) ⇒ Subscriber
Creates a new subscriber which maintains connections to all the nsqd instances which publish the given topic. This is never called directly; it is called for you when Reader#subscribe is called.
Options:
:max_tries [Integer]
The max number of attempts to process a given message at which point it will no longer be requeued.
Defaults to nil which means it will be requeued forever if it continues to fail.
:max_in_flight [Integer]
The number used to determine the RDY count sent for each connection.
Defaults to 1
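:requeue_delay (seconds) [Integer]
The base delay used when a failed message is requeued. It is converted to msec and multiplied by the message's attempt count before being sent along with the requeue.
Defaults to 90 seconds (90,000 msec)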
:ready_backoff_timer [Hash of BackoffTimer options]
Options passed to a BackoffTimer for increasing the interval between ready counts when
messages are failing.
Options:
:min_interval (seconds) [Float]
The minimum interval that the BackoffTimer will return.
Defaults to 0
:max_interval (seconds) [Float]
The maximum interval that the BackoffTimer will return.
Defaults to 120
:ratio [Float]
Defaults to 0.25
:short_length [Float]
Defaults to 10
:long_length [Float]
Defaults to 250
:connection_backoff_timer [Hash of BackoffTimer options]
Options passed to a BackoffTimer for increasing the interval between connection attempts
when a connection to nsqd is failing.
Options (Refer to :ready_backoff_timer above for the meaning of these options):
:min_interval (seconds) [Float]
Defaults to 0
:max_interval (seconds) [Float]
Defaults to 30
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/nsq/subscriber.rb', line 55
#
# Builds a subscriber for +topic+ / +channel+ and stores the handler +block+.
#
# The per-call +options+ are merged over the reader's options, so a key given
# here wins over the same key on the reader. Recognised keys: :max_tries,
# :max_in_flight, :requeue_delay, :ready_backoff_timer and
# :connection_backoff_timer (the last two are hashes with :min_interval,
# :max_interval, :ratio, :short_length and :long_length).
#
# Raises RuntimeError when the resulting max_in_flight is outside 1..2499.
#
# NOTE(review): this listing was reconstructed from a copy in which the
# identifiers containing "options" had been stripped; confirm that the reader
# really exposes its defaults as +reader.options+.
def initialize(reader, topic, channel, options, &block)
  options = reader.options.merge(options)

  @reader   = reader
  @selector = reader.selector
  @topic    = topic
  @channel  = channel
  @block    = block

  @max_tries     = options[:max_tries]
  @max_in_flight = (options[:max_in_flight] || 1).to_i
  # The option is multiplied by 1000 before it is stored.
  @requeue_delay = (options[:requeue_delay] || 90).to_i * 1000

  @connection_hash = {}
  # Start out explicitly "not stopped" so stopped? never sees an unset ivar.
  @stopped = false

  ready_options      = options[:ready_backoff_timer] || {}
  connection_options = options[:connection_backoff_timer] || {}

  @ready_min_interval = ready_options[:min_interval] || 0
  @ready_max_interval = ready_options[:max_interval] || 120
  @ready_ratio        = ready_options[:ratio] || 0.25
  @ready_short_length = ready_options[:short_length] || 10
  @ready_long_length  = ready_options[:long_length] || 250

  @connection_min_interval = connection_options[:min_interval] || 0
  @connection_max_interval = connection_options[:max_interval] || 30
  @connection_ratio        = connection_options[:ratio] || 0.25
  @connection_short_length = connection_options[:short_length] || 10
  @connection_long_length  = connection_options[:long_length] || 250

  raise "Invalid value for max_in_flight, must be between 0 and 2500: #{@max_in_flight}" unless @max_in_flight.between?(1,2499)
end
Instance Attribute Details
#max_in_flight ⇒ Object
Returns the value of attribute max_in_flight.
6 7 8 |
# File 'lib/nsq/subscriber.rb', line 6
#
# Returns the current value of the max_in_flight attribute.
def max_in_flight
  @max_in_flight
end
#selector ⇒ Object (readonly)
Returns the value of attribute selector.
5 6 7 |
# File 'lib/nsq/subscriber.rb', line 5
#
# Returns the value of the read-only selector attribute.
def selector
  @selector
end
Instance Method Details
#add_connection(host, port) ⇒ Object
:nodoc:
109 110 111 |
# File 'lib/nsq/subscriber.rb', line 109
#
# Creates a Connection for +host+ / +port+ and registers it under the
# [host, port] key. Returns the new connection.
def add_connection(host, port) #:nodoc:
  @connection_hash[[host, port]] = Connection.new(@reader, self, host, port)
end
#connection_count ⇒ Object
:nodoc:
105 106 107 |
# File 'lib/nsq/subscriber.rb', line 105
#
# Returns the number of connections currently registered.
def connection_count #:nodoc:
  @connection_hash.size
end
#create_connection_backoff_timer ⇒ Object
:nodoc:
89 90 91 |
# File 'lib/nsq/subscriber.rb', line 89
#
# Returns a new BackoffTimer built from the connection backoff settings
# captured by the constructor.
def create_connection_backoff_timer #:nodoc:
  BackoffTimer.new(
    @connection_min_interval,
    @connection_max_interval,
    @connection_ratio,
    @connection_short_length,
    @connection_long_length
  )
end
#create_ready_backoff_timer ⇒ Object
:nodoc:
85 86 87 |
# File 'lib/nsq/subscriber.rb', line 85
#
# Returns a new BackoffTimer built from the ready backoff settings captured
# by the constructor.
def create_ready_backoff_timer #:nodoc:
  BackoffTimer.new(
    @ready_min_interval,
    @ready_max_interval,
    @ready_ratio,
    @ready_short_length,
    @ready_long_length
  )
end
#handle_connection(connection) ⇒ Object
:nodoc:
131 132 133 |
# File 'lib/nsq/subscriber.rb', line 131
#
# Sends the init for this subscriber's topic and channel on +connection+.
def handle_connection(connection) #:nodoc:
  connection.send_init(@topic, @channel)
end
#handle_frame_error(connection, error_message) ⇒ Object
:nodoc:
155 156 157 158 |
# File 'lib/nsq/subscriber.rb', line 155
#
# Logs an error frame received from nsqd and resets +connection+.
def handle_frame_error(connection, error_message) #:nodoc:
  logger.error("Received error from nsqd: #{error_message.inspect}")
  connection.reset
end
#handle_heartbeat(connection) ⇒ Object
:nodoc:
135 136 |
# File 'lib/nsq/subscriber.rb', line 135
#
# Heartbeat callback. The body is empty, so this is a no-op returning nil.
def handle_heartbeat(connection) #:nodoc:
end
#handle_io_error(connection, exception) ⇒ Object
:nodoc:
160 161 162 163 |
# File 'lib/nsq/subscriber.rb', line 160
#
# Logs a socket-level failure (message plus the first two backtrace frames)
# and resets +connection+.
def handle_io_error(connection, exception) #:nodoc:
  # An exception that was built but never raised has a nil backtrace; treat
  # that as "no frames" so logging cannot itself blow up.
  frames = (exception.backtrace || [])[0, 2]
  logger.error("Socket error: #{exception.message}\n\t#{frames.join("\n\t")}")
  connection.reset
end
#handle_message(connection, message) ⇒ Object
:nodoc:
138 139 140 |
# File 'lib/nsq/subscriber.rb', line 138
#
# Message callback: forwards +message+ to process_message together with the
# handler block that was given to the constructor.
def handle_message(connection, message) #:nodoc:
  process_message(connection, message, &@block)
end
#process_message(connection, message, &block) ⇒ Object
:nodoc:
142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/nsq/subscriber.rb', line 142
#
# Runs +block+ with +message+ and reports the outcome on +connection+:
#
# * block returns normally      -> send_finish(id, true)
# * block raises, tries left    -> send_requeue(id, attempts * @requeue_delay)
# * block raises, tries used up -> send_finish(id, false)
#
# "Tries used up" means @max_tries is set and message.attempts has reached it.
#
# Only StandardError is handled. Interrupt, SystemExit and other
# non-StandardError exceptions propagate instead of being logged and turned
# into a requeue, so the process can still be stopped while a handler runs.
def process_message(connection, message, &block) #:nodoc:
  block[message]
  connection.send_finish(message.id, true)
rescue StandardError => e
  logger.error("Exception during handle_message: #{e.message}\n\t#{e.backtrace.join("\n\t")}")
  if @max_tries && message.attempts >= @max_tries
    # NOTE(review): Ruby's stdlib Logger has #warn, not #warning — confirm the
    # logger supplied by the Logger mixin responds to #warning.
    logger.warning("Giving up on message after #{@max_tries} tries: #{message.body.inspect}")
    connection.send_finish(message.id, false)
  else
    # The delay grows linearly with the number of attempts.
    connection.send_requeue(message.id, message.attempts * @requeue_delay)
  end
end
#ready_count ⇒ Object
The actual value for the READY message
99 100 101 102 103 |
# File 'lib/nsq/subscriber.rb', line 99
#
# The actual value for the READY message: max_in_flight divided by the number
# of registered connections, rounded up.
#
# With no connections registered the divisor is treated as 1, so the full
# max_in_flight is returned instead of raising ZeroDivisionError.
def ready_count #:nodoc:
  # TODO: Should we take into account the last_ready_count minus the number of messages sent since then?
  connections = [@connection_hash.size, 1].max
  # Rounding up!
  (@max_in_flight + connections - 1) / connections
end
#ready_threshold ⇒ Object
Threshold for a connection where it’s time to send a new READY message
94 95 96 |
# File 'lib/nsq/subscriber.rb', line 94
#
# Threshold for a connection where it's time to send a new READY message:
# a quarter of the per-connection share of max_in_flight (integer division).
#
# With no connections registered the divisor is treated as 1, so this never
# raises ZeroDivisionError.
def ready_threshold #:nodoc:
  connections = [@connection_hash.size, 1].max
  @max_in_flight / connections / 4
end
#remove_connection(host, port) ⇒ Object
:nodoc:
113 114 115 116 117 |
# File 'lib/nsq/subscriber.rb', line 113
#
# Unregisters the connection stored under [host, port] and closes it.
# Returns nil without doing anything when no such connection is registered.
def remove_connection(host, port) #:nodoc:
  connection = @connection_hash.delete([host, port])
  return unless connection

  connection.close
end
#stop ⇒ Object
Stop this subscriber
120 121 122 123 124 |
# File 'lib/nsq/subscriber.rb', line 120
#
# Stop this subscriber: mark it stopped, close every registered connection
# and forget them all.
def stop
  @stopped = true
  @connection_hash.each_value(&:close)
  @connection_hash.clear
end
#stopped? ⇒ Boolean
Return true if this subscriber has been stopped
127 128 129 |
# File 'lib/nsq/subscriber.rb', line 127
#
# Return true if this subscriber has been stopped, false otherwise.
# Always answers with a real Boolean, including before stop has been called
# (when @stopped may still be unset).
def stopped?
  @stopped ? true : false
end