Class: MobME::Infrastructure::Queue::ZeroMQ::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/mobme/infrastructure/queue/zeromq/server.rb

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Server

Returns a new instance of Server.



13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/mobme/infrastructure/queue/zeromq/server.rb', line 13

# Builds the server state and starts serving inside an EM.synchrony block.
#
# @param options [Hash] optional endpoint overrides
#   - :messages_socket    endpoint for the messages socket
#   - :persistence_socket endpoint for the persistence socket
#   Missing (or nil/false) entries fall back to the ipc:// paths below.
#
# NOTE(review): EM.synchrony presumably runs the reactor and does not return
# until it is stopped, so `new` would block the caller — confirm.
def initialize(options = {})
  @queue = MobME::Infrastructure::Queue.queue(:memory)

  @messages_socket =
    options[:messages_socket] || "ipc:///tmp/mobme-infrastructure-queue-messages.sock"
  @persistence_socket =
    options[:persistence_socket] || "ipc:///tmp/mobme-infrastructure-queue-persistence.sock"

  EM.synchrony do
    bind

    # One fiber per listener loop, started in this order: messages first,
    # then backlog requests.
    [:listen_to_messages, :listen_to_backlog_requests].each do |listener|
      Fiber.new { send(listener) }.resume
    end
  end
end

Instance Method Details

#bind ⇒ Object



26
27
28
29
30
31
# File 'lib/mobme/infrastructure/queue/zeromq/server.rb', line 26

# Creates the ZeroMQ context and binds the two reply (REP) sockets.
#
# Sets @context, @messages_reply_server (bound to @messages_socket) and
# @persistence_reply_server (bound to @persistence_socket), in that order.
#
# @return [Object] the socket bound to @persistence_socket
def bind
  # NOTE(review): the argument 1 is presumably the I/O thread count — confirm
  # against the em-zeromq version in use.
  context = EM::ZeroMQ::Context.new(1)
  socket_type = ZMQ::REP

  @context = context
  @messages_reply_server = context.bind(socket_type, @messages_socket)
  @persistence_reply_server = context.bind(socket_type, @persistence_socket)
end

#listen_to_backlog_requests ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/mobme/infrastructure/queue/zeromq/server.rb', line 48

# Serves the persistence socket in an endless request/reply loop.
#
# Each received payload is unmarshalled and answered with a marshalled reply:
#   - the string "BACKLOG"          -> the value of queues_snapshot
#   - a message ack_message? accepts -> true (the ACK signature is printed)
#   - anything else, including a payload that cannot be decoded -> false
#
# The loop has no exit condition of its own.
def listen_to_backlog_requests
  loop do
    # The handler instance is not referenced afterwards; the call is kept for
    # whatever its constructor does.
    # NOTE(review): presumably it installs itself as the socket's handler — confirm.
    MobME::Infrastructure::Queue::ZeroMQ::ConnectionHandler.new(@persistence_reply_server)
    payload = @persistence_reply_server.handler.receive_message

    # SECURITY: Marshal.load can instantiate arbitrary objects and must only be
    # fed trusted data. Anything able to write to the persistence socket
    # controls this payload — review if the endpoint is ever exposed beyond
    # trusted local clients.
    message = begin
      Marshal.load(payload)
    rescue StandardError
      # Undecodable payloads are treated as "no message" and answered with false.
      nil
    end

    queue_return = if message == "BACKLOG"
      queues_snapshot
    elsif ack_message?(message)
      puts "Got ACK: #{signature_from_ack_message(message)}"
      true
    else
      false
    end

    @persistence_reply_server.handler.send_message(Marshal.dump(queue_return))
  end
end

#listen_to_messages ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/mobme/infrastructure/queue/zeromq/server.rb', line 33

# Serves the messages socket in an endless request/reply loop.
#
# Each received payload is unmarshalled; a truthy message is passed to
# route_to_queue and the result is marshalled back as the reply. A payload
# that cannot be decoded (or decodes to nil/false) is answered with a
# marshalled nil.
#
# The loop has no exit condition of its own.
def listen_to_messages
  loop do
    # The handler instance is not referenced afterwards; the call is kept for
    # whatever its constructor does.
    # NOTE(review): presumably it installs itself as the socket's handler — confirm.
    MobME::Infrastructure::Queue::ZeroMQ::ConnectionHandler.new(@messages_reply_server)
    payload = @messages_reply_server.handler.receive_message

    # SECURITY: Marshal.load can instantiate arbitrary objects and must only be
    # fed trusted data. Anything able to write to the messages socket controls
    # this payload — review if the endpoint is ever exposed beyond trusted
    # local clients.
    message = begin
      Marshal.load(payload)
    rescue StandardError
      # Undecodable payloads are treated as "no message" and answered with nil.
      nil
    end

    queue_return = (route_to_queue(message) if message)

    @messages_reply_server.handler.send_message(Marshal.dump(queue_return))
  end
end