Class: RFlow::Master

Inherits:
DaemonProcess show all
Defined in:
lib/rflow/master.rb

Overview

The master/watchdog process for RFlow. Mostly exists to receive SIGCHLD from subprocesses so it can kill them all with SIGQUIT and get restarted.

Constant Summary

Constants inherited from DaemonProcess

DaemonProcess::SIGINFO

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from DaemonProcess

#daemonize!, #run!, #shutdown!

Constructor Details

#initialize(config) ⇒ Master



17
18
19
20
21
22
# File 'lib/rflow/master.rb', line 17

def initialize(config)
  super(config['rflow.application_name'], 'Master', pid_file_path: config['rflow.pid_file_path'])
  @shards = config.shards.map {|config| Shard.new(config) }
  RFlow.logger.context_width = @shards.flat_map(&:workers).map(&:name).map(&:length).max
  @brokers = config.connections.flat_map(&:brokers).map {|config| Broker.build(config) }
end

Instance Attribute Details

#brokersArray<Broker> (readonly)

The Brokers being managed by the RFlow::Master.



15
16
17
# File 'lib/rflow/master.rb', line 15

def brokers
  @brokers
end

#shardsArray<Shard> (readonly)

The Shards being managed by the RFlow::Master.



11
12
13
# File 'lib/rflow/master.rb', line 11

def shards
  @shards
end

Instance Method Details

#run_processvoid

This method returns an undefined value.

Override that starts EventMachine and waits until it gets stopped.



44
45
46
47
48
# File 'lib/rflow/master.rb', line 44

def run_process
  EM.run do
    # TODO: Monitor the workers
  end
end

#spawn_subprocessesvoid

This method returns an undefined value.

Override of #spawn_subprocesses that actually spawns them, then calls Shard#run! on each.



27
28
29
30
31
32
33
# File 'lib/rflow/master.rb', line 27

def spawn_subprocesses
  RFlow.logger.debug "Running #{brokers.count} brokers" if brokers.count > 0
  brokers.each(&:spawn!)
  RFlow.logger.debug "#{brokers.count} brokers started: #{brokers.map { |w| "#{w.name} (#{w.pid})" }.join(', ')}" if brokers.count > 0

  shards.each(&:run!)
end

#subprocessesArray<ChildProcess>

Override of #subprocesses that includes the Brokers and every Shard::Worker of every Shard.



38
39
40
# File 'lib/rflow/master.rb', line 38

def subprocesses
  brokers + shards.flat_map(&:workers)
end