Class: NSQ::Connection
Overview
Represents a single subscribed connection to an nsqd server.
Instance Method Summary collapse
-
#close(permanent = true) ⇒ Object
:nodoc:.
-
#connect ⇒ Object
:nodoc:.
-
#initialize(reader, subscriber, host, port) ⇒ Connection
constructor
A new instance of Connection.
-
#reset ⇒ Object
:nodoc:.
-
#send_finish(id, success) ⇒ Object
:nodoc:.
-
#send_init(topic, channel) ⇒ Object
:nodoc:.
-
#send_ready ⇒ Object
:nodoc:.
-
#send_requeue(id, time_ms) ⇒ Object
:nodoc:.
Methods included from Logger
Constructor Details
#initialize(reader, subscriber, host, port) ⇒ Connection
Returns a new instance of Connection.
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/nsq/connection.rb', line 9 def initialize(reader, subscriber, host, port) @reader = reader @subscriber = subscriber @selector = reader.selector @host = host @port = port @write_monitor = Monitor.new @ready_mutex = Mutex.new @sending_ready = false # Connect states :init, :interval, :connecting, :connected, :closed @connect_state = :init @connection_backoff_timer = nil @ready_backoff_timer = @subscriber.create_ready_backoff_timer connect end |
Instance Method Details
#close(permanent = true) ⇒ Object
:nodoc:
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/nsq/connection.rb', line 81 def close(permanent=true) #:nodoc: logger.debug "Closing..." @write_monitor.synchronize do begin @selector.deregister(@socket) # Use straight socket to write otherwise we need to use Monitor instead of Mutex @socket.write "CLS\n" @socket.close rescue Exception ensure @connect_state = permanent ? :closed : :init @socket = nil end end end |
#connect ⇒ Object
:nodoc:
97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/nsq/connection.rb', line 97 def connect #:nodoc: return unless verify_connect_state?(:init, :interval) logger.debug {"#{self}: Beginning connect"} @connect_state = :connecting @buffer = '' @ready_count = 0 @socket = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0) @sockaddr = Socket.pack_sockaddr_in(@port, @host) @socket.set_encoding 'UTF-8' @monitor = @selector.register(@socket, :w) @monitor.value = method(:do_connect) do_connect end |
#reset ⇒ Object
:nodoc:
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/nsq/connection.rb', line 62 def reset #:nodoc: return unless verify_connect_state?(:connecting, :connected) # Close with the hopes of re-establishing close(false) @write_monitor.synchronize do return unless verify_connect_state?(:init) @connection_backoff_timer ||= @subscriber.create_connection_backoff_timer @connection_backoff_timer.failure interval = @connection_backoff_timer.interval if interval > 0 @connect_state = :interval logger.debug("Reattempting connection in #{interval} seconds") @reader.add_timeout(interval, &method(:connect)) else connect end end end |
#send_finish(id, success) ⇒ Object
:nodoc:
40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/nsq/connection.rb', line 40 def send_finish(id, success) #:nodoc: write "FIN #{id}\n" @ready_mutex.synchronize do @ready_count -= 1 if success @ready_backoff_timer.success else @ready_backoff_timer.failure end check_ready end end |
#send_init(topic, channel) ⇒ Object
:nodoc:
28 29 30 31 32 |
# File 'lib/nsq/connection.rb', line 28 def send_init(topic, channel) #:nodoc: write NSQ::MAGIC_V2 write "SUB #{topic} #{channel}\n" send_ready end |
#send_ready ⇒ Object
:nodoc:
34 35 36 37 38 |
# File 'lib/nsq/connection.rb', line 34 def send_ready #:nodoc: @ready_count = @subscriber.ready_count write "RDY #{@ready_count}\n" unless @subscriber.stopped? @sending_ready = false end |
#send_requeue(id, time_ms) ⇒ Object
:nodoc:
53 54 55 56 57 58 59 60 |
# File 'lib/nsq/connection.rb', line 53 def send_requeue(id, time_ms) #:nodoc: write "REQ #{id} #{time_ms}\n" @ready_mutex.synchronize do @ready_count -= 1 @ready_backoff_timer.failure check_ready end end |