Class: Falcon::Service::Supervisor

Inherits:
Generic
  • Object
show all
Defined in:
lib/falcon/service/supervisor.rb

Overview

Implements a host supervisor which can restart the host services and provide various metrics about the running processes.

Instance Method Summary collapse

Methods inherited from Generic

#include?, #logger, #name, wrap

Constructor Details

#initialize(environment) ⇒ Supervisor

Initialize the supervisor using the given environment.



35
36
37
38
39
# File 'lib/falcon/service/supervisor.rb', line 35

# Initialize the supervisor using the given environment.
# The environment is handed to the parent initializer unchanged.
def initialize(environment)
	super(environment)
	
	# Nothing is bound yet; #start assigns the bound endpoint.
	@bound_endpoint = nil
end

Instance Method Details

#do_metrics(message) ⇒ Object

Capture process metrics relating to the process group that the supervisor belongs to.



60
61
62
# File 'lib/falcon/service/supervisor.rb', line 60

# Capture process metrics relating to the process group that the supervisor belongs to.
# The parent process id is used both as the pid and as the ppid filter.
# The message argument is not consulted.
def do_metrics(message)
	parent = Process.ppid
	
	Process::Metrics::General.capture(pid: parent, ppid: parent)
end

#do_restart(message) ⇒ Object

Restart the process group that the supervisor belongs to.



48
49
50
51
52
53
54
55
56
57
# File 'lib/falcon/service/supervisor.rb', line 48

# Restart the process group that the supervisor belongs to by signalling the parent process.
#
# The signal is taken from `message[:signal]`; when that is missing or nil, `:INT` is sent.
# Returns whatever `Process.kill` returns.
def do_restart(message)
	# The overall restart plan is:
	# 1. Tell the parent of this process group to spin up a new process group/container.
	# 2. Wait for that to start accepting new connections.
	# 3. Stop accepting connections.
	# 4. Wait for existing connections to drain.
	# 5. Terminate this process group.
	# This method only performs the signalling of the parent.
	Process.kill(message[:signal] || :INT, Process.ppid)
end

#endpoint ⇒ Object

The endpoint which the supervisor will bind to. Typically a unix pipe in the same directory as the host.



43
44
45
# File 'lib/falcon/service/supervisor.rb', line 43

# The endpoint which the supervisor will bind to, as provided by the evaluator.
# Typically a unix pipe in the same directory as the host.
def endpoint
	@evaluator.endpoint
end

#handle(message) ⇒ Object

Handle an incoming request.



66
67
68
69
70
71
72
73
# File 'lib/falcon/service/supervisor.rb', line 66

# Handle an incoming request.
#
# Dispatches on `message[:please]`: 'restart' goes to #do_restart and 'metrics' goes to #do_metrics,
# each receiving the whole message. Any other value yields nil.
def handle(message)
	case message[:please]
	when 'restart' then do_restart(message)
	when 'metrics' then do_metrics(message)
	end
end

#setup(container) ⇒ Object

Start the supervisor process which accepts connections from the bound endpoint and processes JSON formatted messages.



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/falcon/service/supervisor.rb', line 88

# Start the supervisor process which accepts connections from the bound endpoint and processes JSON formatted messages.
#
# Requests are read up to a NUL ("\0") separator, parsed as JSON with symbolized keys and passed to #handle.
# The result is written back as JSON, again using "\0" as the separator.
def setup(container)
	container.run(name: name, restart: true, count: 1) do |instance|
		Async do
			@bound_endpoint.accept do |connection|
				channel = Async::IO::Stream.new(connection)
				
				# Keep serving this connection until no further request can be read.
				while (request = channel.gets("\0"))
					payload = JSON.parse(request, symbolize_names: true)
					channel.puts(handle(payload).to_json, separator: "\0")
				end
			end
			
			instance.ready!
		end
	end
	
	super(container)
end

#start ⇒ Object

Bind the supervisor to the specified endpoint.



76
77
78
79
80
81
82
83
84
# File 'lib/falcon/service/supervisor.rb', line 76

# Bind the supervisor to the specified endpoint.
# The bound endpoint is stored in @bound_endpoint before the parent implementation runs.
def start
	Async.logger.info(self) {"Binding to #{self.endpoint}..."}
	
	# Binding happens inside a reactor; wait for the task's result.
	binding_task = Async::Reactor.run do
		Async::IO::SharedEndpoint.bound(self.endpoint)
	end
	
	@bound_endpoint = binding_task.wait
	
	super()
end

#stop ⇒ Object

Release the bound endpoint.



108
109
110
111
112
113
# File 'lib/falcon/service/supervisor.rb', line 108

# Release the bound endpoint.
# Closes the endpoint if one is present, clears the reference, then runs the parent implementation.
def stop
	@bound_endpoint.close unless @bound_endpoint.nil?
	@bound_endpoint = nil
	
	super()
end