Class: Expedite::Server::Agent

Inherits:
Object
  • Object
show all
Includes:
Expedite::Signals
Defined in:
lib/expedite/server/agent.rb

Overview

This code runs in the process that has the actual code pre-loaded, and is used to serve requests.

  • An “invoke” request is handled in the agent itself.

  • A “fork” request causes the agent to fork, and the forked process handles the request.

Each agent processes a single request at a time, unless it is an Expedite::Action::Boot request which is used to make derived agents.

Constant Summary

Constants included from Expedite::Signals

Expedite::Signals::IGNORE_SIGNALS

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name:, manager:, env:) ⇒ Agent

Returns a new instance of Agent.



40
41
42
43
44
45
46
47
48
49
# File 'lib/expedite/server/agent.rb', line 40

def initialize(name:, manager:, env:)
  @name         = name
  @manager      = manager
  @env          = env
  @mutex        = Mutex.new
  @waiting      = Set.new
  @preloaded    = false
  @state        = :initialized
  @interrupt    = IO.pipe
end

Instance Attribute Details

#envObject (readonly)

Returns the value of attribute env.



37
38
39
# File 'lib/expedite/server/agent.rb', line 37

def env
  @env
end

#managerObject (readonly)

Returns the value of attribute manager.



37
38
39
# File 'lib/expedite/server/agent.rb', line 37

def manager
  @manager
end

#nameObject (readonly)

Returns the value of attribute name.



36
37
38
# File 'lib/expedite/server/agent.rb', line 36

def name
  @name
end

Instance Method Details

#agentObject



79
80
81
# File 'lib/expedite/server/agent.rb', line 79

def agent
  @agent ||= Expedite::Agents.lookup(name)
end

#app_nameObject



75
76
77
# File 'lib/expedite/server/agent.rb', line 75

def app_name
  env.app_name
end

#bootObject



51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/expedite/server/agent.rb', line 51

def boot
  # This is necessary for the terminal to work correctly when we reopen stdin.
  Process.setsid rescue Errno::EPERM

  Expedite.app = self

  Signal.trap("TERM") { terminate }

  env.load_helper

  serve
end

#connect_databaseObject



271
272
273
# File 'lib/expedite/server/agent.rb', line 271

def connect_database
  ActiveRecord::Base.establish_connection if active_record_configured?
end

#disconnect_databaseObject



267
268
269
# File 'lib/expedite/server/agent.rb', line 267

def disconnect_database
  ActiveRecord::Base.remove_connection if active_record_configured?
end

#eager_preloadObject



116
117
118
# File 'lib/expedite/server/agent.rb', line 116

def eager_preload
  with_pty { preload }
end

#exitObject



250
251
252
253
254
255
# File 'lib/expedite/server/agent.rb', line 250

def exit
  state :exiting
  manager.shutdown(:RDWR)
  exit_if_finished
  sleep
end

#exit_if_finishedObject



257
258
259
260
261
# File 'lib/expedite/server/agent.rb', line 257

def exit_if_finished
  @mutex.synchronize {
    Kernel.exit if exiting? && @waiting.empty?
  }
end

#exiting?Boolean

Returns:

  • (Boolean)


95
96
97
# File 'lib/expedite/server/agent.rb', line 95

def exiting?
  @state == :exiting
end

#initialized?Boolean

Returns:

  • (Boolean)


103
104
105
# File 'lib/expedite/server/agent.rb', line 103

def initialized?
  @state == :initialized
end

#invoke_after_fork_callbacksObject



263
264
265
# File 'lib/expedite/server/agent.rb', line 263

def invoke_after_fork_callbacks
  # TODO:
end

#log(message) ⇒ Object



83
84
85
# File 'lib/expedite/server/agent.rb', line 83

def log(message)
  env.log "[application:#{name}] #{message}"
end

#preloadObject



107
108
109
110
111
112
113
114
# File 'lib/expedite/server/agent.rb', line 107

def preload
  log "preloading app"

  @preloaded = :success
rescue Exception => e
  @preloaded = :failure
  raise e unless initialized?
end

#preload_failed?Boolean

Returns:

  • (Boolean)


91
92
93
# File 'lib/expedite/server/agent.rb', line 91

def preload_failed?
  @preloaded == :failure
end

#preloaded?Boolean

Returns:

  • (Boolean)


87
88
89
# File 'lib/expedite/server/agent.rb', line 87

def preloaded?
  @preloaded
end


294
295
296
297
298
# File 'lib/expedite/server/agent.rb', line 294

def print_exception(stream, error)
  first, rest = error.backtrace.first, error.backtrace.drop(1)
  stream.puts("#{first}: #{error} (#{error.class})")
  rest.each { |line| stream.puts("\tfrom #{line}") }
end

#reset_streamsObject



313
314
315
316
317
318
# File 'lib/expedite/server/agent.rb', line 313

def reset_streams
  [STDOUT, STDERR].each do |stream|
    stream.reopen(env.log_file)
  end
  STDIN.reopen("/dev/null")
end

#serveObject



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/expedite/server/agent.rb', line 120

def serve
  $0 = "expedite agent | #{app_name} | #{name}"

  agent.run_hook(:before_serve, name)

  state :running
  manager.puts

  loop do
    IO.select [manager, @interrupt.first]

    if terminating? || preload_failed?
      agent.run_hook(:after_serve, name)
      exit
    else
      serve_request(manager.recv_io(UNIXSocket))
    end
  end
end

#serve_fork(client, action, cargs, cenv) ⇒ Object



206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/expedite/server/agent.rb', line 206

def serve_fork(client, action, cargs, cenv)
  fork do
    Process.setsid
    IGNORE_SIGNALS.each { |sig| trap(sig, "DEFAULT") }
    trap("TERM", "DEFAULT")

    # Load in the current env vars, except those which *were* changed when Spring started
    cenv.each { |k, v| ENV[k] = v }

    # requiring is faster, so if config.cache_classes was true in
    # the environment's config file, then we can respect that from
    # here on as we no longer need constant reloading.
    if @original_cache_classes
      ActiveSupport::Dependencies.mechanism = :require
      Rails.application.config.cache_classes = true
    end

    connect_database
    srand

    invoke_after_fork_callbacks
    shush_backtraces

    begin
      ret = action.call(*cargs)
    rescue => e
      client.send_exception(e, self.env)
    else
      client.send_return(ret, self.env)
    end
  end
end

#serve_invoke(client, action, cargs, cenv) ⇒ Object

Returns pid of the current process



195
196
197
198
199
200
201
202
203
204
# File 'lib/expedite/server/agent.rb', line 195

def serve_invoke(client, action, cargs, cenv)
  begin
    ret = action.call(*cargs)
  rescue Exception => e
    client.send_exception(e, self.env)
  else
    client.send_return(ret, self.env)
  end
  Process.pid
end

#serve_request(client) ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/expedite/server/agent.rb', line 140

def serve_request(client)
  puts "got client"
  manager.puts

  streams = client.recv_setup(env)

  preload unless preloaded?

  cargs, cenv, cmethod = client.recv_object(env).values_at("args", "env", "method")
  log "serve #{cargs} using #{cmethod}"

  exec_name = cargs.shift
  action    = Expedite::Actions.lookup(exec_name)
  action.setup(streams)
  # TODO: before(:request)

  connect_database # why are we connecting prior? is this for invoke?
  pid = case cmethod
  when "invoke"
    # TODO: Invoke in a worker process instead of the preloader
    serve_invoke(client, action, cargs, cenv)
  else
    serve_fork(client, action, cargs, cenv)
  end

  disconnect_database

  log "forked #{pid}" # pid is current process
  manager.puts pid

  # Boot makes a new application, so we don't wait for it
  if action.is_a?(Expedite::Action::Boot)
    Process.detach(pid)
  else
    wait pid, streams, client
  end
rescue Exception => e
  log "exception: #{e} at #{e.backtrace.join("\n")}"
  manager.puts unless pid

  if streams && !e.is_a?(SystemExit)
    print_exception(stderr, e)
    streams.each(&:close)
  end

  client.puts(1) if pid
  client.close
ensure
  # Redirect STDOUT and STDERR to prevent from keeping the original FDs
  # (i.e. to prevent `spring rake -T | grep db` from hanging forever),
  # even when exception is raised before forking (i.e. preloading).
  reset_streams
end

#shush_backtracesObject

This feels very naughty



276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# File 'lib/expedite/server/agent.rb', line 276

def shush_backtraces
  Kernel.module_eval do
    old_raise = Kernel.method(:raise)
    remove_method :raise
    define_method :raise do |*args|
      begin
        old_raise.call(*args)
      ensure
        if $!
          lib = File.expand_path("..", __FILE__)
          $!.backtrace.reject! { |line| line.start_with?(lib) }
        end
      end
    end
    private :raise
  end
end

#state(val) ⇒ Object



64
65
66
67
68
# File 'lib/expedite/server/agent.rb', line 64

def state(val)
  return if exiting?
  log "#{@state} -> #{val}"
  @state = val
end

#state!(val) ⇒ Object



70
71
72
73
# File 'lib/expedite/server/agent.rb', line 70

def state!(val)
  state val
  @interrupt.last.write "."
end

#terminateObject



239
240
241
242
243
244
245
246
247
248
# File 'lib/expedite/server/agent.rb', line 239

def terminate
  if exiting?
    # Ensure that we do not ignore subsequent termination attempts
    log "forced exit"
    @waiting.each { |pid| Process.kill("TERM", pid) }
    Kernel.exit
  else
    state! :terminating
  end
end

#terminating?Boolean

Returns:

  • (Boolean)


99
100
101
# File 'lib/expedite/server/agent.rb', line 99

def terminating?
  @state == :terminating
end

#wait(pid, streams, client) ⇒ Object



320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
# File 'lib/expedite/server/agent.rb', line 320

def wait(pid, streams, client)
  if pid != Process.pid
    @mutex.synchronize { @waiting << pid }
  end

  # Wait in a separate thread so we can run multiple actions at once
  Expedite.failsafe_thread {
    begin
      exitstatue = if pid == Process.pid
        log "#{pid} is current process"
        0
      else
        _, status = Process.wait2 pid
        log "#{pid} exited with #{status.exitstatus}"
      end

      streams.each(&:close)
      client.puts(exitstatus)
      client.close
    ensure
      if pid != Process.pid
        @mutex.synchronize { @waiting.delete pid }
      end
      exit_if_finished
    end
  }

  Expedite.failsafe_thread {
    while signal = client.gets.chomp
      begin
        Process.kill(signal, -Process.getpgid(pid))
        client.puts(0)
      rescue Errno::ESRCH
        client.puts(1)
      end
    end
  }
end

#with_ptyObject



300
301
302
303
304
305
306
307
308
309
310
311
# File 'lib/expedite/server/agent.rb', line 300

def with_pty
  PTY.open do |master, slave|
    [STDOUT, STDERR, STDIN].each { |s| s.reopen slave }
    reader_thread = Expedite.failsafe_thread { master.read }
    begin
      yield
    ensure
      reader_thread.kill
      reset_streams
    end
  end
end