Class: Nsq::Connection
- Inherits:
-
Object
- Object
- Nsq::Connection
- Includes:
- AttributeLogger
- Defined in:
- lib/nsq/connection.rb
Constant Summary collapse
- USER_AGENT =
"nsq-ruby/#{Nsq::Version::STRING}"
- RESPONSE_HEARTBEAT =
'_heartbeat_'
- RESPONSE_OK =
'OK'
- @@log_attributes =
[:host, :port]
Instance Attribute Summary collapse
-
#host ⇒ Object
readonly
Returns the value of attribute host.
-
#max_in_flight ⇒ Object
Returns the value of attribute max_in_flight.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#presumed_in_flight ⇒ Object
readonly
Returns the value of attribute presumed_in_flight.
Instance Method Summary collapse
-
#close ⇒ Object
close the connection and don’t try to re-open it.
- #connected? ⇒ Boolean
- #dpub(topic, delay_in_ms, message) ⇒ Object
- #fin(message_id) ⇒ Object
-
#initialize(opts = {}) ⇒ Connection
constructor
A new instance of Connection.
- #mpub(topic, messages) ⇒ Object
- #pub(topic, message) ⇒ Object
- #rdy(count) ⇒ Object
-
#re_up_ready ⇒ Object
Tell the server we are ready for more messages!.
- #req(message_id, timeout) ⇒ Object
- #sub(topic, channel) ⇒ Object
- #touch(message_id) ⇒ Object
Methods included from AttributeLogger
Constructor Details
#initialize(opts = {}) ⇒ Connection
Returns a new instance of Connection.
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/nsq/connection.rb', line 26 def initialize(opts = {}) @host = opts[:host] || (raise ArgumentError, 'host is required') @port = opts[:port] || (raise ArgumentError, 'port is required') @queue = opts[:queue] @topic = opts[:topic] @channel = opts[:channel] @msg_timeout = opts[:msg_timeout] || 60_000 # 60s @max_in_flight = opts[:max_in_flight] || 1 @tls_options = opts[:tls_options] @max_attempts = opts[:max_attempts] if opts[:ssl_context] if @tls_options warn 'ssl_context and tls_options both set. Using tls_options. Ignoring ssl_context.' else @tls_options = opts[:ssl_context] warn 'ssl_context will be deprecated nsq-ruby version 3. Please use tls_options instead.' end end @tls_v1 = !!opts[:tls_v1] if @tls_options if @tls_v1 else warn 'tls_options was provided, but tls_v1 is false. Skipping validation of tls_options.' end end if @msg_timeout < 1000 raise ArgumentError, 'msg_timeout cannot be less than 1000. it\'s in milliseconds.' end # for outgoing communication @write_queue = SizedQueue.new(10000) # For indicating that the connection has died. # We use a Queue so we don't have to poll. Used to communicate across # threads (from write_loop and read_loop to connect_and_monitor). @death_queue = Queue.new @connected = false @presumed_in_flight = 0 open_connection start_monitoring_connection end |
Instance Attribute Details
#host ⇒ Object (readonly)
Returns the value of attribute host.
16 17 18 |
# File 'lib/nsq/connection.rb', line 16 def host @host end |
#max_in_flight ⇒ Object
Returns the value of attribute max_in_flight.
18 19 20 |
# File 'lib/nsq/connection.rb', line 18 def max_in_flight @max_in_flight end |
#port ⇒ Object (readonly)
Returns the value of attribute port.
17 18 19 |
# File 'lib/nsq/connection.rb', line 17 def port @port end |
#presumed_in_flight ⇒ Object (readonly)
Returns the value of attribute presumed_in_flight.
19 20 21 |
# File 'lib/nsq/connection.rb', line 19 def presumed_in_flight @presumed_in_flight end |
Instance Method Details
#close ⇒ Object
close the connection and don’t try to re-open it
80 81 82 83 |
# File 'lib/nsq/connection.rb', line 80 def close stop_monitoring_connection close_connection end |
#connected? ⇒ Boolean
74 75 76 |
# File 'lib/nsq/connection.rb', line 74 def connected? @connected end |
#dpub(topic, delay_in_ms, message) ⇒ Object
117 118 119 |
# File 'lib/nsq/connection.rb', line 117 def dpub(topic, delay_in_ms, ) write ["DPUB #{topic} #{delay_in_ms}\n", .bytesize, ].pack('a*l>a*') end |
#fin(message_id) ⇒ Object
96 97 98 99 |
# File 'lib/nsq/connection.rb', line 96 def fin() write "FIN #{}\n" decrement_in_flight end |
#mpub(topic, messages) ⇒ Object
121 122 123 124 125 126 127 |
# File 'lib/nsq/connection.rb', line 121 def mpub(topic, ) body = .map do || [.bytesize, ].pack('l>a*') end.join write ["MPUB #{topic}\n", body.bytesize, .size, body].pack('a*l>l>a*') end |
#pub(topic, message) ⇒ Object
113 114 115 |
# File 'lib/nsq/connection.rb', line 113 def pub(topic, ) write ["PUB #{topic}\n", .bytesize, ].pack('a*l>a*') end |
#rdy(count) ⇒ Object
91 92 93 |
# File 'lib/nsq/connection.rb', line 91 def rdy(count) write "RDY #{count}\n" end |
#re_up_ready ⇒ Object
Tell the server we are ready for more messages!
131 132 133 134 135 136 137 |
# File 'lib/nsq/connection.rb', line 131 def re_up_ready rdy(@max_in_flight) # assume these messages are coming our way. yes, this might not be the # case, but it's much easier to manage our RDY state with the server if # we treat things this way. @presumed_in_flight = @max_in_flight end |
#req(message_id, timeout) ⇒ Object
102 103 104 105 |
# File 'lib/nsq/connection.rb', line 102 def req(, timeout) write "REQ #{} #{timeout}\n" decrement_in_flight end |
#sub(topic, channel) ⇒ Object
86 87 88 |
# File 'lib/nsq/connection.rb', line 86 def sub(topic, channel) write "SUB #{topic} #{channel}\n" end |
#touch(message_id) ⇒ Object
108 109 110 |
# File 'lib/nsq/connection.rb', line 108 def touch() write "TOUCH #{}\n" end |