Class: MultiProcess::Group
- Inherits:
-
Object
- Object
- MultiProcess::Group
- Defined in:
- lib/multi_process/group.rb
Overview
Store and run a group of processes.
Instance Attribute Summary collapse
-
#partition ⇒ Object
readonly
Partition size.
-
#processes ⇒ Object
readonly
Return list of processes.
-
#receiver ⇒ Object
Receiver all processes in group should use.
Instance Method Summary collapse
-
#<<(procs) ⇒ Object
Add new process or list of processes.
-
#alive? ⇒ Boolean
Check if group is alive e.g.
-
#available!(timeout: MultiProcess::DEFAULT_TIMEOUT) ⇒ Object
Wait until group is available.
-
#available? ⇒ Boolean
Check if group is available.
-
#initialize(receiver: nil, partition: nil) ⇒ Group
constructor
Create new process group.
-
#run(delay: nil, timeout: nil) ⇒ Object
Start all process and wait for them to terminate.
-
#start(delay: nil) ⇒ Object
Start all process in group.
-
#started? ⇒ Boolean
Check if group was already started.
-
#stop ⇒ Object
Stop all processes.
-
#wait(timeout: nil) ⇒ Object
Wait until all process terminated.
Constructor Details
#initialize(receiver: nil, partition: nil) ⇒ Group
Create new process group.
25 26 27 28 29 30 |
# File 'lib/multi_process/group.rb', line 25 def initialize(receiver: nil, partition: nil) @processes = [] @receiver = receiver ? receiver : MultiProcess::Logger.global @partition = partition ? partition.to_i : 0 @mutex = Mutex.new end |
Instance Attribute Details
#partition ⇒ Object (readonly)
Partition size.
17 18 19 |
# File 'lib/multi_process/group.rb', line 17 def partition @partition end |
#processes ⇒ Object (readonly)
Return list of processes.
8 9 10 |
# File 'lib/multi_process/group.rb', line 8 def processes @processes end |
#receiver ⇒ Object
Receiver all processes in group should use.
If changed only affect new added processes.
14 15 16 |
# File 'lib/multi_process/group.rb', line 14 def receiver @receiver end |
Instance Method Details
#<<(procs) ⇒ Object
Add new process or list of processes.
If group was already started added processes will also be started.
38 39 40 41 42 43 44 45 |
# File 'lib/multi_process/group.rb', line 38 def <<(procs) Array(procs).flatten.each do |process| processes << process process.receiver = receiver start process if started? end end |
#alive? ⇒ Boolean
Check if group is alive e.g. if at least on process is alive.
119 120 121 |
# File 'lib/multi_process/group.rb', line 119 def alive? processes.any?(&:alive?) end |
#available!(timeout: MultiProcess::DEFAULT_TIMEOUT) ⇒ Object
Wait until group is available. This implies waiting until all processes in group are available.
Processes will not be stopped if timeout occurs.
139 140 141 142 143 |
# File 'lib/multi_process/group.rb', line 139 def available!(timeout: MultiProcess::DEFAULT_TIMEOUT) Timeout.timeout timeout do processes.each(&:available!) end end |
#available? ⇒ Boolean
Check if group is available. The group is available if all processes are available.
126 127 128 |
# File 'lib/multi_process/group.rb', line 126 def available? processes.all?(:available?) end |
#run(delay: nil, timeout: nil) ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/multi_process/group.rb', line 98 def run(delay: nil, timeout: nil) if partition > 0 partition.times.map do Thread.new do while (process = next_process) process.run end end end.each(&:join) else start delay: delay wait timeout: timeout end ensure stop end |
#start(delay: nil) ⇒ Object
Start all process in group.
Call blocks until all processes are started.
53 54 55 56 57 58 59 60 |
# File 'lib/multi_process/group.rb', line 53 def start(delay: nil) processes.each do |process| next if process.started? process.start sleep delay if delay end end |
#started? ⇒ Boolean
Check if group was already started.
66 67 68 |
# File 'lib/multi_process/group.rb', line 66 def started? processes.any?(&:started?) end |
#stop ⇒ Object
Stop all processes.
72 73 74 |
# File 'lib/multi_process/group.rb', line 72 def stop processes.each(&:stop) end |
#wait(timeout: nil) ⇒ Object
Wait until all process terminated.
82 83 84 85 86 87 88 |
# File 'lib/multi_process/group.rb', line 82 def wait(timeout: nil) if timeout ::Timeout.timeout(timeout) { wait } else processes.each(&:wait) end end |