Class: Sisyphus::Master
- Inherits:
-
Object
- Object
- Sisyphus::Master
- Defined in:
- lib/sisyphus/master.rb
Constant Summary collapse
- IO_TIMEOUT =
10- HANDLED_SIGNALS =
[:INT, :TTIN, :TTOU]
Instance Method Summary collapse
-
#initialize(job, options = {}) ⇒ Master
constructor
A new instance of Master.
- #start ⇒ Object
- #start_worker ⇒ Object
- #stop_all ⇒ Object
- #stop_worker(wpid) ⇒ Object
- #worker_count ⇒ Object
Constructor Details
#initialize(job, options = {}) ⇒ Master
Returns a new instance of Master.
10 11 12 13 14 15 16 17 18 19 |
# File 'lib/sisyphus/master.rb', line 10 def initialize(job, = {}) @number_of_workers = .fetch :workers, 0 @workers = [] @job = job self_reader, self_writer = IO.pipe @selfpipe = { reader: self_reader, writer: self_writer } Thread.main[:signal_queue] = [] end |
Instance Method Details
#start ⇒ Object
21 22 23 24 25 26 27 28 29 |
# File 'lib/sisyphus/master.rb', line 21 def start trap_signals @number_of_workers.times do start_worker sleep rand(1000).fdiv(1000) end puts "Sisyphus::Master started with PID: #{Process.pid}" watch_for_output end |
#start_worker ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/sisyphus/master.rb', line 31 def start_worker reader, writer = IO.pipe if wpid = fork writer.close @workers << { pid: wpid, reader: reader } else reader.close worker = Worker.new(@job, writer) self.process_name = "Worker #{Process.pid}" worker.start end end |
#stop_all ⇒ Object
52 53 54 55 56 57 58 59 |
# File 'lib/sisyphus/master.rb', line 52 def stop_all @workers.each do |worker| stop_worker worker.fetch(:pid) end Timeout.timeout(30) do watch_for_shutdown while worker_count > 0 end end |
#stop_worker(wpid) ⇒ Object
44 45 46 47 48 49 50 |
# File 'lib/sisyphus/master.rb', line 44 def stop_worker(wpid) if @workers.find { |w| w.fetch(:pid) == wpid } Process.kill 'INT', wpid else raise "Unknown worker PID: #{wpid}" end end |
#worker_count ⇒ Object
61 62 63 |
# File 'lib/sisyphus/master.rb', line 61 def worker_count @workers.length end |