Class: Nsq::Connection

Inherits:
Object
  • Object
show all
Includes:
AttributeLogger
Defined in:
lib/nsq/connection.rb

Constant Summary collapse

# Client identification string: "nsq-ruby/" followed by Nsq::Version::STRING.
USER_AGENT =
"nsq-ruby/#{Nsq::Version::STRING}"
# Literal '_heartbeat_' — presumably the body of a server heartbeat response;
# confirm against the read loop (not shown here).
RESPONSE_HEARTBEAT =
'_heartbeat_'
# Literal 'OK' — presumably the body of a successful server response; confirm
# against the read loop (not shown here).
RESPONSE_OK =
'OK'
# Attribute names (:host, :port) — presumably consumed by the included
# AttributeLogger to tag log lines; confirm against AttributeLogger.
@@log_attributes =
[:host, :port]

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from AttributeLogger

included

Constructor Details

#initialize(opts = {}) ⇒ Connection

Returns a new instance of Connection.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/nsq/connection.rb', line 25

# Validates the options, sets up the connection state, then opens the
# connection and starts monitoring it.
#
# @param opts [Hash] connection options
# @option opts [Object] :host required
# @option opts [Object] :port required
# @option opts [Object] :queue optional, stored as-is
# @option opts [Object] :topic optional, stored as-is
# @option opts [Object] :channel optional, stored as-is
# @option opts [Integer] :msg_timeout milliseconds, defaults to 60_000;
#   must be at least 1000
# @option opts [Integer] :max_in_flight defaults to 1
# @raise [ArgumentError] if :host or :port is missing, or if :msg_timeout
#   is below 1000
def initialize(opts = {})
  @host = opts[:host] || (raise ArgumentError, 'host is required')
  # The message names the option that is actually missing (it previously
  # said 'host is required' for a missing port).
  @port = opts[:port] || (raise ArgumentError, 'port is required')
  @queue = opts[:queue]
  @topic = opts[:topic]
  @channel = opts[:channel]
  @msg_timeout = opts[:msg_timeout] || 60_000 # 60s
  @max_in_flight = opts[:max_in_flight] || 1

  if @msg_timeout < 1000
    raise ArgumentError, 'msg_timeout cannot be less than 1000. it\'s in milliseconds.'
  end

  # for outgoing communication
  @write_queue = Queue.new

  # For indicating that the connection has died.
  # We use a Queue so we don't have to poll. Used to communicate across
  # threads (from write_loop and read_loop to connect_and_monitor).
  @death_queue = Queue.new

  @connected = false
  @presumed_in_flight = 0

  open_connection
  start_monitoring_connection
end

Instance Attribute Details

#host ⇒ Object (readonly)

Returns the value of attribute host.



15
16
17
# File 'lib/nsq/connection.rb', line 15

# Reader for @host, which the constructor takes from opts[:host].
def host
  @host
end

#max_in_flight ⇒ Object

Returns the value of attribute max_in_flight.



17
18
19
# File 'lib/nsq/connection.rb', line 17

# Reader for @max_in_flight, which the constructor takes from
# opts[:max_in_flight] and defaults to 1.
def max_in_flight
  @max_in_flight
end

#port ⇒ Object (readonly)

Returns the value of attribute port.



16
17
18
# File 'lib/nsq/connection.rb', line 16

# Reader for @port, which the constructor takes from opts[:port].
def port
  @port
end

#presumed_in_flight ⇒ Object (readonly)

Returns the value of attribute presumed_in_flight.



18
19
20
# File 'lib/nsq/connection.rb', line 18

# Reader for @presumed_in_flight: starts at 0 in the constructor and is set
# to @max_in_flight by re_up_ready.
def presumed_in_flight
  @presumed_in_flight
end

Instance Method Details

#close ⇒ Object

Closes the connection and does not try to re-open it.



60
61
62
63
# File 'lib/nsq/connection.rb', line 60

# Shuts the connection down: stops monitoring first, then closes.
def close
  # NOTE(review): monitoring is stopped before closing, presumably so the
  # monitor does not try to re-open the connection — confirm against
  # stop_monitoring_connection.
  stop_monitoring_connection
  close_connection
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


54
55
56
# File 'lib/nsq/connection.rb', line 54

# Returns the @connected flag, which the constructor initializes to false.
def connected?
  @connected
end

#fin(message_id) ⇒ Object



76
77
78
79
# File 'lib/nsq/connection.rb', line 76

# Hands a "FIN <message_id>" command line to `write`, then calls
# decrement_in_flight.
#
# @param message_id [#to_s] id interpolated into the command
# @return [Object] whatever decrement_in_flight returns
def fin(message_id)
  command = "FIN #{message_id}\n"
  write(command)
  decrement_in_flight
end

#mpub(topic, messages) ⇒ Object



98
99
100
101
102
103
104
# File 'lib/nsq/connection.rb', line 98

# Packs several messages into one "MPUB <topic>" frame and hands it to
# `write`.
#
# Each message is encoded as a 32-bit big-endian length followed by its
# bytes. The frame is the command line, the byte size of the encoded
# messages, the message count, and then the encoded messages.
#
# @param topic [#to_s] topic name interpolated into the command line
# @param messages [Array<String>] message bodies
# @return [Object] whatever `write` returns
def mpub(topic, messages)
  encoded = messages.map { |payload| [payload.bytesize, payload].pack('l>a*') }
  body = encoded.join
  command = "MPUB #{topic}\n"
  frame = [command, body.bytesize, messages.size, body].pack('a*l>l>a*')
  write(frame)
end

#pub(topic, message) ⇒ Object



93
94
95
# File 'lib/nsq/connection.rb', line 93

# Packs one message into a "PUB <topic>" frame and hands it to `write`.
#
# The frame is the command line, the message's byte size as a 32-bit
# big-endian integer, and then the message bytes.
#
# @param topic [#to_s] topic name interpolated into the command line
# @param message [String] message body
# @return [Object] whatever `write` returns
def pub(topic, message)
  command = "PUB #{topic}\n"
  frame = [command, message.bytesize, message].pack('a*l>a*')
  write(frame)
end

#rdy(count) ⇒ Object



71
72
73
# File 'lib/nsq/connection.rb', line 71

# Hands a "RDY <count>" command line to `write`.
#
# @param count [#to_s] value interpolated into the command
# @return [Object] whatever `write` returns
def rdy(count)
  command = "RDY #{count}\n"
  write(command)
end

#re_up_ready ⇒ Object

Tell the server we are ready for more messages!



108
109
110
111
112
113
114
# File 'lib/nsq/connection.rb', line 108

# Calls rdy with @max_in_flight and then sets @presumed_in_flight to the
# same value.
def re_up_ready
  rdy(@max_in_flight)
  # assume these messages are coming our way. yes, this might not be the
  # case, but it's much easier to manage our RDY state with the server if
  # we treat things this way.
  @presumed_in_flight = @max_in_flight
end

#req(message_id, timeout) ⇒ Object



82
83
84
85
# File 'lib/nsq/connection.rb', line 82

# Hands a "REQ <message_id> <timeout>" command line to `write`, then calls
# decrement_in_flight.
#
# @param message_id [#to_s] id interpolated into the command
# @param timeout [#to_s] value interpolated after the id
# @return [Object] whatever decrement_in_flight returns
def req(message_id, timeout)
  command = "REQ #{message_id} #{timeout}\n"
  write(command)
  decrement_in_flight
end

#sub(topic, channel) ⇒ Object



66
67
68
# File 'lib/nsq/connection.rb', line 66

# Hands a "SUB <topic> <channel>" command line to `write`.
#
# @param topic [#to_s] topic name interpolated into the command
# @param channel [#to_s] channel name interpolated into the command
# @return [Object] whatever `write` returns
def sub(topic, channel)
  command = "SUB #{topic} #{channel}\n"
  write(command)
end

#touch(message_id) ⇒ Object



88
89
90
# File 'lib/nsq/connection.rb', line 88

# Hands a "TOUCH <message_id>" command line to `write`.
#
# @param message_id [#to_s] id interpolated into the command
# @return [Object] whatever `write` returns
def touch(message_id)
  command = "TOUCH #{message_id}\n"
  write(command)
end