Class: NSQ::Connection
- Inherits:
-
Object
- Object
- NSQ::Connection
- Defined in:
- lib/nsq/connection.rb
Overview
Represents a single subscribed connection to an nsqd server.
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Instance Method Summary collapse
-
#close(permanent = true) ⇒ Object
:nodoc:.
-
#connect ⇒ Object
:nodoc:.
-
#initialize(reader, subscriber, host, port) ⇒ Connection
constructor
A new instance of Connection.
-
#reset ⇒ Object
:nodoc:.
-
#send_finish(id, success) ⇒ Object
:nodoc:.
-
#send_init(topic, channel, short_id, long_id) ⇒ Object
:nodoc:.
-
#send_ready ⇒ Object
:nodoc:.
-
#send_requeue(id, time_ms) ⇒ Object
:nodoc:.
-
#to_s ⇒ Object
:nodoc:.
Constructor Details
#initialize(reader, subscriber, host, port) ⇒ Connection
Returns a new instance of Connection.
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/nsq/connection.rb', line 9

# Sets up the state for one connection to an nsqd server on behalf of a
# subscriber, then immediately starts connecting.
#
# reader     - stored as @reader; also supplies the selector (reader.selector)
# subscriber - stored as @subscriber; supplies the name prefix and the ready
#              backoff timer
# host, port - address of the nsqd server; also used to build @name
def initialize(reader, subscriber, host, port)
  @reader, @subscriber = reader, subscriber
  @selector            = reader.selector
  @host, @port         = host, port
  @name                = "#{subscriber.name}:#{host}:#{port}"

  # Locks plus the flag that is cleared again at the end of send_ready.
  @write_monitor = Monitor.new
  @ready_mutex   = Mutex.new
  @sending_ready = false

  # Connect states :init, :interval, :connecting, :connected, :closed
  @connect_state = :init

  @next_connection_time = @next_ready_time = nil
  # Created lazily on the first reset; the ready timer is needed from the start.
  @connection_backoff_timer = nil
  @ready_backoff_timer      = @subscriber.create_ready_backoff_timer

  connect
end
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
7 8 9 |
# File 'lib/nsq/connection.rb', line 7

# Read-only accessor for the value held in @name.
def name
  @name
end
Instance Method Details
#close(permanent = true) ⇒ Object
:nodoc:
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/nsq/connection.rb', line 86

# Tears down the current socket while holding @write_monitor.
#
# The socket is deregistered from the selector, a "CLS\n" command is written
# to it, and it is closed. Errors during teardown are best-effort: any
# StandardError is logged at debug level and otherwise ignored. Non-standard
# exceptions (Interrupt, SystemExit, NoMemoryError, ...) are no longer
# swallowed and propagate to the caller.
#
# Whatever happens, the socket is closed (if there is one), @socket is set to
# nil and @connect_state is updated.
#
# permanent - true (default) leaves the connection in the :closed state;
#             false puts it back in :init.
def close(permanent=true) #:nodoc:
  NSQ.logger.debug {"#{@name}: Closing..."}
  @write_monitor.synchronize do
    socket = @socket
    begin
      # Nothing to deregister or notify if we never got a socket.
      if socket
        @selector.deregister(socket)
        # Write straight to the socket instead of going through the normal
        # write path (original note: otherwise we need to use Monitor instead
        # of Mutex).
        socket.write "CLS\n"
      end
    rescue StandardError => e
      NSQ.logger.debug {"#{@name}: Ignoring error while closing: #{e.class}: #{e.message}"}
    ensure
      # Always release the descriptor, even when deregister/write failed.
      begin
        socket.close if socket
      rescue StandardError
        # Socket was already closed or is broken; nothing more to release.
      end
      @connect_state = permanent ? :closed : :init
      @socket = nil
    end
  end
end
#connect ⇒ Object
:nodoc:
102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/nsq/connection.rb', line 102

# Begins a connection attempt. Does nothing unless verify_connect_state?
# accepts the :init or :interval states.
#
# Moves to :connecting, resets the per-attempt buffer and ready count, creates
# an IPv4 stream socket, registers it with the selector for writes, and then
# calls do_connect.
def connect #:nodoc:
  return unless verify_connect_state?(:init, :interval)

  NSQ.logger.debug {"#{self}: Beginning connect"}
  @connect_state = :connecting

  # Per-attempt state starts from scratch.
  @buffer      = ''
  @ready_count = 0

  @socket   = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
  @sockaddr = Socket.pack_sockaddr_in(@port, @host)

  # The proc stored on the monitor re-runs do_connect; presumably it is
  # invoked by the reader's selector loop -- confirm against the reader.
  @monitor       = @selector.register(@socket, :w)
  @monitor.value = proc { do_connect }

  do_connect
end
#reset ⇒ Object
:nodoc:
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/nsq/connection.rb', line 65

# Drops the current connection and arranges a reconnect.
#
# Only acts when verify_connect_state? accepts :connecting or :connected. The
# socket is closed non-permanently, a failure is recorded on the connection
# backoff timer (created from the subscriber on first use), and then either
# connect is called right away (interval not greater than zero) or it is
# scheduled through the reader after the backoff interval.
def reset #:nodoc:
  return unless verify_connect_state?(:connecting, :connected)

  # Close with the hopes of re-establishing
  close(false)

  @write_monitor.synchronize do
    return unless verify_connect_state?(:init)

    @connection_backoff_timer ||= @subscriber.create_connection_backoff_timer
    @connection_backoff_timer.failure
    delay = @connection_backoff_timer.interval

    # No backoff required: reconnect straight away.
    return connect unless delay > 0

    @connect_state = :interval
    NSQ.logger.debug {"#{self}: Reattempting connection in #{delay} seconds"}
    @reader.add_timeout(delay) { connect }
  end
end
#send_finish(id, success) ⇒ Object
:nodoc:
43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/nsq/connection.rb', line 43

# Writes a FIN command for the message id, then, under @ready_mutex,
# decrements the ready count, records the outcome on the ready backoff timer
# and calls check_ready.
#
# id      - message id interpolated into the FIN command
# success - truthy records a success on the backoff timer, otherwise a failure
def send_finish(id, success) #:nodoc:
  write("FIN #{id}\n")
  @ready_mutex.synchronize do
    @ready_count = @ready_count - 1
    success ? @ready_backoff_timer.success : @ready_backoff_timer.failure
    check_ready
  end
end
#send_init(topic, channel, short_id, long_id) ⇒ Object
:nodoc:
31 32 33 34 35 |
# File 'lib/nsq/connection.rb', line 31

# Opens the conversation with the server: writes NSQ::MAGIC_V2, then a SUB
# command built from the four arguments, and finally calls send_ready.
def send_init(topic, channel, short_id, long_id) #:nodoc:
  write(NSQ::MAGIC_V2)
  subscribe_command = "SUB #{topic} #{channel} #{short_id} #{long_id}\n"
  write(subscribe_command)
  send_ready
end
#send_ready ⇒ Object
:nodoc:
37 38 39 40 41 |
# File 'lib/nsq/connection.rb', line 37

# Refreshes @ready_count from the subscriber and writes it in an RDY command,
# unless the subscriber reports it is stopped. Always clears @sending_ready
# afterwards.
def send_ready #:nodoc:
  @ready_count = @subscriber.ready_count
  unless @subscriber.stopped?
    write("RDY #{@ready_count}\n")
  end
  @sending_ready = false
end
#send_requeue(id, time_ms) ⇒ Object
:nodoc:
56 57 58 59 60 61 62 63 |
# File 'lib/nsq/connection.rb', line 56

# Writes a REQ command for the message id and delay, then, under
# @ready_mutex, decrements the ready count, records a failure on the ready
# backoff timer and calls check_ready.
#
# id      - message id interpolated into the REQ command
# time_ms - delay value interpolated into the REQ command
def send_requeue(id, time_ms) #:nodoc:
  write("REQ #{id} #{time_ms}\n")
  @ready_mutex.synchronize do
    @ready_count = @ready_count - 1
    @ready_backoff_timer.failure
    check_ready
  end
end
#to_s ⇒ Object
:nodoc:
115 116 117 |
# File 'lib/nsq/connection.rb', line 115

# String form of the connection: the value held in @name.
def to_s #:nodoc:
  @name
end