Class: Quartz::GoProcess
- Inherits:
-
Object
- Object
- Quartz::GoProcess
- Defined in:
- lib/quartz/go_process.rb
Constant Summary collapse
- READ_EXCEPTION =
IO::EAGAINWaitReadable
- MAX_MESSAGE_SIZE =
Bytes
8192
Instance Attribute Summary collapse
-
#seed ⇒ Object
readonly
Returns the value of attribute seed.
-
#socket_path ⇒ Object
readonly
Returns the value of attribute socket_path.
-
#temp_file_path ⇒ Object
readonly
Returns the value of attribute temp_file_path.
Class Method Summary collapse
- .cleanup ⇒ Object
- .clear_processes ⇒ Object
- .processes ⇒ Object
Instance Method Summary collapse
- #block_until_server_starts ⇒ Object
- #call(struct_name, method, args) ⇒ Object
- #cleanup ⇒ Object
- #compile_and_run(path) ⇒ Object
- #forked_mode! ⇒ Object
- #get_metadata ⇒ Object
-
#initialize(opts) ⇒ GoProcess
constructor
A new instance of GoProcess.
- #pid ⇒ Object
- #read ⇒ Object
- #socket ⇒ Object
Constructor Details
#initialize(opts) ⇒ GoProcess
Returns a new instance of GoProcess.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/quartz/go_process.rb', line 24 def initialize(opts) @seed = SecureRandom.hex socket_dir = opts.fetch(:socket_dir) { '/tmp' } @socket_path = opts[:socket_path] || File.join(socket_dir, "quartz_#{seed}.sock") ENV['QUARTZ_SOCKET'] = @socket_path if opts[:file_path] Quartz::Validations.check_for_go compile_and_run(opts[:file_path]) elsif opts[:bin_path] @go_process = IO.popen(opts[:bin_path]) elsif opts[:socket_path] @external_socket = true else raise Quartz::ConfigError, 'Missing go binary' end block_until_server_starts self.class.processes << self end |
Instance Attribute Details
#seed ⇒ Object (readonly)
Returns the value of attribute seed.
5 6 7 |
# File 'lib/quartz/go_process.rb', line 5
#
# Reader for the per-instance seed stored in @seed.
#
# @return [Object] the current value of @seed (nil if it was never set)
def seed
  @seed
end
#socket_path ⇒ Object (readonly)
Returns the value of attribute socket_path.
5 6 7 |
# File 'lib/quartz/go_process.rb', line 5
#
# Reader for the UNIX socket path stored in @socket_path.
#
# @return [Object] the current value of @socket_path (nil if it was never set)
def socket_path
  @socket_path
end
#temp_file_path ⇒ Object (readonly)
Returns the value of attribute temp_file_path.
5 6 7 |
# File 'lib/quartz/go_process.rb', line 5
#
# Reader for the path stored in @temp_file_path.
#
# @return [Object] the current value of @temp_file_path (nil if it was
#   never set)
def temp_file_path
  @temp_file_path
end
Class Method Details
.cleanup ⇒ Object
146 147 148 149 150 |
# File 'lib/quartz/go_process.rb', line 146
#
# Calls #cleanup on every registered process and then empties the registry.
# Does nothing (and returns nil) when the registry was never initialised.
#
# @return [Array, nil] the fresh empty registry, or nil if there was none
def self.cleanup
  tracked = @processes
  return unless tracked

  tracked.each { |process| process.cleanup }
  @processes = []
end
.clear_processes ⇒ Object
11 12 13 |
# File 'lib/quartz/go_process.rb', line 11
#
# Replaces the registry with a new empty array without calling #cleanup on
# anything that was in it.
#
# @return [Array] the new, empty registry
def self.clear_processes
  emptied = []
  @processes = emptied
end
.processes ⇒ Object
7 8 9 |
# File 'lib/quartz/go_process.rb', line 7
#
# Lazily initialised registry of processes.
#
# @return [Array] the same array on every call until it is replaced
def self.processes
  @processes = [] unless @processes
  @processes
end
Instance Method Details
#block_until_server_starts ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/quartz/go_process.rb', line 63
#
# Polls the filesystem until the socket file at @socket_path exists,
# sleeping between checks with a delay that doubles each time.
#
# @return [nil] as soon as the socket file is present
# @raise [Quartz::GoServerError] once more than +max_retries+ sleeps have
#   elapsed without the file appearing
def block_until_server_starts
  max_retries = 20
  retries = 0
  delay = 0.001 # seconds

  loop do
    # File.exists? was removed in Ruby 3.2; File.exist? works on every version.
    return if File.exist?(@socket_path)
    raise Quartz::GoServerError, 'RPC server not starting' if retries > max_retries

    # NOTE(review): the delay doubles from 1 ms on each of up to 21 sleeps,
    # so the last one is 0.001 * 2**20 (about 17 minutes) and the total is
    # roughly 35 minutes before the error above is raised. Timing is left
    # unchanged here; consider capping the delay.
    sleep(delay * 2**retries)
    retries += 1
  end
end
#call(struct_name, method, args) ⇒ Object
95 96 97 98 99 100 101 102 103 |
# File 'lib/quartz/go_process.rb', line 95
#
# Sends one RPC request over #socket and returns the reply obtained from
# #read.
#
# @param struct_name [#to_s] left-hand part of the "Struct.Method" RPC name
# @param method [#to_s] right-hand part of the RPC name
# @param args [Object] sent as the single element of the 'params' array
# @return [Object] whatever #read returns for the reply
def call(struct_name, method, args)
  rpc_name = "#{struct_name}.#{method}"
  request = { 'method' => rpc_name, 'params' => [args], 'id' => 1 }
  encoded = MultiJson.dump(request)

  socket.send(encoded, 0)
  read
end
#cleanup ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/quartz/go_process.rb', line 127
#
# Stops the Go child process (once) and removes the compiled temporary
# binary, if there is one.
#
# @return [Integer, nil] the result of File.delete when a temp file was
#   removed, otherwise nil
def cleanup
  # If we've forked, there's no need to cleanup since the parent
  # process will.
  return if @forked

  # If the Go process is managed externally, there's nothing to do.
  return if @external_socket

  unless @killed_go_process
    begin
      Process.kill('SIGTERM', pid)
      Process.wait(pid)
    rescue Errno::ESRCH, Errno::ECHILD
      # The child has already exited or been reaped. That must not stop the
      # temp binary below from being removed.
    end
    @killed_go_process = true
  end

  # File.exists? was removed in Ruby 3.2; File.exist? works on every version.
  File.delete(@temp_file_path) if @temp_file_path && File.exist?(@temp_file_path)
end
#compile_and_run(path) ⇒ Object
45 46 47 48 49 50 51 52 53 |
# File 'lib/quartz/go_process.rb', line 45
#
# Builds the Go program at +path+ into a seed-named binary under /tmp and
# starts that binary with IO.popen.
#
# @param path [String] passed to `go build` as the thing to compile
# @return [IO] the pipe for the started process, also kept in @go_process
# @raise [Quartz::ConfigError] if `go build` does not report success
def compile_and_run(path)
  @temp_file_path = "/tmp/quartz_runner_#{seed}"

  built = system('go', 'build', '-o', @temp_file_path, path)
  raise Quartz::ConfigError, 'Go compilation failed' unless built

  @go_process = IO.popen(@temp_file_path)
end
#forked_mode! ⇒ Object
15 16 17 18 19 20 21 22 |
# File 'lib/quartz/go_process.rb', line 15
#
# Flips the @forked flag. The first call (flag still nil) switches it on;
# every later call inverts it. #cleanup returns early while the flag is set.
#
# @return [Boolean, nil] nil on the first call, afterwards the new flag value
def forked_mode!
  return @forked = !@forked unless @forked.nil?

  @forked = true
  nil
end
#get_metadata ⇒ Object
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/quartz/go_process.rb', line 76
#
# Sends a 'Quartz.GetMetadata' request over #socket and returns the
# 'result' member of the reply.
#
# @return [Object] the value stored under 'result' in the reply
# @raise [Quartz::GoResponseError] if the reply has a truthy 'error' member
def get_metadata
  payload = {
    'method' => 'Quartz.GetMetadata',
    'params' => [],
    # This parameter isn't needed because we use a different
    # connection for each thread.
    'id' => 1
  }

  socket.send(MultiJson.dump(payload), 0)
  response = read

  if response['error']
    raise Quartz::GoResponseError, "Metadata error: #{response['error']}"
  end

  response['result']
end
#pid ⇒ Object
55 56 57 |
# File 'lib/quartz/go_process.rb', line 55
#
# Process id reported by the object held in @go_process.
#
# @return [Object] the result of calling +pid+ on @go_process
def pid
  child = @go_process
  child.pid
end
#read ⇒ Object
113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/quartz/go_process.rb', line 113
#
# Reads from #socket until the accumulated data ends with a newline and
# returns it decoded by MultiJson.
#
# @return [Object] the decoded message
# @raise [EOFError] if the peer closes the connection before a complete,
#   newline-terminated message has arrived
def read
  value = ''
  loop do
    begin
      chunk = socket.recv_nonblock(MAX_MESSAGE_SIZE)
      # A closed peer yields an empty string (nil on Ruby 3.3+). Without this
      # check the loop would spin forever or fail appending nil.
      raise EOFError, 'Go RPC server closed the connection' if chunk.nil? || chunk.empty?

      value << chunk
      break if value.end_with?("\n")
    rescue READ_EXCEPTION
      # Nothing to read yet: wait until the socket becomes readable.
      IO.select([socket], [], [])
    end
  end

  MultiJson.load(value)
end
#socket ⇒ Object
59 60 61 |
# File 'lib/quartz/go_process.rb', line 59
#
# Connection to the server's UNIX socket, cached on Thread.current under a
# key derived from #seed and opened on first use.
#
# @return [UNIXSocket] the cached connection to @socket_path
def socket
  slot = :"quartz_socket_#{seed}"
  store = Thread.current
  store[slot] || (store[slot] = UNIXSocket.new(@socket_path))
end