Class: Nsq::Connection

Inherits:
Object
  • Object
show all
Includes:
AttributeLogger
Defined in:
lib/nsq/connection.rb

Constant Summary collapse

# Client identification string: "nsq-ruby/" followed by the gem version.
# NOTE(review): presumably sent to nsqd when identifying — usage is not shown here.
USER_AGENT =
"nsq-ruby/#{Nsq::Version::STRING}"
# NOTE(review): presumably the response body nsqd sends as a heartbeat — confirm
# against the read loop, which is not visible in this section.
RESPONSE_HEARTBEAT =
'_heartbeat_'
# NOTE(review): presumably the response body nsqd sends to acknowledge a
# command — confirm against the read loop.
RESPONSE_OK =
'OK'
# Attribute names (:host and :port) exposed for logging.
# NOTE(review): presumably consumed by the AttributeLogger mixin — confirm there.
@@log_attributes =
[:host, :port]

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from AttributeLogger

included

Constructor Details

#initialize(opts = {}) ⇒ Connection

Returns a new instance of Connection.



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/nsq/connection.rb', line 26

# Builds a connection from an options hash, validates the options, then
# opens the connection and starts monitoring it.
#
# @param opts [Hash] connection settings
# @option opts [Object] :host required
# @option opts [Object] :port required
# @option opts [Object] :queue stored as-is
# @option opts [Object] :topic stored as-is
# @option opts [Object] :channel stored as-is
# @option opts [Integer] :msg_timeout milliseconds, defaults to 60_000
# @option opts [Integer] :max_in_flight defaults to 1
# @option opts [Object] :tls_options preferred TLS configuration
# @option opts [Object] :ssl_context legacy alias for :tls_options
# @option opts [Boolean] :tls_v1 any truthy value enables validation of the TLS options
# @raise [ArgumentError] when :host or :port is missing, or :msg_timeout is below 1000
def initialize(opts = {})
  @host = opts[:host]
  raise ArgumentError, 'host is required' unless @host

  @port = opts[:port]
  raise ArgumentError, 'port is required' unless @port

  @queue = opts[:queue]
  @topic = opts[:topic]
  @channel = opts[:channel]
  @msg_timeout = opts[:msg_timeout] || 60_000 # 60s
  @max_in_flight = opts[:max_in_flight] || 1

  # tls_options takes precedence; ssl_context is only used as a fallback.
  @tls_options = opts[:tls_options]
  legacy_ssl_context = opts[:ssl_context]
  if legacy_ssl_context && @tls_options
    warn 'ssl_context and tls_options both set. Using tls_options. Ignoring ssl_context.'
  elsif legacy_ssl_context
    @tls_options = legacy_ssl_context
    warn 'ssl_context will be deprecated nsq-ruby version 3. Please use tls_options instead.'
  end

  # Normalize to a strict boolean.
  @tls_v1 = opts[:tls_v1] ? true : false

  # TLS options are only validated when TLS is actually requested.
  if @tls_options && @tls_v1
    validate_tls_options!
  elsif @tls_options
    warn 'tls_options was provided, but tls_v1 is false. Skipping validation of tls_options.'
  end

  raise ArgumentError, 'msg_timeout cannot be less than 1000. it\'s in milliseconds.' if @msg_timeout < 1000

  # Outgoing data is handed over through this queue.
  @write_queue = Queue.new

  # Signals that the connection has died. A Queue is used so nothing has to
  # poll; it carries the notification across threads (from write_loop and
  # read_loop to connect_and_monitor).
  @death_queue = Queue.new

  @connected = false
  @presumed_in_flight = 0

  open_connection
  start_monitoring_connection
end

Instance Attribute Details

#host ⇒ Object (readonly)

Returns the value of attribute host.



16
17
18
# File 'lib/nsq/connection.rb', line 16

# Reader for the host this connection was created with.
#
# @return [Object] the value given as opts[:host] to #initialize
def host
  @host
end

#max_in_flight ⇒ Object

Returns the value of attribute max_in_flight.



18
19
20
# File 'lib/nsq/connection.rb', line 18

# Reader for the in-flight window size. #initialize sets it from
# opts[:max_in_flight] (default 1) and #re_up_ready sends it as the RDY count.
#
# @return [Object] the current max_in_flight value
def max_in_flight
  @max_in_flight
end

#port ⇒ Object (readonly)

Returns the value of attribute port.



17
18
19
# File 'lib/nsq/connection.rb', line 17

# Reader for the port this connection was created with.
#
# @return [Object] the value given as opts[:port] to #initialize
def port
  @port
end

#presumed_in_flight ⇒ Object (readonly)

Returns the value of attribute presumed_in_flight.



19
20
21
# File 'lib/nsq/connection.rb', line 19

# Reader for the number of messages this connection assumes are on their way.
# It starts at 0 in #initialize and is reset to max_in_flight by #re_up_ready.
#
# @return [Integer] the presumed in-flight count
def presumed_in_flight
  @presumed_in_flight
end

Instance Method Details

#close ⇒ Object

Closes the connection and does not try to re-open it.



79
80
81
82
# File 'lib/nsq/connection.rb', line 79

# Shuts the connection down for good: stops the monitoring first, then closes
# the connection itself.
# NOTE(review): monitoring is presumably stopped first so that nothing
# re-opens the connection after it is closed — confirm in
# stop_monitoring_connection, which is not shown here.
def close
  stop_monitoring_connection
  close_connection
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


73
74
75
# File 'lib/nsq/connection.rb', line 73

# Reports the connection flag. #initialize sets it to false before opening
# the connection.
# NOTE(review): where the flag is flipped to true is not visible in this
# section — presumably in open_connection; confirm.
#
# @return [Boolean] the current value of @connected
def connected?
  @connected
end

#dpub(topic, delay_in_ms, message) ⇒ Object



116
117
118
# File 'lib/nsq/connection.rb', line 116

# Queues a DPUB command for +topic+ carrying +delay_in_ms+ and one message.
#
# The frame is the command line, the payload size as a 4-byte big-endian
# integer, and then the raw payload bytes.
#
# @param topic [#to_s] topic name interpolated into the command line
# @param delay_in_ms [#to_s] delay value interpolated into the command line
# @param message [String] payload; its bytesize is used as the length prefix
# @return [Object] whatever #write returns
def dpub(topic, delay_in_ms, message)
  command_line = "DPUB #{topic} #{delay_in_ms}\n"
  frame = [command_line, message.bytesize, message].pack('a*l>a*')
  write(frame)
end

#fin(message_id) ⇒ Object



95
96
97
98
# File 'lib/nsq/connection.rb', line 95

# Queues a FIN command for +message_id+ and then lowers the in-flight count
# via decrement_in_flight.
#
# @param message_id [#to_s] id interpolated into the command line
# @return [Object] whatever decrement_in_flight returns
def fin(message_id)
  command = "FIN #{message_id}\n"
  write(command)
  decrement_in_flight
end

#mpub(topic, messages) ⇒ Object



120
121
122
123
124
125
126
# File 'lib/nsq/connection.rb', line 120

# Queues a single MPUB command that publishes every entry of +messages+ to
# +topic+.
#
# Bytes written after the command line:
#   [4-byte body size][4-byte message count]([4-byte size][payload])...
#
# The body size covers everything that follows it, including the 4-byte
# message count. Previously the count was left out of the advertised size,
# making it 4 bytes short of the actual body.
#
# @param topic [#to_s] topic name interpolated into the command line
# @param messages [Array<String>] payloads; each is prefixed with its bytesize
# @return [Object] whatever #write returns
def mpub(topic, messages)
  framed_messages = messages.map do |message|
    [message.bytesize, message].pack('l>a*')
  end.join

  # The body is the message count followed by the length-prefixed messages.
  body = [messages.size, framed_messages].pack('l>a*')

  write ["MPUB #{topic}\n", body.bytesize, body].pack('a*l>a*')
end

#pub(topic, message) ⇒ Object



112
113
114
# File 'lib/nsq/connection.rb', line 112

# Queues a PUB command that publishes one message to +topic+.
#
# The frame is the command line, the payload size as a 4-byte big-endian
# integer, and then the raw payload bytes.
#
# @param topic [#to_s] topic name interpolated into the command line
# @param message [String] payload; its bytesize is used as the length prefix
# @return [Object] whatever #write returns
def pub(topic, message)
  command_line = "PUB #{topic}\n"
  frame = [command_line, message.bytesize, message].pack('a*l>a*')
  write(frame)
end

#rdy(count) ⇒ Object



90
91
92
# File 'lib/nsq/connection.rb', line 90

# Queues an RDY command carrying +count+.
#
# @param count [#to_s] value interpolated into the command line
# @return [Object] whatever #write returns
def rdy(count)
  command = "RDY #{count}\n"
  write(command)
end

#re_up_ready ⇒ Object

Tell the server we are ready for more messages!



130
131
132
133
134
135
136
# File 'lib/nsq/connection.rb', line 130

# Tells the server we are ready for max_in_flight more messages and records
# that many messages as presumed in flight.
#
# max_in_flight is a writable attribute, so it is read once up front. That
# keeps the count sent in RDY and the count stored in @presumed_in_flight
# identical even if the attribute is reassigned while the command is sent.
#
# @return [Object] the count that was sent and recorded
def re_up_ready
  ready_count = @max_in_flight
  rdy(ready_count)
  # Assume these messages are coming our way. That might not be the case,
  # but it is much easier to manage our RDY state with the server if we
  # treat things this way.
  @presumed_in_flight = ready_count
end

#req(message_id, timeout) ⇒ Object



101
102
103
104
# File 'lib/nsq/connection.rb', line 101

# Queues a REQ command for +message_id+ with +timeout+ and then lowers the
# in-flight count via decrement_in_flight.
#
# @param message_id [#to_s] id interpolated into the command line
# @param timeout [#to_s] value interpolated into the command line
#   (NOTE(review): presumably milliseconds — confirm against callers)
# @return [Object] whatever decrement_in_flight returns
def req(message_id, timeout)
  command = "REQ #{message_id} #{timeout}\n"
  write(command)
  decrement_in_flight
end

#sub(topic, channel) ⇒ Object



85
86
87
# File 'lib/nsq/connection.rb', line 85

# Queues a SUB command for the given +topic+ and +channel+.
#
# @param topic [#to_s] topic name interpolated into the command line
# @param channel [#to_s] channel name interpolated into the command line
# @return [Object] whatever #write returns
def sub(topic, channel)
  command = "SUB #{topic} #{channel}\n"
  write(command)
end

#touch(message_id) ⇒ Object



107
108
109
# File 'lib/nsq/connection.rb', line 107

# Queues a TOUCH command for +message_id+.
#
# @param message_id [#to_s] id interpolated into the command line
# @return [Object] whatever #write returns
def touch(message_id)
  command = "TOUCH #{message_id}\n"
  write(command)
end