Class: RFlow::Master

Inherits:
DaemonProcess show all
Defined in:
lib/rflow/master.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from DaemonProcess

#daemonize!, #run!, #shutdown!

Constructor Details

#initialize(config) ⇒ Master

Returns a new instance of Master.



10
11
12
13
14
15
# File 'lib/rflow/master.rb', line 10

# Sets up the master process from the given configuration.
#
# Passes the application name, the literal role 'Master' and the pid file
# path to the superclass constructor, then builds one Shard per entry in
# +config.shards+ and one broker (via Broker.build) per broker listed on
# +config.connections+. Also sets the logger's context width to the length
# of the longest worker name across all shards.
#
# @param config [Object] configuration; indexed with
#   'rflow.application_name' and 'rflow.pid_file_path', and expected to
#   respond to +shards+ and +connections+
def initialize(config)
  super(config['rflow.application_name'], 'Master', pid_file_path: config['rflow.pid_file_path'])

  @shards = config.shards.map { |shard_config| Shard.new(shard_config) }

  # NOTE(review): +max+ presumably yields nil when no shard has workers --
  # confirm the logger accepts a nil context width.
  worker_names = @shards.flat_map(&:workers).map(&:name)
  RFlow.logger.context_width = worker_names.map(&:length).max

  broker_configs = config.connections.flat_map(&:brokers)
  @brokers = broker_configs.map { |broker_config| Broker.build(broker_config) }
end

Instance Attribute Details

#brokers ⇒ Object (readonly)

Returns the value of attribute brokers.



8
9
10
# File 'lib/rflow/master.rb', line 8

# Reader for the @brokers instance variable, which #initialize assigns from
# Broker.build applied to each broker of the configured connections.
#
# @return [Object] the current value of @brokers
def brokers
  @brokers
end

#shards ⇒ Object (readonly)

Returns the value of attribute shards.



7
8
9
# File 'lib/rflow/master.rb', line 7

# Reader for the @shards instance variable, which #initialize assigns from
# Shard.new applied to each entry of the configuration's shards.
#
# @return [Object] the current value of @shards
def shards
  @shards
end

Instance Method Details

#run_process ⇒ Object



29
30
31
32
33
# File 'lib/rflow/master.rb', line 29

# Hands control to EM.run with an empty block; the master currently does no
# work of its own inside that block.
#
# @return [Object] whatever EM.run returns
def run_process
  # TODO: Monitor the workers
  EM.run {}
end

#spawn_subprocesses ⇒ Object



17
18
19
20
21
22
23
# File 'lib/rflow/master.rb', line 17

# Starts every broker with +spawn!+, then starts every shard with +run!+.
#
# When at least one broker exists, a debug line is logged before spawning
# (the broker count) and another afterwards (each broker's name and pid).
#
# @return [Object] the result of +shards.each+
def spawn_subprocesses
  if brokers.count > 0
    RFlow.logger.debug "Running #{brokers.count} brokers"
  end

  brokers.each { |broker| broker.spawn! }

  if brokers.count > 0
    # Pids are read only after spawn! has been called on every broker.
    started = brokers.map { |broker| "#{broker.name} (#{broker.pid})" }.join(', ')
    RFlow.logger.debug "#{brokers.count} brokers started: #{started}"
  end

  shards.each { |shard| shard.run! }
end

#subprocesses ⇒ Object



25
26
27
# File 'lib/rflow/master.rb', line 25

# Lists every child process owned by the master: the brokers first, followed
# by the workers of each shard in shard order.
#
# @return [Object] +brokers+ concatenated with all shards' workers
def subprocesses
  broker_processes = brokers
  worker_processes = shards.flat_map { |shard| shard.workers }
  broker_processes + worker_processes
end