Class: Nsq::Connection

Inherits:
Object
  • Object
show all
Includes:
AttributeLogger
Defined in:
lib/nsq/connection.rb

Constant Summary collapse

USER_AGENT =
"nsq-ruby/#{Nsq::Version::STRING}"
RESPONSE_HEARTBEAT =
'_heartbeat_'
RESPONSE_OK =
'OK'
@@log_attributes =
[:host, :port]

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from AttributeLogger

included

Constructor Details

#initialize(opts = {}) ⇒ Connection

Returns a new instance of Connection.



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/nsq/connection.rb', line 26

def initialize(opts = {})
  @host = opts[:host] || (raise ArgumentError, 'host is required')
  @port = opts[:port] || (raise ArgumentError, 'port is required')
  @queue = opts[:queue]
  @topic = opts[:topic]
  @channel = opts[:channel]
  @msg_timeout = opts[:msg_timeout] || 60_000 # 60s
  @max_in_flight = opts[:max_in_flight] || 1
  @tls_options = opts[:tls_options]
  @max_attempts = opts[:max_attempts]
  if opts[:ssl_context]
    if @tls_options
      warn 'ssl_context and tls_options both set. Using tls_options. Ignoring ssl_context.'
    else
      @tls_options = opts[:ssl_context]
      warn 'ssl_context will be deprecated nsq-ruby version 3. Please use tls_options instead.'
    end
  end
  @tls_v1 = !!opts[:tls_v1]

  if @tls_options
    if @tls_v1
      validate_tls_options!
    else
      warn 'tls_options was provided, but tls_v1 is false. Skipping validation of tls_options.'
    end
  end

  if @msg_timeout < 1000
    raise ArgumentError, 'msg_timeout cannot be less than 1000. it\'s in milliseconds.'
  end

  # for outgoing communication
  @write_queue = SizedQueue.new(10000)

  # For indicating that the connection has died.
  # We use a Queue so we don't have to poll. Used to communicate across
  # threads (from write_loop and read_loop to connect_and_monitor).
  @death_queue = Queue.new

  @connected = false
  @presumed_in_flight = 0

  open_connection
  start_monitoring_connection
end

Instance Attribute Details

#hostObject (readonly)

Returns the value of attribute host.



16
17
18
# File 'lib/nsq/connection.rb', line 16

def host
  @host
end

#max_in_flightObject

Returns the value of attribute max_in_flight.



18
19
20
# File 'lib/nsq/connection.rb', line 18

def max_in_flight
  @max_in_flight
end

#portObject (readonly)

Returns the value of attribute port.



17
18
19
# File 'lib/nsq/connection.rb', line 17

def port
  @port
end

#presumed_in_flightObject (readonly)

Returns the value of attribute presumed_in_flight.



19
20
21
# File 'lib/nsq/connection.rb', line 19

def presumed_in_flight
  @presumed_in_flight
end

Instance Method Details

#closeObject

close the connection and don’t try to re-open it



80
81
82
83
# File 'lib/nsq/connection.rb', line 80

def close
  stop_monitoring_connection
  close_connection
end

#connected?Boolean

Returns:

  • (Boolean)


74
75
76
# File 'lib/nsq/connection.rb', line 74

def connected?
  @connected
end

#dpub(topic, delay_in_ms, message) ⇒ Object



117
118
119
# File 'lib/nsq/connection.rb', line 117

def dpub(topic, delay_in_ms, message)
  write ["DPUB #{topic} #{delay_in_ms}\n", message.bytesize, message].pack('a*l>a*')
end

#fin(message_id) ⇒ Object



96
97
98
99
# File 'lib/nsq/connection.rb', line 96

def fin(message_id)
  write "FIN #{message_id}\n"
  decrement_in_flight
end

#mpub(topic, messages) ⇒ Object



121
122
123
124
125
126
127
# File 'lib/nsq/connection.rb', line 121

def mpub(topic, messages)
  body = messages.map do |message|
    [message.bytesize, message].pack('l>a*')
  end.join

  write ["MPUB #{topic}\n", body.bytesize, messages.size, body].pack('a*l>l>a*')
end

#pub(topic, message) ⇒ Object



113
114
115
# File 'lib/nsq/connection.rb', line 113

def pub(topic, message)
  write ["PUB #{topic}\n", message.bytesize, message].pack('a*l>a*')
end

#rdy(count) ⇒ Object



91
92
93
# File 'lib/nsq/connection.rb', line 91

def rdy(count)
  write "RDY #{count}\n"
end

#re_up_readyObject

Tell the server we are ready for more messages!



131
132
133
134
135
136
137
# File 'lib/nsq/connection.rb', line 131

def re_up_ready
  rdy(@max_in_flight)
  # assume these messages are coming our way. yes, this might not be the
  # case, but it's much easier to manage our RDY state with the server if
  # we treat things this way.
  @presumed_in_flight = @max_in_flight
end

#req(message_id, timeout) ⇒ Object



102
103
104
105
# File 'lib/nsq/connection.rb', line 102

def req(message_id, timeout)
  write "REQ #{message_id} #{timeout}\n"
  decrement_in_flight
end

#sub(topic, channel) ⇒ Object



86
87
88
# File 'lib/nsq/connection.rb', line 86

def sub(topic, channel)
  write "SUB #{topic} #{channel}\n"
end

#touch(message_id) ⇒ Object



108
109
110
# File 'lib/nsq/connection.rb', line 108

def touch(message_id)
  write "TOUCH #{message_id}\n"
end