Class: NSQ::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/nsq/publisher.rb

Instance Method Summary collapse

Constructor Details

#initialize(host, port, options = {}, &block) ⇒ Publisher

Returns a new instance of Publisher.



5
6
7
8
9
10
11
12
# File 'lib/nsq/publisher.rb', line 5

# Opens a TCP connection to host:port and writes MAGIC_V2 to it.
#
# When a block is given, the new publisher is yielded to it and closed again
# once the block finishes, whether it returns normally or raises.
#
# @param host [String] host passed to TCPSocket.open
# @param port [Integer] port passed to TCPSocket.open
# @param options [Hash] :response_timeout is stored in @response_timeout (defaults to 5)
# @yield [Publisher] the connected publisher
def initialize(host, port, options = {}, &block)
  @socket = TCPSocket.open(host, port)
  @socket.write(MAGIC_V2)
  @response_timeout = options[:response_timeout] || 5
  yield self if block_given?
ensure
  # @socket is still nil when TCPSocket.open itself raised. Calling close in
  # that state would raise NoMethodError from this ensure clause and replace
  # the real connection error, so only close a socket that was opened.
  close if block_given? && @socket
end

Instance Method Details

#close ⇒ Object



37
38
39
# File 'lib/nsq/publisher.rb', line 37

# Closes the socket held in @socket and returns whatever that close call returns.
def close
  connection = @socket
  connection.close
end

#publish(topic, message) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/nsq/publisher.rb', line 14

# Publishes +message+ to +topic+ and blocks until a response settles the call.
#
# Writes a PUB command (command line with the topic, a 4-byte big-endian body
# size, then the body) and reads size-prefixed response frames from the socket.
# '_heartbeat_' frames are skipped; any other frame ends the call.
#
# @param topic [String] topic name placed on the PUB command line
# @param message [String] message body
# @return [nil] once an 'OK' frame has been received
# @raise [RuntimeError] for E_INVALID, E_BAD_TOPIC, E_BAD_MESSAGE, E_PUT_FAILED
#   or any unrecognised response
# @raise [EOFError] when the socket reports end-of-stream before a response arrives
def publish(topic, message)
  # The size prefix is a byte count. String#length counts characters and would
  # under-report any message that contains multibyte characters.
  buf = ['PUB ', topic, "\n", message.bytesize, message].pack('a*a*a*Na*')
  @socket.write(buf)

  # NOTE(review): @response_timeout is not consulted here, so this wait is unbounded.
  pending = String.new
  loop do
    chunk = @socket.recv(4096)
    # A closed connection surfaces as nil or an empty string from recv.
    raise EOFError, 'Connection closed while waiting for PUB response' if chunk.nil? || chunk.empty?

    pending << chunk

    # A single read may hold a partial frame or several frames (for example a
    # heartbeat immediately followed by OK), so consume complete frames one at
    # a time and keep reading while the next one is still incomplete.
    while pending.bytesize >= 4
      total = pending.unpack('N').first + 4
      break if pending.bytesize < total

      frame = pending.byteslice(0, total)
      pending = pending.byteslice(total, pending.bytesize - total)
      msg = frame.unpack('NNa*')[2]

      case msg
      when 'OK'            then return
      when '_heartbeat_'   then next
      when 'E_INVALID'     then raise 'Invalid message'
      when 'E_BAD_TOPIC'   then raise 'Bad topic'
      when 'E_BAD_MESSAGE' then raise 'Bad message'
      when 'E_PUT_FAILED'  then raise 'Put failed'
      else raise "Unknown PUB response: #{msg}"
      end
    end
  end
end