Class: Pixo::Rpc::Service
- Inherits: Object
- Inheritance chain: Object → Pixo::Rpc::Service
- Defined in:
- lib/pixo/rpc/service.rb
Instance Method Summary
collapse
Constructor Details
#initialize(reader_pipe, writer_pipe) ⇒ Service
Returns a new instance of Service.
8
9
10
11
12
13
14
15
16
|
# File 'lib/pixo/rpc/service.rb', line 8
# Sets up a service that communicates over the given pair of pipes.
#
# @param reader_pipe [Object] stream that incoming messages are read from
# @param writer_pipe [Object] stream that outgoing messages are written to
def initialize(reader_pipe, writer_pipe)
  # Table of in-flight requests, keyed by request id.
  @live_requests = Concurrent::Hash.new
  # Guards writes to the writer pipe so messages are not interleaved.
  @pipe_mutex = Mutex.new
  # The read loop keeps going while this flag is true.
  @running = true

  @writer_pipe = writer_pipe
  @reader_pipe = reader_pipe
end
|
Instance Method Details
#live_request_count ⇒ Object
75
76
77
|
# File 'lib/pixo/rpc/service.rb', line 75
# Reports how many requests are currently held in the live-request table.
#
# @return [Integer] number of entries in @live_requests
def live_request_count
  @live_requests.size
end
|
#request(message, timeout: 10, async: false) ⇒ Object
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
# File 'lib/pixo/rpc/service.rb', line 50
# Sends +message+ to the peer and, unless +async+ is set, blocks until the
# matching response arrives or +timeout+ seconds have passed.
#
# The message is wrapped in a Pixo::Rpc::Request, marshalled, Base64-encoded
# and written to the writer pipe as a single line. Synchronous requests are
# registered in @live_requests so the read loop can deliver the response, and
# are always unregistered again on the way out.
#
# @param message [Object] payload handed to Pixo::Rpc::Request.new
# @param timeout [Numeric] seconds to wait for a response (unused when async)
# @param async [Boolean] when true, write the request and return immediately
# @return [Object, nil] the request's response, or nil for async requests
# @raise [Timeout::Error] if no response arrives within +timeout+ seconds
def request(message, timeout: 10, async: false)
  pending = Pixo::Rpc::Request.new(message, async)
  rid = pending.message.rid
  @live_requests[rid] = pending unless async

  payload = Base64.strict_encode64(Marshal.dump(pending.message))
  # Hold the mutex for the whole line so concurrent writers cannot interleave.
  @pipe_mutex.synchronize do
    @writer_pipe.write(payload)
    @writer_pipe.write($/)
    @writer_pipe.flush
  end
  return if async

  unless pending.latch.wait(timeout)
    raise Timeout::Error, format('%s: request timed out after %.3f seconds.', rid, timeout)
  end

  pending.response
rescue => e
  STDERR.puts "request: #{e.inspect}"
  raise
ensure
  # rid is still nil when building the request itself failed; skipping the
  # cleanup then keeps the original exception from being replaced by a
  # NoMethodError raised inside this ensure clause.
  @live_requests.delete(rid) unless rid.nil?
end
|
#run ⇒ Object
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
# File 'lib/pixo/rpc/service.rb', line 18
# Read loop: consumes one message per line from the reader pipe for as long as
# the service is running.
#
# Each line is Base64-decoded and unmarshalled, then dispatched by type:
# * Pixo::Rpc::RequestMessage - its +data+ is called with this service, and
#   unless the message is async the result is written back to the writer pipe
#   as a Pixo::Rpc::ResponseMessage carrying the same rid.
# * Pixo::Rpc::ResponseMessage - its +data+ is handed to the live request
#   registered under the same rid, if there still is one.
#
# The loop ends quietly on IOError/EOFError; any other StandardError ends it
# after being reported on STDERR together with its backtrace.
#
# NOTE(review): Marshal.load can instantiate arbitrary objects, so this is only
# safe when the process on the other end of the pipe is trusted - confirm.
def run
  while @running && (line = @reader_pipe.readline)
    message = Marshal.load(Base64.strict_decode64(line.strip))

    case message
    when Pixo::Rpc::RequestMessage
      reply = Pixo::Rpc::ResponseMessage.new(message.data.call(self), message.rid)
      next if message.async

      payload = Base64.strict_encode64(Marshal.dump(reply))
      # Hold the mutex for the whole line so concurrent writers cannot interleave.
      @pipe_mutex.synchronize do
        @writer_pipe.write(payload)
        @writer_pipe.write($/)
        @writer_pipe.flush
      end
    when Pixo::Rpc::ResponseMessage
      # The requester may already have given up and unregistered itself.
      waiting = @live_requests[message.rid]
      waiting.send_result(message.data) if waiting
    end
  end
rescue IOError
  # Covers EOFError too (it is a subclass of IOError): the pipe hit EOF or was
  # closed, which is treated as a normal end of the loop.
rescue => e
  STDERR.puts("RUN: #{e}")
  e.backtrace.each { |frame| STDERR.puts(frame) }
end
|
#shutdown ⇒ Object
44
45
46
47
48
|
# File 'lib/pixo/rpc/service.rb', line 44
# Stops the service: clears the running flag and closes both pipes.
#
# The writer pipe is closed in an ensure clause so that it is released even
# when closing the reader pipe raises; that error still reaches the caller.
def shutdown
  @running = false
  @reader_pipe.close
ensure
  @writer_pipe.close
end
|