Class: Sisyphus::Master
- Inherits:
-
Object
- Object
- Sisyphus::Master
- Defined in:
- lib/sisyphus/master.rb
Constant Summary collapse
- IO_TIMEOUT =
10- HANDLED_SIGNALS =
[:INT, :TTIN, :TTOU]
Instance Attribute Summary collapse
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
Instance Method Summary collapse
-
#initialize(job, options = {}) ⇒ Master
constructor
A new instance of Master.
- #start ⇒ Object
- #start_worker ⇒ Object
- #stop_all ⇒ Object
- #stop_worker(wpid) ⇒ Object
- #worker_count ⇒ Object
Constructor Details
#initialize(job, options = {}) ⇒ Master
Returns a new instance of Master.
13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/sisyphus/master.rb', line 13 def initialize(job, = {}) @number_of_workers = .fetch :workers, 0 @logger = .fetch :logger, NullLogger.new @workers = [] @job = job self_reader, self_writer = IO.pipe @selfpipe = { reader: self_reader, writer: self_writer } Thread.main[:signal_queue] = [] end |
Instance Attribute Details
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
11 12 13 |
# File 'lib/sisyphus/master.rb', line 11 def logger @logger end |
Instance Method Details
#start ⇒ Object
25 26 27 28 29 30 31 32 33 |
# File 'lib/sisyphus/master.rb', line 25 def start trap_signals @number_of_workers.times do start_worker sleep rand(1000).fdiv(1000) end puts "Sisyphus::Master started with PID: #{Process.pid}" watch_for_output end |
#start_worker ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/sisyphus/master.rb', line 35 def start_worker reader, writer = IO.pipe if wpid = fork writer.close @workers << { pid: wpid, reader: reader } else reader.close self.process_name = "Worker #{Process.pid}" begin worker = Worker.new(@job, writer, logger) worker.setup worker.start rescue Exception => e writer.write Worker::UNCAUGHT_ERROR logger.warn(process_name) { e } exit! 0 end end end |
#stop_all ⇒ Object
61 62 63 64 65 66 67 68 |
# File 'lib/sisyphus/master.rb', line 61 def stop_all @workers.each do |worker| stop_worker worker.fetch(:pid) end Timeout.timeout(30) do watch_for_shutdown while worker_count > 0 end end |
#stop_worker(wpid) ⇒ Object
55 56 57 58 59 |
# File 'lib/sisyphus/master.rb', line 55 def stop_worker(wpid) if @workers.find { |w| w.fetch(:pid) == wpid } Process.kill 'INT', wpid rescue Errno::ESRCH # Ignore if the process is already gone end end |
#worker_count ⇒ Object
70 71 72 |
# File 'lib/sisyphus/master.rb', line 70 def worker_count @workers.length end |