Class: MultiProcess::Group
- Inherits:
- Object
- Defined in:
- lib/multi_process/group.rb
Overview
Store and run a group of processes.
Instance Attribute Summary
-
#partition ⇒ Object
readonly
Partition size.
-
#processes ⇒ Object
readonly
Return list of processes.
-
#receiver ⇒ Object
Receiver all processes in group should use.
Instance Method Summary
-
#<<(procs) ⇒ Object
Add new process or list of processes.
-
#alive? ⇒ Boolean
Check if group is alive, i.e. if at least one process is alive.
-
#available!(timeout: MultiProcess::DEFAULT_TIMEOUT) ⇒ Object
Wait until group is available.
-
#available? ⇒ Boolean
Check if group is available.
-
#initialize(receiver: nil, partition: nil) ⇒ Group
constructor
Create new process group.
-
#run(delay: nil, timeout: nil) ⇒ Object
Start all processes and wait for them to terminate.
-
#run!(delay: nil, timeout: nil) ⇒ Object
Start all processes and wait for them to terminate.
-
#start(delay: nil) ⇒ Object
Start all processes in group.
-
#started? ⇒ Boolean
Check if group was already started.
-
#stop ⇒ Object
Stop all processes.
-
#wait(timeout: nil) ⇒ Object
Wait until all processes have terminated.
-
#wait!(timeout: nil) ⇒ Object
Wait until all processes have terminated.
Constructor Details
#initialize(receiver: nil, partition: nil) ⇒ Group
Create new process group.
# File 'lib/multi_process/group.rb', line 29
def initialize(receiver: nil, partition: nil)
  @processes = []
  @receiver = receiver || MultiProcess::Logger.global
  @partition = partition ? Integer(partition) : 0
  @mutex = Mutex.new
end
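The partition expression in the constructor can be tried in isolation. A minimal sketch, assuming nothing beyond that one expression; `normalize_partition` is a hypothetical helper name used for illustration and is not part of MultiProcess.

```ruby
# Hypothetical helper mirroring the constructor's partition expression.
def normalize_partition(partition)
  partition ? Integer(partition) : 0
end

from_nil    = normalize_partition(nil)  # nil falls back to 0
from_int    = normalize_partition(4)
from_string = normalize_partition("4")  # numeric strings are converted by Integer()

rejected = begin
  normalize_partition("four")
  false
rescue ArgumentError
  # Integer() raises on non-numeric strings instead of returning 0
  true
end
```

Because `#run` and `#run!` only take the partition branch when `partition.positive?`, the default of 0 selects the plain start-and-wait path.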
Instance Attribute Details
#partition ⇒ Object (readonly)
Partition size.
# File 'lib/multi_process/group.rb', line 21
def partition
  @partition
end
#processes ⇒ Object (readonly)
Return list of processes.
# File 'lib/multi_process/group.rb', line 12
def processes
  @processes
end
#receiver ⇒ Object
Receiver all processes in group should use.
If changed, only newly added processes are affected.
# File 'lib/multi_process/group.rb', line 18
def receiver
  @receiver
end
Instance Method Details
#<<(procs) ⇒ Object
Add new process or list of processes.
If group was already started added processes will also be started.
# File 'lib/multi_process/group.rb', line 42
def <<(procs)
  Array(procs).flatten.each do |process|
    processes << process
    process.receiver = receiver
    process.start if started?
  end
end
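The behaviour of `#<<` can be sketched with stand-in classes. `AddableProcess` and `SketchGroup` below are hypothetical and model only the members that `#<<` touches; they are not the library's classes.

```ruby
# Stand-in for a process object. A plain class is used on purpose:
# Array() would unpack a Struct into its members via #to_a.
class AddableProcess
  attr_reader :name
  attr_accessor :receiver

  def initialize(name)
    @name = name
    @started = false
  end

  def start
    @started = true
  end

  def started?
    @started
  end
end

# Stand-in for the group, reduced to what #<< needs.
class SketchGroup
  attr_reader :processes, :receiver

  def initialize(receiver)
    @processes = []
    @receiver = receiver
  end

  def started?
    processes.any?(&:started?)
  end

  def <<(procs)
    Array(procs).flatten.each do |process|
      processes << process
      process.receiver = receiver
      process.start if started?
    end
  end
end

group = SketchGroup.new(:log)
first = AddableProcess.new("a")
group << first   # single process; the group is not started, so neither is it
first.start      # simulate the group having been started
group << [AddableProcess.new("b"), [AddableProcess.new("c")]]  # nested lists are flattened
```

After the last call, "b" and "c" are started immediately because the group already reports `started?`.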
#alive? ⇒ Boolean
Check if group is alive, i.e. if at least one process is alive.
# File 'lib/multi_process/group.rb', line 153
def alive?
  processes.any?(&:alive?)
end
#available!(timeout: MultiProcess::DEFAULT_TIMEOUT) ⇒ Object
Wait until group is available. This implies waiting until all processes in group are available.
Processes will not be stopped if timeout occurs.
# File 'lib/multi_process/group.rb', line 173
def available!(timeout: MultiProcess::DEFAULT_TIMEOUT)
  Timeout.timeout timeout do
    processes.each(&:available!)
  end
end
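Since a timeout leaves the processes running, cleanup falls to the caller. A minimal sketch of that pattern, assuming a hypothetical `SlowProcess` stand-in whose `available!` simply sleeps; the real process classes may behave differently.

```ruby
require "timeout"

# Stand-in for a process whose availability check blocks for a while.
class SlowProcess
  attr_reader :stopped

  def initialize(seconds)
    @seconds = seconds
    @stopped = false
  end

  def available!
    sleep @seconds
  end

  def stop
    @stopped = true
  end
end

processes = [SlowProcess.new(0.01), SlowProcess.new(1)]
timed_out = false
stopped_at_timeout = nil

begin
  # Same shape as the group method: one timeout around all checks.
  Timeout.timeout(0.1) { processes.each(&:available!) }
rescue Timeout::Error
  timed_out = true
  stopped_at_timeout = processes.map(&:stopped) # nothing was stopped for us
  processes.each(&:stop)                        # explicit cleanup by the caller
end
```

The timeout covers the whole group, so a slow process early in the list reduces the time left for the ones after it.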
#available? ⇒ Boolean
Check if group is available. The group is available if all processes are available.
# File 'lib/multi_process/group.rb', line 160
def available?
  processes.all?(&:available?)
end
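Ruby's `Enumerable#all?` accepts either a block or a pattern argument, and the two forms behave differently. The sketch below uses a hypothetical `Service` struct to show why the block form (`&:available?`) is the one that actually asks each element.

```ruby
# Hypothetical stand-in for a process that reports availability.
Service = Struct.new(:up) do
  def available?
    up
  end
end

services = [Service.new(true), Service.new(true)]

# Block form: calls #available? on every element.
with_block = services.all?(&:available?)

# Pattern form: evaluates `:available? === element` for every element,
# which is false for anything that is not that symbol.
with_pattern = services.all?(:available?)

# Both forms return true for an empty collection.
empty_group = [].all?(&:available?)
```

Here `with_block` is `true` while `with_pattern` is `false`, even though every service is up.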
#run(delay: nil, timeout: nil) ⇒ Object
# File 'lib/multi_process/group.rb', line 117
def run(delay: nil, timeout: nil)
  if partition.positive?
    run_partition(&:run)
  else
    start(delay: delay)
    wait(timeout: timeout)
  end
ensure
  stop
end
#run!(delay: nil, timeout: nil) ⇒ Object
# File 'lib/multi_process/group.rb', line 138
def run!(delay: nil, timeout: nil)
  if partition.positive?
    run_partition(&:run!)
  else
    start(delay: delay)
    wait!(timeout: timeout)
  end
ensure
  stop
end
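Both `#run` and `#run!` place `stop` in an `ensure` clause, so processes are stopped whether waiting returns normally or raises. A minimal sketch of the non-partitioned path, using a hypothetical `RunSketch` class that records calls instead of managing real processes:

```ruby
# Hypothetical stand-in that records the order of lifecycle calls.
class RunSketch
  attr_reader :events

  def initialize(fail_on_wait: false)
    @fail_on_wait = fail_on_wait
    @events = []
  end

  def start
    events << :start
  end

  def wait
    events << :wait
    raise "wait failed" if @fail_on_wait
  end

  def stop
    events << :stop
  end

  # Same shape as the non-partitioned branch of #run.
  def run
    start
    wait
  ensure
    stop
  end
end

ok = RunSketch.new
ok.run

failing = RunSketch.new(fail_on_wait: true)
begin
  failing.run
rescue RuntimeError
  # The error still propagates; stop has already run by this point.
end
```

In both cases the recorded events end with `:stop`.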
#start(delay: nil) ⇒ Object
Start all processes in group.
The call blocks until all processes are started.
# File 'lib/multi_process/group.rb', line 57
def start(delay: nil)
  processes.each do |process|
    next if process.started?
    process.start
    sleep delay if delay
  end
end
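The start loop skips processes that are already running and sleeps after each process it starts. A sketch with a hypothetical `StartableProcess` stand-in and a free-standing `start_all` function that copies the loop:

```ruby
# Hypothetical stand-in that counts how often it was started.
class StartableProcess
  attr_reader :start_count

  def initialize(started: false)
    @started = started
    @start_count = 0
  end

  def started?
    @started
  end

  def start
    @started = true
    @start_count += 1
  end
end

# Free-standing copy of the loop in #start.
def start_all(processes, delay: nil)
  processes.each do |process|
    next if process.started?
    process.start
    sleep delay if delay
  end
end

running = StartableProcess.new(started: true)
fresh = [StartableProcess.new, StartableProcess.new]

began = Process.clock_gettime(Process::CLOCK_MONOTONIC)
start_all([running, *fresh], delay: 0.02)
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - began
```

With a delay set, the call blocks for roughly the delay multiplied by the number of processes it actually started, including a pause after the last one.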
#started? ⇒ Boolean
Check if group was already started.
# File 'lib/multi_process/group.rb', line 70
def started?
  processes.any?(&:started?)
end
#stop ⇒ Object
Stop all processes.
# File 'lib/multi_process/group.rb', line 76
def stop
  processes.each(&:stop)
end
#wait(timeout: nil) ⇒ Object
Wait until all processes have terminated.
# File 'lib/multi_process/group.rb', line 86
def wait(timeout: nil)
  if timeout
    ::Timeout.timeout(timeout) { wait }
  else
    processes.each(&:wait)
  end
end
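When a timeout is given, the method re-enters itself without one inside a `Timeout.timeout` block, so the limit applies to the group as a whole rather than to each process. A sketch with a hypothetical `NappingProcess` stand-in and a free-standing `wait_all` function of the same shape:

```ruby
require "timeout"

# Hypothetical stand-in whose wait simply sleeps.
class NappingProcess
  def initialize(seconds)
    @seconds = seconds
  end

  def wait
    sleep @seconds
  end
end

# Free-standing copy of the shape of #wait.
def wait_all(processes, timeout: nil)
  if timeout
    Timeout.timeout(timeout) { wait_all(processes) }
  else
    processes.each(&:wait)
  end
end

# Three processes of 0.05s each: about 0.15s when waited on in sequence.
processes = Array.new(3) { NappingProcess.new(0.05) }

timed_out_generous = begin
  wait_all(processes, timeout: 1)
  false
rescue Timeout::Error
  true
end

timed_out_tight = begin
  wait_all(processes, timeout: 0.1) # each process fits, the group does not
  false
rescue Timeout::Error
  true
end
```

Callers that want cleanup after a `Timeout::Error` need to rescue it and stop the processes themselves, or use `#run`, which stops them in an `ensure` clause.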
#wait!(timeout: nil) ⇒ Object
Wait until all processes have terminated.
Raise an error if a process exits unsuccessfully.
# File 'lib/multi_process/group.rb', line 101
def wait!(timeout: nil)
  if timeout
    ::Timeout.timeout(timeout) { wait! }
  else
    processes.each(&:wait!)
  end
end
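Because the processes are waited on in order with `each`, the first error raised ends the iteration. A sketch of that consequence; `SketchExitError` and `ExitingProcess` are hypothetical names, and the error class the library actually raises is not shown on this page.

```ruby
# Hypothetical error class standing in for whatever the library raises.
class SketchExitError < StandardError; end

# Hypothetical stand-in that fails on a non-zero exit status.
class ExitingProcess
  attr_reader :waited

  def initialize(status)
    @status = status
    @waited = false
  end

  def wait!
    @waited = true
    raise SketchExitError, "exit status #{@status}" unless @status.zero?
  end
end

processes = [ExitingProcess.new(0), ExitingProcess.new(1), ExitingProcess.new(0)]

message = nil
begin
  processes.each(&:wait!)
rescue SketchExitError => e
  message = e.message
end

# The third process was never waited on: iteration ended at the failure.
waited = processes.map(&:waited)
```

Processes after the failing one may still be running at that point, which is one reason `#run!` pairs `wait!` with an `ensure`d `stop`.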