Class: Paraspec::MsgpackServer

Inherits:
Object
  • Object
show all
Includes:
MsgpackHelpers
Defined in:
lib/paraspec/msgpack_server.rb

Instance Method Summary collapse

Methods included from MsgpackHelpers

#packer, #unpacker

Constructor Details

#initialize(master) ⇒ MsgpackServer

Returns a new instance of MsgpackServer.



9
10
11
# File 'lib/paraspec/msgpack_server.rb', line 9

def initialize(master)
  @master = master
end

Instance Method Details

#runObject



13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/paraspec/msgpack_server.rb', line 13

def run
  @socket = ::TCPServer.new('127.0.0.1', MASTER_APP_PORT)
  begin
    while true
      s = @socket.accept_nonblock
      run_processing_thread(s)
    end
  rescue Errno::EAGAIN
    unless @master.stop?
      sleep 0.2
      retry
    end
  end
end

#run_processing_thread(s) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/paraspec/msgpack_server.rb', line 28

def run_processing_thread(s)
  Thread.new do
    u = unpacker(s)
    u.each do |obj|
      result = nil
      time = Benchmark.realtime do
        action = obj['action'].gsub('-', '_')
        payload = obj['payload']
        if payload
          payload = IpcHash.new.merge(payload)
          args = [payload]
        else
          args = []
        end

        Paraspec.logger.debug_ipc("SrvReq:#{obj['id']} #{obj}")
        result = @master.send(action, *args)

        pk = packer(s)
        resp = {result: result}
        Paraspec.logger.debug_ipc("SrvRes:#{obj['id']} #{resp}")
        pk.write(resp)
        pk.flush
        s.flush
      end
      Paraspec.logger.debug_perf("SrvReq:#{obj['id']} #{obj['action']}: #{result} #{'%.3f msec' % (time*1000)}")
    end
  end
end