Class: Nsq::Connection
- Inherits:
-
Object
- Object
- Nsq::Connection
- Includes:
- AttributeLogger
- Defined in:
- lib/nsq/connection.rb
Constant Summary collapse
- USER_AGENT =
"nsq-ruby/#{Nsq::Version::STRING}"- RESPONSE_HEARTBEAT =
'_heartbeat_'- RESPONSE_OK =
'OK'- @@log_attributes =
[:host, :port]
Instance Attribute Summary collapse
-
#host ⇒ Object
readonly
Returns the value of attribute host.
-
#max_in_flight ⇒ Object
Returns the value of attribute max_in_flight.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#presumed_in_flight ⇒ Object
readonly
Returns the value of attribute presumed_in_flight.
Instance Method Summary collapse
-
#close ⇒ Object
close the connection and don’t try to re-open it.
- #connected? ⇒ Boolean
- #fin(message_id) ⇒ Object
-
#initialize(opts = {}) ⇒ Connection
constructor
A new instance of Connection.
- #mpub(topic, messages) ⇒ Object
- #pub(topic, message) ⇒ Object
- #rdy(count) ⇒ Object
-
#re_up_ready ⇒ Object
Tell the server we are ready for more messages!.
- #req(message_id, timeout) ⇒ Object
- #sub(topic, channel) ⇒ Object
- #touch(message_id) ⇒ Object
Methods included from AttributeLogger
Constructor Details
#initialize(opts = {}) ⇒ Connection
Returns a new instance of Connection.
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/nsq/connection.rb', line 25 def initialize(opts = {}) @host = opts[:host] || (raise ArgumentError, 'host is required') @port = opts[:port] || (raise ArgumentError, 'host is required') @queue = opts[:queue] @topic = opts[:topic] @channel = opts[:channel] @msg_timeout = opts[:msg_timeout] || 60_000 # 60s @max_in_flight = opts[:max_in_flight] || 1 if @msg_timeout < 1000 raise ArgumentError, 'msg_timeout cannot be less than 1000. it\'s in milliseconds.' end # for outgoing communication @write_queue = Queue.new # For indicating that the connection has died. # We use a Queue so we don't have to poll. Used to communicate across # threads (from write_loop and read_loop to connect_and_monitor). @death_queue = Queue.new @connected = false @presumed_in_flight = 0 open_connection start_monitoring_connection end |
Instance Attribute Details
#host ⇒ Object (readonly)
Returns the value of attribute host.
15 16 17 |
# File 'lib/nsq/connection.rb', line 15 def host @host end |
#max_in_flight ⇒ Object
Returns the value of attribute max_in_flight.
17 18 19 |
# File 'lib/nsq/connection.rb', line 17 def max_in_flight @max_in_flight end |
#port ⇒ Object (readonly)
Returns the value of attribute port.
16 17 18 |
# File 'lib/nsq/connection.rb', line 16 def port @port end |
#presumed_in_flight ⇒ Object (readonly)
Returns the value of attribute presumed_in_flight.
18 19 20 |
# File 'lib/nsq/connection.rb', line 18 def presumed_in_flight @presumed_in_flight end |
Instance Method Details
#close ⇒ Object
close the connection and don’t try to re-open it
60 61 62 63 |
# File 'lib/nsq/connection.rb', line 60 def close stop_monitoring_connection close_connection end |
#connected? ⇒ Boolean
54 55 56 |
# File 'lib/nsq/connection.rb', line 54 def connected? @connected end |
#fin(message_id) ⇒ Object
76 77 78 79 |
# File 'lib/nsq/connection.rb', line 76 def fin() write "FIN #{}\n" decrement_in_flight end |
#mpub(topic, messages) ⇒ Object
98 99 100 101 102 103 104 |
# File 'lib/nsq/connection.rb', line 98 def mpub(topic, ) body = .map do || [.bytesize, ].pack('l>a*') end.join write ["MPUB #{topic}\n", body.bytesize, .size, body].pack('a*l>l>a*') end |
#pub(topic, message) ⇒ Object
93 94 95 |
# File 'lib/nsq/connection.rb', line 93 def pub(topic, ) write ["PUB #{topic}\n", .bytesize, ].pack('a*l>a*') end |
#rdy(count) ⇒ Object
71 72 73 |
# File 'lib/nsq/connection.rb', line 71 def rdy(count) write "RDY #{count}\n" end |
#re_up_ready ⇒ Object
Tell the server we are ready for more messages!
108 109 110 111 112 113 114 |
# File 'lib/nsq/connection.rb', line 108 def re_up_ready rdy(@max_in_flight) # assume these messages are coming our way. yes, this might not be the # case, but it's much easier to manage our RDY state with the server if # we treat things this way. @presumed_in_flight = @max_in_flight end |
#req(message_id, timeout) ⇒ Object
82 83 84 85 |
# File 'lib/nsq/connection.rb', line 82 def req(, timeout) write "REQ #{} #{timeout}\n" decrement_in_flight end |
#sub(topic, channel) ⇒ Object
66 67 68 |
# File 'lib/nsq/connection.rb', line 66 def sub(topic, channel) write "SUB #{topic} #{channel}\n" end |
#touch(message_id) ⇒ Object
88 89 90 |
# File 'lib/nsq/connection.rb', line 88 def touch() write "TOUCH #{}\n" end |