Class: MobME::Infrastructure::Queue::ZeroMQ::Server
- Inherits: Object
  - Object
  - MobME::Infrastructure::Queue::ZeroMQ::Server
- Defined in:
- lib/mobme/infrastructure/queue/zeromq/server.rb
Instance Method Summary
- #bind ⇒ Object
- #initialize(options = {}) ⇒ Server (constructor): A new instance of Server.
- #listen_to_backlog_requests ⇒ Object
- #listen_to_messages ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Server
Returns a new instance of Server.
13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/mobme/infrastructure/queue/zeromq/server.rb', line 13
def initialize(options = {})
  @queue = MobME::Infrastructure::Queue.queue(:memory)
  @messages_socket = options[:messages_socket] || "ipc:///tmp/mobme-infrastructure-queue-messages.sock"
  @persistence_socket = options[:persistence_socket] || "ipc:///tmp/mobme-infrastructure-queue-persistence.sock"

  EM.synchrony do
    bind

    Fiber.new { listen_to_messages }.resume
    Fiber.new { listen_to_backlog_requests }.resume
  end
end
Instance Method Details
#bind ⇒ Object
26 27 28 29 30 31 |
# File 'lib/mobme/infrastructure/queue/zeromq/server.rb', line 26
def bind
  @context = EM::ZeroMQ::Context.new(1)

  @messages_reply_server = @context.bind(ZMQ::REP, @messages_socket)
  @persistence_reply_server = @context.bind(ZMQ::REP, @persistence_socket)
end
#listen_to_backlog_requests ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/mobme/infrastructure/queue/zeromq/server.rb', line 48 def listen_to_backlog_requests loop do handler = MobME::Infrastructure::Queue::ZeroMQ::ConnectionHandler.new(@persistence_reply_server) = @persistence_reply_server.handler. = Marshal.load() rescue nil queue_return = if == "BACKLOG" queues_snapshot elsif () puts "Got ACK: #{()}" true else false end @persistence_reply_server.handler.(Marshal.dump(queue_return)) end end |
#listen_to_messages ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/mobme/infrastructure/queue/zeromq/server.rb', line 33 def listen_to_messages loop do handler = MobME::Infrastructure::Queue::ZeroMQ::ConnectionHandler.new(@messages_reply_server) = @messages_reply_server.handler. = Marshal.load() rescue nil queue_return = if route_to_queue() end @messages_reply_server.handler.(Marshal.dump(queue_return)) end end |