Class: Falcon::Service::Supervisor

Inherits:
Generic
  • Object
show all
Defined in:
lib/falcon/service/supervisor.rb

Overview

Implements a host supervisor which can restart the host services and provide various metrics about the running processes.

Instance Method Summary collapse

Methods inherited from Generic

#include?, #logger, #name, wrap

Constructor Details

#initialize(environment) ⇒ Supervisor

Initialize the supervisor using the given environment.



35
36
37
38
39
# File 'lib/falcon/service/supervisor.rb', line 35

# Initialize the supervisor using the given environment.
#
# @param environment [Object] handed unchanged to the superclass initializer.
def initialize(environment)
  super(environment)

  # No endpoint is bound yet; #start assigns it and #stop clears it again.
  @bound_endpoint = nil
end

Instance Method Details

#do_metrics(message) ⇒ Object

Capture process metrics relating to the process group that the supervisor belongs to.



60
61
62
# File 'lib/falcon/service/supervisor.rb', line 60

# Capture process metrics relating to the process group that the supervisor
# belongs to.
#
# @param message [Object] the incoming request; it is not consulted here.
# @return [Object] whatever `Process::Metrics::General.capture` returns.
def do_metrics(message)
  # The parent pid is used both as the process to capture and as the
  # parent filter.
  parent_pid = Process.ppid

  Process::Metrics::General.capture(pid: parent_pid, ppid: parent_pid)
end

#do_restart(message) ⇒ Object

Restart the process group that the supervisor belongs to.



48
49
50
51
52
53
54
55
56
57
# File 'lib/falcon/service/supervisor.rb', line 48

# Restart the process group that the supervisor belongs to.
#
# Outline of a full graceful restart, as sketched by the original author
# (this method itself only signals the parent process):
#   1. Tell the parent of this process group to spin up a new process
#      group/container.
#   2. Wait for that to start accepting new connections.
#   3. Stop accepting connections.
#   4. Wait for existing connections to drain.
#   5. Terminate this process group.
#
# @param message [Hash] the incoming request; an optional `:signal` entry
#   selects the signal to send, falling back to `:INT` when absent.
# @return [Object] the result of `Process.kill`.
def do_restart(message)
  Process.kill(message[:signal] || :INT, Process.ppid)
end

#endpoint ⇒ Object

The endpoint which the supervisor will bind to. Typically a unix pipe in the same directory as the host.



43
44
45
# File 'lib/falcon/service/supervisor.rb', line 43

# The endpoint which the supervisor will bind to.
#
# Delegates to the `endpoint` value exposed by `@evaluator`.
# NOTE(review): `@evaluator` is not assigned in the code visible here;
# presumably the superclass initializer sets it up — confirm.
#
# @return [Object] whatever `@evaluator.endpoint` returns.
def endpoint
  @evaluator.endpoint
end

#handle(message) ⇒ Object

Handle an incoming request.



66
67
68
69
70
71
72
73
# File 'lib/falcon/service/supervisor.rb', line 66

# Handle an incoming request by dispatching on its `:please` entry.
#
# @param message [Hash] the decoded request.
# @return [Object, nil] the result of the matching `do_*` method, or `nil`
#   when `:please` is neither 'restart' nor 'metrics'.
def handle(message)
  case message[:please]
  when 'restart' then do_restart(message)
  when 'metrics' then do_metrics(message)
  end
end

#setup(container) ⇒ Object

Start the supervisor process which accepts connections from the bound endpoint and processes JSON formatted messages.



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/falcon/service/supervisor.rb', line 88

# Start the supervisor process which accepts connections from the bound
# endpoint and processes JSON formatted messages.
#
# Each connection is read as a sequence of NUL ("\0") terminated JSON
# documents; every document is passed to #handle and the result is written
# back as JSON, again terminated by NUL.
#
# @param container [Object] the container to run the supervisor in; it must
#   respond to `run(name:, restart:, count:)` and yield an instance that
#   responds to `ready!`.
def setup(container)
  container.run(name: name, restart: true, count: 1) do |instance|
    Async do
      @bound_endpoint.accept do |connection|
        io = Async::IO::Stream.new(connection)

        # Keep serving requests until the stream yields nothing more.
        while (payload = io.gets("\0"))
          request = JSON.parse(payload, symbolize_names: true)
          reply = handle(request)

          io.puts(reply.to_json, separator: "\0")
        end
      end

      instance.ready!
    end
  end

  super
end

#start ⇒ Object

Bind the supervisor to the specified endpoint.



76
77
78
79
80
81
82
83
84
# File 'lib/falcon/service/supervisor.rb', line 76

# Bind the supervisor to the specified endpoint.
#
# The bound endpoint is stored in `@bound_endpoint` before control is passed
# on to the superclass implementation.
#
# @return [Object] whatever the superclass `start` returns.
def start
  Console.logger.info(self) do
    "Binding to #{endpoint}..."
  end

  # Binding happens inside a reactor; wait for it to produce the result.
  binding_task = Async::Reactor.run do
    Async::IO::SharedEndpoint.bound(endpoint)
  end
  @bound_endpoint = binding_task.wait

  super
end

#stop ⇒ Object

Release the bound endpoint.



108
109
110
111
112
113
# File 'lib/falcon/service/supervisor.rb', line 108

# Release the bound endpoint.
#
# Closes the endpoint if one is currently bound, resets `@bound_endpoint`
# to `nil`, and then defers to the superclass implementation.
#
# @return [Object] whatever the superclass `stop` returns.
def stop
  @bound_endpoint.close unless @bound_endpoint.nil?
  @bound_endpoint = nil

  super
end