Class: Pixo::Rpc::Service

Inherits:
Object
  • Object
show all
Defined in:
lib/pixo/rpc/service.rb

Direct Known Subclasses

Pixo::Renderer, ApplicationService

Instance Method Summary collapse

Constructor Details

#initialize(reader_pipe, writer_pipe) ⇒ Service

Returns a new instance of Service.



8
9
10
11
12
13
14
15
16
# File 'lib/pixo/rpc/service.rb', line 8

def initialize(reader_pipe, writer_pipe)
  @reader_pipe = reader_pipe
  @writer_pipe = writer_pipe

  @running = true
  @live_requests = Concurrent::Hash.new

  @pipe_mutex = Mutex.new
end

Instance Method Details

#live_request_countObject



75
76
77
# File 'lib/pixo/rpc/service.rb', line 75

def live_request_count
  @live_requests.count
end

#request(message, timeout: 10, async: false) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/pixo/rpc/service.rb', line 50

def request(message, timeout: 10, async: false)
  request = Pixo::Rpc::Request.new(message, async)
  @live_requests[request.message.rid] = request unless async

  bytes_to_write = Base64.strict_encode64(Marshal.dump(request.message))
  @pipe_mutex.synchronize do
    @writer_pipe.write(bytes_to_write)
    @writer_pipe.write($/)
    @writer_pipe.flush
  end

  return if async

  unless request.latch.wait(timeout)
    raise Timeout::Error.new("%s: request timed out after %.3f seconds." % [request.message.rid, timeout] )
  end
  
  return request.response
rescue
  STDERR.puts "request: #{$!.inspect}"
  raise
ensure
  @live_requests.delete request.message.rid
end

#runObject



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/pixo/rpc/service.rb', line 18

def run
  while @running && (line = @reader_pipe.readline)
    message = Marshal.load(Base64.strict_decode64(line.strip))
    if message.is_a?(Pixo::Rpc::RequestMessage)
      resp = Pixo::Rpc::ResponseMessage.new(message.data.call(self), message.rid)
      unless message.async
        bytes_to_write = Base64.strict_encode64(Marshal.dump(resp))
        @pipe_mutex.synchronize do
          @writer_pipe.write(bytes_to_write)
          @writer_pipe.write($/)
          @writer_pipe.flush
        end
      end
    elsif message.is_a?(Pixo::Rpc::ResponseMessage)
      @live_requests[message.rid]
      request = @live_requests[message.rid]
      request.send_result(message.data) if request
    end
  end
rescue EOFError => e
rescue IOError => e
rescue
  STDERR.puts("RUN: #{$!}")
  $!.backtrace.each do |back| STDERR.puts(back) end
end

#shutdownObject



44
45
46
47
48
# File 'lib/pixo/rpc/service.rb', line 44

def shutdown
  @running = false
  @reader_pipe.close
  @writer_pipe.close
end