Class: RFlow::Master
- Inherits:
-
DaemonProcess
- Object
- DaemonProcess
- RFlow::Master
- Defined in:
- lib/rflow/master.rb
Overview
The master/watchdog process for RFlow. Mostly exists to receive SIGCHLD from subprocesses so it can kill them all with SIGQUIT and get restarted.
Constant Summary
Constants inherited from DaemonProcess
Instance Attribute Summary collapse
- #brokers ⇒ Array<Broker> readonly
- #shards ⇒ Array<Shard> readonly
Instance Method Summary collapse
-
#initialize(config) ⇒ Master
constructor
A new instance of Master.
-
#run_process ⇒ void
Override that starts EventMachine and waits until it gets stopped.
-
#spawn_subprocesses ⇒ void
Override of #spawn_subprocesses that actually spawns them, then calls Shard#run! on each.
-
#subprocesses ⇒ Array<ChildProcess>
Override of #subprocesses that includes the Brokers and every Shard::Worker of every Shard.
Methods inherited from DaemonProcess
#daemonize!, #run!, #shutdown!
Constructor Details
#initialize(config) ⇒ Master
17 18 19 20 21 22 |
# File 'lib/rflow/master.rb', line 17 def initialize(config) super(config['rflow.application_name'], 'Master', pid_file_path: config['rflow.pid_file_path']) @shards = config.shards.map {|config| Shard.new(config) } RFlow.logger.context_width = @shards.flat_map(&:workers).map(&:name).map(&:length).max @brokers = config.connections.flat_map(&:brokers).map {|config| Broker.build(config) } end |
Instance Attribute Details
#brokers ⇒ Array<Broker> (readonly)
The Brokers being managed by the RFlow::Master.
15 16 17 |
# File 'lib/rflow/master.rb', line 15 def brokers @brokers end |
#shards ⇒ Array<Shard> (readonly)
The Shards being managed by the RFlow::Master.
11 12 13 |
# File 'lib/rflow/master.rb', line 11 def shards @shards end |
Instance Method Details
#run_process ⇒ void
This method returns an undefined value.
Override that starts EventMachine and waits until it gets stopped.
44 45 46 47 48 |
# File 'lib/rflow/master.rb', line 44 def run_process EM.run do # TODO: Monitor the workers end end |
#spawn_subprocesses ⇒ void
This method returns an undefined value.
Override of #spawn_subprocesses that actually spawns them, then calls Shard#run! on each.
27 28 29 30 31 32 33 |
# File 'lib/rflow/master.rb', line 27 def spawn_subprocesses RFlow.logger.debug "Running #{brokers.count} brokers" if brokers.count > 0 brokers.each(&:spawn!) RFlow.logger.debug "#{brokers.count} brokers started: #{brokers.map { |w| "#{w.name} (#{w.pid})" }.join(', ')}" if brokers.count > 0 shards.each(&:run!) end |
#subprocesses ⇒ Array<ChildProcess>
Override of #subprocesses that includes the Brokers and every Shard::Worker of every Shard.
38 39 40 |
# File 'lib/rflow/master.rb', line 38 def subprocesses brokers + shards.flat_map(&:workers) end |