Class: CrystalMQ::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/crystalmq.rb

Defined Under Namespace

Classes: ConsumerPayload

Instance Method Summary collapse

Constructor Details

#initialize(host, topic, channel) ⇒ Consumer

Returns a new instance of Consumer.



40
41
42
43
44
45
46
47
48
# File 'lib/crystalmq.rb', line 40

def initialize(host, topic, channel)
  @topic = topic
  @channel = channel
  @host = host
  connect_socket
rescue StandardError
  connect_socket
  retry
end

Instance Method Details

#connect_socketObject



50
51
52
53
# File 'lib/crystalmq.rb', line 50

def connect_socket
  @socket = TCPSocket.new(@host, 1235)
  @socket.sync = true
end

#consumeObject



55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/crystalmq.rb', line 55

def consume
  @socket.write(ConsumerPayload.new(@topic, @channel).to_msgpack)
  to_process = []
  unpacker = MessagePack::Unpacker.new(@socket)
  
  loop do
    message = MessagePayload.from_msgpack(unpacker.read)
    yield message.message
  end
  @socket.close
rescue StandardError
  connect_socket
  retry
end