Class: MultiProcess::Group
- Inherits: Object
- Defined in: lib/multi_process/group.rb
Overview
Store and run a group of processes.
Instance Attribute Summary
- #partition ⇒ Object (readonly)
  Partition size.
- #processes ⇒ Object (readonly)
  Return list of processes.
- #receiver ⇒ Object
  Receiver that all processes in the group should use.
Instance Method Summary
- #<<(process) ⇒ Object
  Add new process or list of processes.
- #alive? ⇒ Boolean
  Check if group is alive, i.e. if at least one process is alive.
- #available!(opts = {}) ⇒ Object
  Wait until group is available.
- #available? ⇒ Boolean
  Check if group is available.
- #initialize(opts = {}) ⇒ Group (constructor)
  Create new process group.
- #run(opts = {}) ⇒ Object
  Start all processes and wait for them to terminate.
- #start(opts = {}) ⇒ Object
  Start all processes in group.
- #started? ⇒ Boolean
  Check if group was already started.
- #stop ⇒ Object
  Stop all processes.
- #wait(opts = {}) ⇒ Object
  Wait until all processes have terminated.
Constructor Details
#initialize(opts = {}) ⇒ Group
Create new process group.
# File 'lib/multi_process/group.rb', line 25
def initialize(opts = {})
  @processes = []
  @receiver  = opts[:receiver] ? opts[:receiver] : MultiProcess::Logger.global
  @partition = opts[:partition] ? opts[:partition].to_i : 0
  @mutex     = Mutex.new
end
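The option handling in the constructor can be tried in isolation. The following is a minimal sketch, not the library class: `SketchGroup` and `DEFAULT_RECEIVER` are hypothetical stand-ins (the latter replaces `MultiProcess::Logger.global`) so that the snippet runs without the gem.

```ruby
# Hypothetical stand-in that mirrors the option handling of Group#initialize.
class SketchGroup
  # Placeholder for MultiProcess::Logger.global (assumption for this sketch).
  DEFAULT_RECEIVER = :global_logger

  attr_reader :receiver, :partition

  def initialize(opts = {})
    # Fall back to the global receiver when none is given.
    @receiver  = opts[:receiver] ? opts[:receiver] : DEFAULT_RECEIVER
    # Coerce the partition size to an Integer; 0 means "no partitioning".
    @partition = opts[:partition] ? opts[:partition].to_i : 0
  end
end

default_group = SketchGroup.new
custom_group  = SketchGroup.new(receiver: :my_receiver, partition: '4')
```

Because of the `to_i` call, a partition given as a string such as `'4'` is stored as the integer `4`.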
Instance Attribute Details
#partition ⇒ Object (readonly)
Partition size.
# File 'lib/multi_process/group.rb', line 17
def partition
  @partition
end
#processes ⇒ Object (readonly)
Return list of processes.
# File 'lib/multi_process/group.rb', line 8
def processes
  @processes
end
#receiver ⇒ Object
Receiver that all processes in the group should use.
Changing it only affects processes added afterwards.
# File 'lib/multi_process/group.rb', line 14
def receiver
  @receiver
end
Instance Method Details
#<<(process) ⇒ Object
Add new process or list of processes.
If the group was already started, added processes are started as well.
# File 'lib/multi_process/group.rb', line 38
def <<(process)
  Array(process).flatten.each do |process|
    processes << process
    process.receiver = receiver
    if started?
      start process
    end
  end
end
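The `Array(process).flatten` call is what lets `<<` accept a single process, a list, or nested lists. A short sketch of that normalization, using symbols as stand-ins for process objects:

```ruby
# Kernel#Array wraps a single object in an array and leaves arrays as they are;
# flatten then removes any nesting.
single = Array(:web).flatten
list   = Array([:web, :worker]).flatten
nested = Array([:web, [:worker, [:clock]]]).flatten

# nil is converted to an empty array, so nothing would be added.
nothing = Array(nil).flatten
```

Each result is a flat array, so the `each` loop in `<<` always sees one process at a time.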
#alive? ⇒ Boolean
Check if group is alive, i.e. if at least one process is alive.
# File 'lib/multi_process/group.rb', line 126
def alive?
  processes.any? &:alive?
end
#available!(opts = {}) ⇒ Object
Wait until group is available. This implies waiting until all processes in group are available.
Processes will not be stopped if timeout occurs.
# File 'lib/multi_process/group.rb', line 146
def available!(opts = {})
  timeout = opts[:timeout] ? opts[:timeout].to_i : MultiProcess::DEFAULT_TIMEOUT
  Timeout.timeout timeout do
    processes.each{|p| p.available! }
  end
end
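In the code shown, a single `Timeout.timeout` block wraps the whole loop, so the timeout is a budget for the group rather than for each process. The sketch below illustrates this with `SlowProcess`, a hypothetical stand-in whose `available!` simply sleeps.

```ruby
require 'timeout'

# Hypothetical stand-in for a process that becomes available after a delay.
SlowProcess = Struct.new(:delay) do
  def available!
    sleep delay
  end
end

# Each process needs 0.1s, which is below the 0.2s budget on its own,
# but the three sequential waits add up to about 0.3s.
slow_processes = Array.new(3) { SlowProcess.new(0.1) }

timed_out = false
begin
  # One shared budget for the whole loop, as in Group#available!.
  Timeout.timeout(0.2) do
    slow_processes.each { |p| p.available! }
  end
rescue Timeout::Error
  timed_out = true
end
```

Note that `available!` calls `to_i` on the `:timeout` option, so a fractional value is truncated. `Timeout.timeout` treats `0` as "no timeout", so a value below one second would appear to disable the timeout there.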
#available? ⇒ Boolean
Check if group is available. The group is available if all processes are available.
# File 'lib/multi_process/group.rb', line 133
def available?
  !processes.any?{|p| !p.available? }
end
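The double negation is equivalent to asking whether all processes are available, which also means an empty group counts as available. A small sketch with `ReadyStub`, a hypothetical stand-in for a process:

```ruby
# Hypothetical stand-in exposing only #available?.
ReadyStub = Struct.new(:ready) do
  def available?
    ready
  end
end

# Same expression as in Group#available?, taking the list as an argument.
def group_available?(processes)
  !processes.any? { |p| !p.available? }
end

all_ready = [ReadyStub.new(true), ReadyStub.new(true)]
one_down  = [ReadyStub.new(true), ReadyStub.new(false)]

empty_result = group_available?([])
ready_result = group_available?(all_ready)
down_result  = group_available?(one_down)
```

Here `ready_result` and `empty_result` are `true`, while `down_result` is `false`, matching `processes.all?(&:available?)` in each case.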
#run(opts = {}) ⇒ Object
Start all processes and wait for them to terminate.
If the partition size is greater than zero, that many threads run the processes concurrently. All processes are stopped when the call returns.
# File 'lib/multi_process/group.rb', line 103
def run(opts = {})
  if partition > 0
    threads = Array.new
    partition.times do
      threads << Thread.new do
        while (process = next_process)
          process.run
        end
      end
    end
    threads.each &:join
  else
    start opts
    wait opts
  end
ensure
  stop
end
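The partitioned branch follows a worker-pool pattern: a fixed number of threads repeatedly fetch the next pending process and run it. The `next_process` helper is not shown on this page, so the sketch below replaces it with an assumed `next_job` method backed by a thread-safe `Queue`. `FakeJob` is likewise a hypothetical stand-in for a process.

```ruby
# Hypothetical stand-in for a process; records its name when run.
class FakeJob
  attr_reader :name

  def initialize(name, log)
    @name = name
    @log  = log
  end

  def run
    sleep 0.01
    @log << name
  end
end

# Assumed replacement for the private next_process helper (not shown above).
def next_job(queue)
  queue.pop(true) # non-blocking pop raises ThreadError when the queue is empty
rescue ThreadError
  nil
end

log     = Queue.new # thread-safe collector for finished job names
pending = Queue.new
%w[a b c d e].each { |n| pending << FakeJob.new(n, log) }

partition = 2
threads = Array.new(partition) do
  Thread.new do
    # Each worker keeps pulling jobs until none are left.
    while (job = next_job(pending))
      job.run
    end
  end
end
threads.each(&:join)

finished = Array.new(log.size) { log.pop }
```

With a partition of 2, at most two jobs run at the same time, and every job is run exactly once. The completion order is not deterministic.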
#start(opts = {}) ⇒ Object
Start all processes in group.
The call blocks until all processes are started.
# File 'lib/multi_process/group.rb', line 56
def start(opts = {})
  processes.each do |process|
    unless process.started?
      process.start
      sleep opts[:delay] if opts[:delay]
    end
  end
end
#started? ⇒ Boolean
Check if group was already started.
# File 'lib/multi_process/group.rb', line 69
def started?
  processes.any? &:started?
end
#stop ⇒ Object
Stop all processes.
# File 'lib/multi_process/group.rb', line 75
def stop
  processes.each do |process|
    process.stop
  end
end
#wait(opts = {}) ⇒ Object
Wait until all processes have terminated.
# File 'lib/multi_process/group.rb', line 87
def wait(opts = {})
  opts[:timeout] ||= 30
  ::Timeout::timeout(opts[:timeout]) do
    processes.each{|p| p.wait}
  end
end
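Two details of the code shown are easy to miss: the default timeout of 30 seconds is written into the `opts` hash passed by the caller, and an expired timeout surfaces as `Timeout::Error`. The sketch below reproduces the method body as `wait_sketch` and uses `Waiter`, a hypothetical stand-in for a process.

```ruby
require 'timeout'

# Hypothetical stand-in whose #wait sleeps for the given time.
Waiter = Struct.new(:delay) do
  def wait
    sleep delay
  end
end

# Same body as Group#wait, with the process list passed in explicitly.
def wait_sketch(processes, opts = {})
  opts[:timeout] ||= 30
  ::Timeout.timeout(opts[:timeout]) do
    processes.each { |p| p.wait }
  end
end

# The caller's hash receives the default timeout.
caller_opts = {}
wait_sketch([Waiter.new(0.01)], caller_opts)

# A process that outlives the timeout raises Timeout::Error.
error = begin
  wait_sketch([Waiter.new(1)], timeout: 0.1)
  nil
rescue Timeout::Error => e
  e
end
```

Callers that reuse an options hash across calls may want to pass a copy if they do not expect it to be modified.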