Class: RFlow::Shard::Worker

Inherits:
ChildProcess show all
Defined in:
lib/rflow/shard.rb

Instance Attribute Summary collapse

Attributes inherited from ChildProcess

#name, #pid

Instance Method Summary collapse

Methods inherited from ChildProcess

#run_child_process, #spawn!

Constructor Details

#initialize(shard, index = 1) ⇒ Worker

Returns a new instance of Worker.



14
15
16
17
18
19
20
21
# File 'lib/rflow/shard.rb', line 14

def initialize(shard, index = 1)
  super("#{shard.name}-#{index}", 'Worker')
  @shard = shard
  @index = index

  # build at initialize time to fail fast
  @components = shard.config.components.map {|config| Component.build(self, config) }
end

Instance Attribute Details

#indexObject (readonly)

Returns the value of attribute index.



12
13
14
# File 'lib/rflow/shard.rb', line 12

def index
  @index
end

#shardObject (readonly)

Returns the value of attribute shard.



12
13
14
# File 'lib/rflow/shard.rb', line 12

def shard
  @shard
end

Instance Method Details

#configure_components!Object



41
42
43
44
45
46
47
# File 'lib/rflow/shard.rb', line 41

def configure_components!
  RFlow.logger.debug 'Configuring components'
  @components.zip(shard.config.components.map(&:options)).each do |(component, config)|
    RFlow.logger.debug "Configuring component '#{component.name}' (#{component.uuid})"
    component.configure! config
  end
end

#connect_components!Object

Connect all inputs before all outputs, so connection types that require a ‘server’ to be established before a ‘client’ can connect can get themselves ready.



51
52
53
54
55
56
57
58
59
60
61
# File 'lib/rflow/shard.rb', line 51

def connect_components!
  RFlow.logger.debug 'Connecting components'
  @components.each do |component|
    RFlow.logger.debug "Connecting inputs for component '#{component.name}' (#{component.uuid})"
    component.connect_inputs!
  end
  @components.each do |component|
    RFlow.logger.debug "Connecting outputs for component '#{component.name}' (#{component.uuid})"
    component.connect_outputs!
  end
end

#run_components!Object



63
64
65
66
67
68
69
# File 'lib/rflow/shard.rb', line 63

def run_components!
  RFlow.logger.debug 'Running components'
  @components.each do |component|
    RFlow.logger.debug "Running component '#{component.name}' (#{component.uuid})"
    component.run!
  end
end

#run_processObject



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/rflow/shard.rb', line 23

def run_process
  EM.run do
    begin
      # TODO: Monitor the master
      configure_components!
      connect_components!
      # TODO: need to do proper node synchronization for ZMQ to remove sleep
      sleep 1
      run_components!
    rescue Exception => e
      RFlow.logger.error "Error in worker, shutting down: #{e.class.name}: #{e.message}, because: #{e.backtrace.inspect}"
      exit! 1
    end
  end

  RFlow.logger.info 'Shutting down worker after EM stopped'
end

#shutdown!(signal) ⇒ Object



71
72
73
74
75
76
77
78
79
# File 'lib/rflow/shard.rb', line 71

def shutdown!(signal)
  RFlow.logger.debug 'Shutting down components'
  @components.each do |component|
    RFlow.logger.debug "Shutting down component '#{component.name}' (#{component.uuid})"
    component.shutdown!
  end
  EM.stop_event_loop
  super
end