Class: Falcon::Service::Supervisor

Inherits:
Generic
  • Object
show all
Defined in:
lib/falcon/service/supervisor.rb

Instance Method Summary collapse

Methods inherited from Generic

#include?, #logger, #name, wrap

Constructor Details

#initialize(environment) ⇒ Supervisor

Returns a new instance of Supervisor.



32
33
34
35
36
# File 'lib/falcon/service/supervisor.rb', line 32

def initialize(environment)
  super
  
  @bound_endpoint = nil
end

Instance Method Details

#do_metrics(message) ⇒ Object



54
55
56
# File 'lib/falcon/service/supervisor.rb', line 54

def do_metrics(message)
  Process::Metrics.capture(pid: Process.ppid, ppid: Process.ppid)
end

#do_restart(message) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
# File 'lib/falcon/service/supervisor.rb', line 42

def do_restart(message)
  # Tell the parent of this process group to spin up a new process group/container.
  # Wait for that to start accepting new connections.
  # Stop accepting connections.
  # Wait for existing connnections to drain.
  # Terminate this process group.
  
  signal = message[:signal] || :INT
  
  Process.kill(signal, Process.ppid)
end

#endpointObject



38
39
40
# File 'lib/falcon/service/supervisor.rb', line 38

def endpoint
  @evaluator.endpoint
end

#handle(message) ⇒ Object



58
59
60
61
62
63
64
65
# File 'lib/falcon/service/supervisor.rb', line 58

def handle(message)
  case message[:please]
  when 'restart'
    self.do_restart(message)
  when 'metrics'
    self.do_metrics(message)
  end
end

#setup(container) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/falcon/service/supervisor.rb', line 77

def setup(container)
  container.run(name: self.name, restart: true, count: 1) do |instance|
    Async do
      @bound_endpoint.accept do |peer|
        stream = Async::IO::Stream.new(peer)
        
        while message = stream.gets("\0")
          response = handle(JSON.parse(message, symbolize_names: true))
          stream.puts(response.to_json, separator: "\0")
        end
      end
      
      instance.ready!
    end
  end
  
  super
end

#startObject



67
68
69
70
71
72
73
74
75
# File 'lib/falcon/service/supervisor.rb', line 67

def start
  Async.logger.info(self) {"Binding to #{self.endpoint}..."}
  
  @bound_endpoint = Async::Reactor.run do
    Async::IO::SharedEndpoint.bound(self.endpoint)
  end.wait
  
  super
end

#stopObject



96
97
98
99
100
101
# File 'lib/falcon/service/supervisor.rb', line 96

def stop
  @bound_endpoint&.close
  @bound_endpoint = nil
  
  super
end