Class: RFlow::Shard::Worker

Inherits:
ChildProcess
Defined in:
lib/rflow/shard.rb

Overview

A child process running under an RFlow::Shard. The Shard coordinates a set of identical Workers.

Constant Summary

Constants inherited from ChildProcess

ChildProcess::SIGINFO

Instance Attribute Summary

Attributes inherited from ChildProcess

#name, #pid

Instance Method Summary

Methods inherited from ChildProcess

#spawn!

Constructor Details

#initialize(shard, index = 1) ⇒ Worker

Returns a new instance of Worker.



# File 'lib/rflow/shard.rb', line 23

def initialize(shard, index = 1)
  super("#{shard.name}-#{index}", 'Worker')
  @shard = shard
  @index = index

  # build at initialize time to fail fast
  @components = shard.config.components.map {|config| Component.build(self, config) }
end
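
The constructor names the process after the shard and worker index, and builds every component up front so that a bad component configuration raises at construction time rather than later in run_process. A minimal sketch of that fail-fast pattern follows. It does not load RFlow: FakeConfig, FakeShard, FakeComponent, and SketchWorker are hypothetical stand-ins, and the rule that a config needs a :name is an assumption made only for illustration.

```ruby
# Hypothetical stand-ins; none of these are RFlow classes.
FakeConfig = Struct.new(:components)
FakeShard  = Struct.new(:name, :config)

class FakeComponent
  attr_reader :name

  # Mimics a builder that rejects an invalid configuration immediately.
  # The ":name is required" rule is invented for this sketch.
  def self.build(_worker, config)
    raise ArgumentError, 'component config needs a :name' unless config[:name]
    new(config[:name])
  end

  def initialize(name)
    @name = name
  end
end

class SketchWorker
  attr_reader :name, :shard, :index, :components

  def initialize(shard, index = 1)
    @name  = "#{shard.name}-#{index}" # same naming scheme as the constructor above
    @shard = shard
    @index = index
    # Build eagerly so a bad config raises here, not once the worker is running.
    @components = shard.config.components.map { |config| FakeComponent.build(self, config) }
  end
end

good = FakeShard.new('ingest', FakeConfig.new([{ name: 'reader' }, { name: 'writer' }]))
worker = SketchWorker.new(good, 2)
puts worker.name
puts worker.components.map(&:name).inspect

bad = FakeShard.new('ingest', FakeConfig.new([{ name: 'reader' }, {}]))
begin
  SketchWorker.new(bad)
rescue ArgumentError => e
  puts "failed fast: #{e.message}"
end
```

In the sketch, the second shard never yields a worker object, because the invalid entry is detected while the component list is being mapped.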

Instance Attribute Details

#index ⇒ Integer (readonly)

Which worker index this is (for example, in a set of 3 RFlow::Shard::Workers, one would have index 0, one would have index 1, one would have index 2).

Returns:

  • (Integer)


# File 'lib/rflow/shard.rb', line 21

def index
  @index
end

#shard ⇒ Shard (readonly)

A reference to the RFlow::Shard governing this RFlow::Shard::Worker.

Returns:

  • (Shard)

# File 'lib/rflow/shard.rb', line 16

def shard
  @shard
end

Instance Method Details

#run_process ⇒ void

This method returns an undefined value.

Configure, connect, and actually start running RFlow components.



# File 'lib/rflow/shard.rb', line 34

def run_process
  EM.run do
    begin
      # TODO: Monitor the master
      configure_components!
      connect_components!
      # TODO: need to do proper node synchronization for ZMQ to remove sleep
      sleep 1
      run_components!
    rescue Exception => e
      RFlow.logger.error "Error in worker, shutting down: #{e.class.name}: #{e.message}, because: #{e.backtrace.inspect}"
      exit! 1
    end
  end

  RFlow.logger.info 'Shutting down worker after EM stopped'
end
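
The listing above runs three phases in a fixed order (configure, connect, run) and treats a failure in any of them as fatal for the worker. The sketch below illustrates only that ordering and the stop-on-first-error behavior. PhaseRecorder is a hypothetical stand-in: it does not use EventMachine, it rescues StandardError rather than Exception, and it returns a status symbol where the real method logs the error and calls exit! 1.

```ruby
# Hypothetical stand-in that records which lifecycle phases completed.
class PhaseRecorder
  attr_reader :log

  # fail_on: name of a phase that should raise, to simulate a failure.
  def initialize(fail_on: nil)
    @log = []
    @fail_on = fail_on
  end

  def run_phases
    %i[configure connect run].each do |phase|
      raise "#{phase} failed" if phase == @fail_on
      @log << phase
    end
    :ok
  rescue StandardError => e
    # The real worker logs the error and exits the process at this point.
    @log << "error: #{e.message}"
    :failed
  end
end

healthy = PhaseRecorder.new
puts healthy.run_phases.inspect
puts healthy.log.inspect

broken = PhaseRecorder.new(fail_on: :connect)
puts broken.run_phases.inspect
puts broken.log.inspect
```

In the failing case the run phase is never reached, which mirrors how an error in connect_components! prevents run_components! from being called.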

#shutdown!(signal) ⇒ void

This method returns an undefined value.

Shut down the RFlow::Shard::Worker. Shuts down each component, stops the EventMachine event loop, and then calls the inherited ChildProcess#shutdown!.



# File 'lib/rflow/shard.rb', line 86

def shutdown!(signal)
  RFlow.logger.debug 'Shutting down components'
  @components.each do |component|
    RFlow.logger.debug "Shutting down component '#{component.name}' (#{component.uuid})"
    component.shutdown!
  end
  EM.stop_event_loop
  super
end
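
The order of operations in shutdown! matters: components are shut down first, the event loop is stopped next, and the inherited implementation runs last. The sketch below records that order using hypothetical stand-ins (SketchChildProcess, SketchComponent, SketchShutdownWorker). It makes no claim about what the real ChildProcess#shutdown! does, and a plain log entry stands in for EM.stop_event_loop.

```ruby
# Hypothetical stand-ins; they only record the order of calls.
EVENTS = []

class SketchChildProcess
  def shutdown!(signal)
    EVENTS << "parent shutdown (#{signal})"
  end
end

SketchComponent = Struct.new(:name) do
  def shutdown!
    EVENTS << "component #{name} shutdown"
  end
end

class SketchShutdownWorker < SketchChildProcess
  def initialize(components)
    @components = components
  end

  def shutdown!(signal)
    @components.each(&:shutdown!)
    EVENTS << 'event loop stopped' # stands in for EM.stop_event_loop
    super                          # bare super forwards signal to the parent
  end
end

worker = SketchShutdownWorker.new([SketchComponent.new('a'), SketchComponent.new('b')])
worker.shutdown!('SIGTERM')
puts EVENTS
```

Calling super without arguments or parentheses passes the original signal argument through unchanged, which is why the listing above can end with a bare super.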