Class: MobME::Infrastructure::Queue::ZeroMQ::PersistenceServer

Inherits:
Object
  • Object
show all
Defined in:
lib/mobme/infrastructure/queue/zeromq/persistence_server.rb

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ PersistenceServer

Returns a new instance of PersistenceServer.



11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/mobme/infrastructure/queue/zeromq/persistence_server.rb', line 11

# Stores the server settings and then starts the server's work inside an
# EM.synchrony block.
#
# @param options [Hash] optional overrides; a missing, nil or false value
#   falls back to the default shown:
#   :persistence_socket     - kept in @persistence_socket
#                             ("ipc:///tmp/mobme-infrastructure-queue-persistence.sock")
#   :persistence_store_path - kept in @persistence_store_path ("/tmp")
#   :backlog_interval       - kept in @backlog_interval (10)
#
# Inside the EM.synchrony block the snapshot directory is created, bind is
# called, and send_backlog_requests is started. That last method loops
# indefinitely, so this constructor is not expected to return while the
# loop is running.
def initialize(options = {})
  @queue = MobME::Infrastructure::Queue.queue(:redis)

  socket, store_path, interval =
    options.values_at(:persistence_socket, :persistence_store_path, :backlog_interval)

  @persistence_socket = socket || "ipc:///tmp/mobme-infrastructure-queue-persistence.sock"
  @persistence_store_path = store_path || "/tmp"
  @backlog_interval = interval || 10

  EM.synchrony do
    create_snapshot_directory
    bind

    send_backlog_requests
  end
end

Instance Method Details

#bind ⇒ Object



25
26
27
28
29
# File 'lib/mobme/infrastructure/queue/zeromq/persistence_server.rb', line 25

# Creates the ZeroMQ context and opens the request socket.
#
# A new EM::ZeroMQ::Context (constructed with 1) is stored in @context, and
# the result of connecting a ZMQ::REQ socket to @persistence_socket is stored
# in @persistence_request_server. Note that, despite the method name, the
# call made on the context is `connect`.
#
# Returns the value assigned to @persistence_request_server.
def bind
  zmq_context = EM::ZeroMQ::Context.new(1)
  @context = zmq_context

  @persistence_request_server = zmq_context.connect(ZMQ::REQ, @persistence_socket)
end

#send_backlog_requests ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/mobme/infrastructure/queue/zeromq/persistence_server.rb', line 31

# Repeatedly asks the persistence endpoint for its backlog and stores it.
#
# Every pass of the loop:
#   1. wraps @persistence_request_server in a new ConnectionHandler,
#   2. sends a marshalled "BACKLOG" request and reads one reply,
#   3. if the reply unmarshals to a truthy value, passes it to
#      dump_snapshot_to_disk; when that value is also non-empty, sends a
#      marshalled "ACK <ack_signature>" message and reads the reply to it,
#   4. pauses for @backlog_interval via EM::Synchrony.sleep.
#
# NOTE(review): Marshal.load is applied to bytes read from the socket;
# confirm that the peer on @persistence_socket is trusted.
def send_backlog_requests
  loop do
    connection = MobME::Infrastructure::Queue::ZeroMQ::ConnectionHandler.new(@persistence_request_server)
    connection.send_message(Marshal.dump("BACKLOG"))
    puts "Sent BACKLOG"

    reply = connection.receive_message
    snapshot =
      begin
        Marshal.load(reply)
      rescue StandardError
        # A reply that cannot be unmarshalled is handled like "no snapshot".
        nil
      end

    # nil and false both mean there is nothing to persist on this pass.
    if snapshot
      dump_snapshot_to_disk(snapshot)

      unless snapshot.empty?
        connection.send_message(Marshal.dump("ACK #{ack_signature(snapshot)}"))

        # The server answers the ACK with an OK; it is read and discarded.
        connection.receive_message
      end
    end

    EM::Synchrony.sleep(@backlog_interval)
  end
end