Class: NSQ::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/nsq/publisher.rb

Constant Summary collapse

SIZE_BYTES =
4

Instance Method Summary collapse

Constructor Details

#initialize(host, port, options = {}, &block) ⇒ Publisher

Returns a new instance of Publisher.



9
10
11
12
13
14
15
# File 'lib/nsq/publisher.rb', line 9

# Opens a TCP connection and writes the MAGIC_V2 preamble to it.
#
# With a block, the new publisher is passed to the block and the socket is
# closed afterwards, whether the block returns normally or raises. Without a
# block the socket is left open and the caller is responsible for calling
# #close.
#
# If connecting or writing the preamble raises, the socket (when one was
# opened) is closed before the exception propagates, so a failed
# construction does not leave an open descriptor behind.
#
# @param host forwarded unchanged to TCPSocket.open
# @param port forwarded unchanged to TCPSocket.open
# @param options [Hash] accepted for interface compatibility; not read here
# @yield [publisher] the freshly connected instance
def initialize(host, port, options = {}, &block)
  connected = false
  @socket = TCPSocket.open(host, port)
  @socket.write(MAGIC_V2)
  connected = true
  block.call(self) if block
ensure
  # Block form: always clean up. Non-block form: clean up only when setup
  # failed, because `new` will not return an object anyone could #close.
  close if block || !connected
end

Instance Method Details

#close ⇒ Object



46
47
48
# File 'lib/nsq/publisher.rb', line 46

# Closes the underlying socket when one has been assigned.
#
# Does nothing (and returns nil) if @socket is unset; otherwise returns
# whatever the socket's own #close returns.
def close
  return unless @socket

  @socket.close
end

#publish(topic, message) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/nsq/publisher.rb', line 17

# Sends a PUB command for +message+ on +topic+ and blocks until the server
# acknowledges it.
#
# The command is written as "PUB <topic>\n", followed by the message length
# as a 32-bit big-endian integer, followed by the message bytes. Replies are
# then read from the socket one frame at a time; each frame is unpacked as
# two 32-bit big-endian integers followed by a data payload, and the payload
# is passed to #handle. The method returns once #handle has returned a
# truthy value for some frame and no unread bytes remain in the buffer.
#
# @param topic [String] topic name, written verbatim into the command line
# @param message [String] payload; its bytesize is sent as the length prefix
# @return [nil]
# @raise [EOFError] if the peer closes the connection before a complete
#   acknowledgement has been received
def publish(topic, message)
  buf = ['PUB ', topic, "\n", message.bytesize, message].pack('a*a*a*Na*')
  @socket.write(buf)

  # Binary buffer: frames are raw bytes, so lengths are measured in bytes.
  response = String.new
  have_received_ok = false
  loop do
    until response.bytesize >= expected_size(response)
      chunk = @socket.recv(4096)
      # recv signals a closed connection with "" (older Rubies) or nil
      # (Ruby 3.3+). Without this check the loop would spin forever or fail
      # with an unrelated TypeError.
      if chunk.nil? || chunk.empty?
        raise EOFError, 'connection closed while waiting for PUB response'
      end
      response << chunk
    end

    size = expected_size(response)

    # Extract the first message from `response`.
    first_message = response.slice!(0, size)
    _, _, data = first_message.unpack("NNa#{size}")
    have_received_ok ||= handle(data)

    # If the message was "OK", we can return successfully when the buffer
    # is empty.
    return if response.empty? && have_received_ok

    # We are now in a situation where we have processed a message, but we
    # must process more. Either it was an OK but we have partially read
    # another message that we must finish, or it was a heartbeat and we
    # must read again until we get an OK or an error code.
  end
end