Class: Quartz::GoProcess

Inherits:
Object
  • Object
show all
Defined in:
lib/quartz/go_process.rb

Constant Summary collapse

READ_EXCEPTION =
IO::EAGAINWaitReadable
MAX_MESSAGE_SIZE =

Bytes

8192

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts) ⇒ GoProcess

Returns a new instance of GoProcess.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/quartz/go_process.rb', line 25

def initialize(opts)
  @seed = SecureRandom.hex
  socket_dir = opts.fetch(:socket_dir) { '/tmp' }
  @socket_path = opts[:socket_path] || File.join(socket_dir, "quartz_#{seed}.sock")
  ENV['QUARTZ_SOCKET'] = @socket_path

  if opts[:file_path]
    Quartz::Validations.check_for_go
    compile_and_run(opts[:file_path])
  elsif opts[:bin_path]
    @go_process = IO.popen(opts[:bin_path])
  elsif opts[:socket_path]
    @external_socket = true
  else
    raise Quartz::ConfigError, 'Missing go binary'
  end

  block_until_server_starts
  self.class.processes << self
end

Instance Attribute Details

#seedObject (readonly)

Returns the value of attribute seed.



5
6
7
# File 'lib/quartz/go_process.rb', line 5

def seed
  @seed
end

#socket_pathObject (readonly)

Returns the value of attribute socket_path.



5
6
7
# File 'lib/quartz/go_process.rb', line 5

def socket_path
  @socket_path
end

#temp_file_pathObject (readonly)

Returns the value of attribute temp_file_path.



5
6
7
# File 'lib/quartz/go_process.rb', line 5

def temp_file_path
  @temp_file_path
end

Class Method Details

.cleanupObject



163
164
165
166
167
# File 'lib/quartz/go_process.rb', line 163

def self.cleanup
  return unless @processes
  @processes.each(&:cleanup)
  @processes = []
end

.clear_processesObject



11
12
13
# File 'lib/quartz/go_process.rb', line 11

def self.clear_processes
  @processes = []
end

.processesObject



7
8
9
# File 'lib/quartz/go_process.rb', line 7

def self.processes
  @processes ||= []
end

Instance Method Details

#block_until_server_startsObject



68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/quartz/go_process.rb', line 68

def block_until_server_starts
  max_retries = 20
  retries = 0
  delay = 0.001 # seconds

  loop do
    return if File.exists?(@socket_path)
    raise Quartz::GoServerError, 'RPC server not starting' if retries > max_retries
    sleep(delay * 2**retries)
    retries += 1
  end
end

#call(struct_name, method, args) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/quartz/go_process.rb', line 100

def call(struct_name, method, args)
  payload = {
    'method' => "#{struct_name}.#{method}",
    'params' => [args],
    'id' => 1
  }

  begin
    socket.send(MultiJson.dump(payload), 0)
  rescue Errno::EPIPE
    # Retry send with a new socket. We might trigger this if Go
    # process restarted.  There's a good chance that this raises the
    # exact same error.
    new_socket!
    socket.send(MultiJson.dump(payload), 0)
  end

  read
end

#cleanupObject



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/quartz/go_process.rb', line 144

def cleanup
  # If we've forked, there's no need to cleanup since the parent
  # process will.
  return if @forked

  # If the Go process is managed externally, there's nothing to do.
  return if @external_socket

  unless @killed_go_process
    Process.kill('SIGTERM', pid)
    Process.wait(pid)
    @killed_go_process = true
  end

  if @temp_file_path && File.exists?(@temp_file_path)
    File.delete(@temp_file_path)
  end
end

#compile_and_run(path) ⇒ Object



46
47
48
49
50
51
52
53
54
# File 'lib/quartz/go_process.rb', line 46

def compile_and_run(path)
  @temp_file_path = "/tmp/quartz_runner_#{seed}"

  unless system('go', 'build', '-o', @temp_file_path, path)
    raise Quartz::ConfigError, 'Go compilation failed'
  end

  @go_process = IO.popen(@temp_file_path)
end

#forked_mode!Object



15
16
17
18
19
20
21
22
23
# File 'lib/quartz/go_process.rb', line 15

def forked_mode!
  if @forked.nil?
    @forked = true
  else
    @forked = !@forked
  end

  new_socket! if @forked
end

#get_metadataObject



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/quartz/go_process.rb', line 81

def 
  payload = {
    'method' => 'Quartz.GetMetadata',
    'params' => [],
    # This parameter isn't needed because we use a different
    # connection for each thread.
    'id' => 1
  }

  socket.send(MultiJson.dump(payload), 0)
  response = read

  if response['error']
    raise Quartz::GoResponseError, "Metadata error: #{response['error']}"
  end

  response['result']
end

#new_socket!Object



60
61
62
# File 'lib/quartz/go_process.rb', line 60

def new_socket!
  Thread.current["quartz_socket_#{seed}".to_sym] = UNIXSocket.new(@socket_path)
end

#pidObject



56
57
58
# File 'lib/quartz/go_process.rb', line 56

def pid
  @go_process.pid
end

#readObject



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/quartz/go_process.rb', line 128

def read
  value = ''
  loop do
    begin
      value << socket.recv_nonblock(MAX_MESSAGE_SIZE)
      break if value.end_with?("\n")
    rescue READ_EXCEPTION
      IO.select([socket], [], [])
    rescue Errno::EPIPE
      new_socket!
    end
  end

  MultiJson.load(value)
end

#socketObject



64
65
66
# File 'lib/quartz/go_process.rb', line 64

def socket
  Thread.current["quartz_socket_#{seed}".to_sym] ||= UNIXSocket.new(@socket_path)
end