Class: CrystalMQ::Consumer
- Inherits:
-
Object
- Object
- CrystalMQ::Consumer
- Defined in:
- lib/crystalmq.rb
Defined Under Namespace
Classes: ConsumerPayload
Instance Method Summary collapse
- #connect_socket ⇒ Object
- #consume ⇒ Object
-
#initialize(host, topic, channel) ⇒ Consumer
constructor
A new instance of Consumer.
Constructor Details
#initialize(host, topic, channel) ⇒ Consumer
Returns a new instance of Consumer.
40 41 42 43 44 45 46 47 48 |
# File 'lib/crystalmq.rb', line 40 def initialize(host, topic, channel) @topic = topic @channel = channel @host = host connect_socket rescue StandardError connect_socket retry end |
Instance Method Details
#connect_socket ⇒ Object
50 51 52 53 |
# File 'lib/crystalmq.rb', line 50 def connect_socket @socket = TCPSocket.new(@host, 1235) @socket.sync = true end |
#consume ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/crystalmq.rb', line 55 def consume @socket.write(ConsumerPayload.new(@topic, @channel).to_msgpack) to_process = [] unpacker = MessagePack::Unpacker.new(@socket) loop do = MessagePayload.from_msgpack(unpacker.read) yield . end @socket.close rescue StandardError connect_socket retry end |