Class: Quartz::GoProcess

Inherits:
Object
  • Object
show all
Defined in:
lib/quartz/go_process.rb

Constant Summary collapse

MAX_MESSAGE_SIZE =

Bytes

1_000_000_000

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts) ⇒ GoProcess

Returns a new instance of GoProcess.



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/quartz/go_process.rb', line 9

def initialize(opts)
  @socket_path = "/tmp/quartz_#{rand(10000)}.sock"
  ENV['QUARTZ_SOCKET'] = @socket_path

  if opts[:file_path]
    compile_and_run(opts[:file_path])
  elsif opts[:bin_path]
    @go_process = IO.popen(opts[:bin_path])
  else
    raise 'Missing go binary'
  end

  block_until_server_starts
  self.class.processes << self
end

Instance Attribute Details

#temp_file_pathObject (readonly)

Returns the value of attribute temp_file_path.



3
4
5
# File 'lib/quartz/go_process.rb', line 3

def temp_file_path
  @temp_file_path
end

Class Method Details

.cleanupObject



101
102
103
# File 'lib/quartz/go_process.rb', line 101

def self.cleanup
  processes.each { |p| p.cleanup }
end

.processesObject



5
6
7
# File 'lib/quartz/go_process.rb', line 5

def self.processes
  @processes ||= []
end

Instance Method Details

#block_until_server_startsObject



43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/quartz/go_process.rb', line 43

def block_until_server_starts
  max_retries = 10
  retries = 0
  delay = 0.1 # seconds

  loop do
    raise 'RPC server not starting' if retries > max_retries
    return if File.exists?(@socket_path)
    sleep(delay * retries * 2**retries)
    retries += 1
  end
end

#call(struct_name, method, args) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/quartz/go_process.rb', line 73

def call(struct_name, method, args)
  payload = {
    'method' => 'Quartz.Call',
    'params' => [{
        'StructName' => struct_name,
        'Method' => method,
        'MethodArgs' => args.to_json
      }],
    'id' => 1
  }

  socket.send(payload.to_json, 0)
  read
end

#cleanupObject



94
95
96
97
98
99
# File 'lib/quartz/go_process.rb', line 94

def cleanup
  Process.kill('SIGTERM', pid)
  Process.wait(pid)
  File.delete(@temp_file_path) if @temp_file_path
  self.class.processes.delete(self)
end

#compile_and_run(path) ⇒ Object



25
26
27
28
29
30
31
32
33
# File 'lib/quartz/go_process.rb', line 25

def compile_and_run(path)
  @temp_file_path = "/tmp/quartz_runner_#{rand(10000)}"

  unless system('go', 'build', '-o', @temp_file_path, path)
    raise 'Go compilation failed'
  end

  @go_process = IO.popen(@temp_file_path)
end

#get_metadataObject



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/quartz/go_process.rb', line 56

def 
  payload = {
    'method' => 'Quartz.GetMetadata',
    'params' => [],
    'id' => 1
  }

  socket.send(payload.to_json, 0)
  response = read

  if response['error']
    raise "Metadata error: #{read['error']}"
  end

  response['result']
end

#pidObject



35
36
37
# File 'lib/quartz/go_process.rb', line 35

def pid
  @go_process.pid
end

#readObject



90
91
92
# File 'lib/quartz/go_process.rb', line 90

def read
  JSON(socket.recv(MAX_MESSAGE_SIZE))
end

#socketObject



39
40
41
# File 'lib/quartz/go_process.rb', line 39

def socket
  Thread.current[:quartz_socket] ||= UNIXSocket.new(@socket_path)
end