Class: Nsq::Connection
- Inherits:
-
Object
- Object
- Nsq::Connection
- Includes:
- AttributeLogger
- Defined in:
- lib/nsq/connection.rb
Constant Summary collapse
- USER_AGENT =
"nsq-ruby/#{Nsq::Version::STRING}"- RESPONSE_HEARTBEAT =
'_heartbeat_'- RESPONSE_OK =
'OK'- @@log_attributes =
[:host, :port]
Instance Attribute Summary collapse
-
#host ⇒ Object
readonly
Returns the value of attribute host.
-
#max_in_flight ⇒ Object
Returns the value of attribute max_in_flight.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#presumed_in_flight ⇒ Object
readonly
Returns the value of attribute presumed_in_flight.
Instance Method Summary collapse
-
#close ⇒ Object
close the connection and don’t try to re-open it.
- #connected? ⇒ Boolean
- #dpub(topic, delay_in_ms, message) ⇒ Object
- #fin(message_id) ⇒ Object
-
#initialize(opts = {}) ⇒ Connection
constructor
A new instance of Connection.
- #mpub(topic, messages) ⇒ Object
- #pub(topic, message) ⇒ Object
- #rdy(count) ⇒ Object
-
#re_up_ready ⇒ Object
Tell the server we are ready for more messages!.
- #req(message_id, timeout) ⇒ Object
- #sub(topic, channel) ⇒ Object
- #touch(message_id) ⇒ Object
Methods included from AttributeLogger
Constructor Details
#initialize(opts = {}) ⇒ Connection
Returns a new instance of Connection.
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/nsq/connection.rb', line 26 def initialize(opts = {}) @host = opts[:host] || (raise ArgumentError, 'host is required') @port = opts[:port] || (raise ArgumentError, 'port is required') @queue = opts[:queue] @topic = opts[:topic] @channel = opts[:channel] @msg_timeout = opts[:msg_timeout] || 60_000 # 60s @max_in_flight = opts[:max_in_flight] || 1 = opts[:tls_options] if opts[:ssl_context] if warn 'ssl_context and tls_options both set. Using tls_options. Ignoring ssl_context.' else = opts[:ssl_context] warn 'ssl_context will be deprecated nsq-ruby version 3. Please use tls_options instead.' end end @tls_v1 = !!opts[:tls_v1] if if @tls_v1 else warn 'tls_options was provided, but tls_v1 is false. Skipping validation of tls_options.' end end if @msg_timeout < 1000 raise ArgumentError, 'msg_timeout cannot be less than 1000. it\'s in milliseconds.' end # for outgoing communication @write_queue = Queue.new # For indicating that the connection has died. # We use a Queue so we don't have to poll. Used to communicate across # threads (from write_loop and read_loop to connect_and_monitor). @death_queue = Queue.new @connected = false @presumed_in_flight = 0 open_connection start_monitoring_connection end |
Instance Attribute Details
#host ⇒ Object (readonly)
Returns the value of attribute host.
16 17 18 |
# File 'lib/nsq/connection.rb', line 16 def host @host end |
#max_in_flight ⇒ Object
Returns the value of attribute max_in_flight.
18 19 20 |
# File 'lib/nsq/connection.rb', line 18 def max_in_flight @max_in_flight end |
#port ⇒ Object (readonly)
Returns the value of attribute port.
17 18 19 |
# File 'lib/nsq/connection.rb', line 17 def port @port end |
#presumed_in_flight ⇒ Object (readonly)
Returns the value of attribute presumed_in_flight.
19 20 21 |
# File 'lib/nsq/connection.rb', line 19 def presumed_in_flight @presumed_in_flight end |
Instance Method Details
#close ⇒ Object
close the connection and don’t try to re-open it
79 80 81 82 |
# File 'lib/nsq/connection.rb', line 79 def close stop_monitoring_connection close_connection end |
#connected? ⇒ Boolean
73 74 75 |
# File 'lib/nsq/connection.rb', line 73 def connected? @connected end |
#dpub(topic, delay_in_ms, message) ⇒ Object
116 117 118 |
# File 'lib/nsq/connection.rb', line 116 def dpub(topic, delay_in_ms, ) write ["DPUB #{topic} #{delay_in_ms}\n", .bytesize, ].pack('a*l>a*') end |
#fin(message_id) ⇒ Object
95 96 97 98 |
# File 'lib/nsq/connection.rb', line 95 def fin() write "FIN #{message_id}\n" decrement_in_flight end |
#mpub(topic, messages) ⇒ Object
120 121 122 123 124 125 126 |
# File 'lib/nsq/connection.rb', line 120 def mpub(topic, ) body = .map do || [.bytesize, ].pack('l>a*') end.join write ["MPUB #{topic}\n", body.bytesize, .size, body].pack('a*l>l>a*') end |
#pub(topic, message) ⇒ Object
112 113 114 |
# File 'lib/nsq/connection.rb', line 112 def pub(topic, ) write ["PUB #{topic}\n", .bytesize, ].pack('a*l>a*') end |
#rdy(count) ⇒ Object
90 91 92 |
# File 'lib/nsq/connection.rb', line 90 def rdy(count) write "RDY #{count}\n" end |
#re_up_ready ⇒ Object
Tell the server we are ready for more messages!
130 131 132 133 134 135 136 |
# File 'lib/nsq/connection.rb', line 130 def re_up_ready rdy(@max_in_flight) # assume these messages are coming our way. yes, this might not be the # case, but it's much easier to manage our RDY state with the server if # we treat things this way. @presumed_in_flight = @max_in_flight end |
#req(message_id, timeout) ⇒ Object
101 102 103 104 |
# File 'lib/nsq/connection.rb', line 101 def req(, timeout) write "REQ #{message_id} #{timeout}\n" decrement_in_flight end |
#sub(topic, channel) ⇒ Object
85 86 87 |
# File 'lib/nsq/connection.rb', line 85 def sub(topic, channel) write "SUB #{topic} #{channel}\n" end |
#touch(message_id) ⇒ Object
107 108 109 |
# File 'lib/nsq/connection.rb', line 107 def touch() write "TOUCH #{message_id}\n" end |