Class: MultiProcess::Group

Inherits:
Object
Defined in:
lib/multi_process/group.rb

Overview

Store and run a group of processes.

Constructor Details

#initialize(receiver: nil, partition: nil) ⇒ Group

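Create a new process group.

Parameters:

  • receiver (defaults to: nil)

    Receiver assigned to processes added to the group. Falls back to MultiProcess::Logger.global when nil.

  • partition (defaults to: nil)

    Partition size. Converted with Integer; 0 when not given.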



# File 'lib/multi_process/group.rb', line 29

def initialize(receiver: nil, partition: nil)
  @processes = []
  @receiver  = receiver || MultiProcess::Logger.global
  @partition = partition ? Integer(partition) : 0
  @mutex     = Mutex.new
end
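The partition handling in the constructor can be tried in isolation. The following is a minimal sketch; `normalize_partition` is a hypothetical helper name used only for illustration and is not part of MultiProcess.

```ruby
# Hypothetical helper mirroring the constructor's partition handling:
# nil means "no partitioning" (0); anything else must be accepted by Kernel#Integer.
def normalize_partition(partition)
  partition ? Integer(partition) : 0
end

from_nil    = normalize_partition(nil)  # no partition given
from_int    = normalize_partition(4)    # already an Integer
from_string = normalize_partition("4")  # numeric strings are converted

# Non-numeric input is rejected by Kernel#Integer with an ArgumentError.
rejected =
  begin
    normalize_partition("four")
    false
  rescue ArgumentError
    true
  end

puts [from_nil, from_int, from_string, rejected].inspect
```

Because the conversion is strict, an invalid partition value fails at construction time rather than later during a run.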

Instance Attribute Details

#partition ⇒ Object (readonly)

Partition size.



# File 'lib/multi_process/group.rb', line 21

def partition
  @partition
end

#processes ⇒ Object (readonly)

Return list of processes.



# File 'lib/multi_process/group.rb', line 12

def processes
  @processes
end

#receiver ⇒ Object

Receiver that all processes in the group should use.

Changing it only affects processes added afterwards.



# File 'lib/multi_process/group.rb', line 18

def receiver
  @receiver
end

Instance Method Details

#<<(procs) ⇒ Object

Add a new process or a list of processes.

If the group was already started, added processes will also be started.

Parameters:

  • procs

    A single process or a (possibly nested) list of processes.



# File 'lib/multi_process/group.rb', line 42

def <<(procs)
  Array(procs).flatten.each do |process|
    processes << process
    process.receiver = receiver

    process.start if started?
  end
end
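The `Array(procs).flatten` idiom is what lets `#<<` accept a single process, a list, or nested lists. A small sketch of that normalization follows; `StubProcess` and `normalize` are hypothetical names, not part of MultiProcess. A plain class is used on purpose: a `Struct` instance responds to `to_a`, so `Array()` would unpack its fields instead of wrapping it.

```ruby
# Hypothetical stand-in for a process object (plain class, no to_a/to_ary).
class StubProcess
  attr_reader :name

  def initialize(name)
    @name = name
  end
end

# Same normalization as in Group#<<: wrap a single object, flatten nested lists.
def normalize(procs)
  Array(procs).flatten
end

a = StubProcess.new("a")
b = StubProcess.new("b")
c = StubProcess.new("c")

single = normalize(a)            # one process becomes a one-element list
list   = normalize([a, b])       # a flat list is kept as is
nested = normalize([a, [b, c]])  # nested lists are flattened
empty  = normalize(nil)          # nil becomes an empty list

puts nested.map(&:name).inspect
```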

#alive? ⇒ Boolean

Check if the group is alive, i.e. if at least one process is alive.

Returns:

  • (Boolean)

    True if group is alive.



# File 'lib/multi_process/group.rb', line 153

def alive?
  processes.any?(&:alive?)
end

#available!(timeout: MultiProcess::DEFAULT_TIMEOUT) ⇒ Object

Wait until group is available. This implies waiting until all processes in group are available.

Processes will not be stopped if a timeout occurs.

Parameters:

  • timeout (defaults to: MultiProcess::DEFAULT_TIMEOUT)

    Timeout in seconds, shared by the whole group.



# File 'lib/multi_process/group.rb', line 173

def available!(timeout: MultiProcess::DEFAULT_TIMEOUT)
  Timeout.timeout timeout do
    processes.each(&:available!)
  end
end

#available? ⇒ Boolean

Check if group is available. The group is available if all processes are available.

Returns:

  • (Boolean)


# File 'lib/multi_process/group.rb', line 160

def available?
  processes.all?(&:available?)
end
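The "all processes must be available" rule can be illustrated with a short sketch. `ReadyProbe` is a hypothetical stand-in, not part of MultiProcess. Note that `Enumerable#all?` needs a block (`&:available?`) to call the predicate on each element; a bare symbol argument is treated as a pattern and compared with `===`, which never matches a process object.

```ruby
# Hypothetical stand-in that answers available? with a fixed value.
ReadyProbe = Struct.new(:available) do
  def available?
    available
  end
end

ready   = [ReadyProbe.new(true), ReadyProbe.new(true)]
partial = [ReadyProbe.new(true), ReadyProbe.new(false)]

all_ready     = ready.all?(&:available?)    # every element answers true
partly_ready  = partial.all?(&:available?)  # one element is not available
pattern_check = ready.all?(:available?)     # pattern form: Symbol === element is false

puts [all_ready, partly_ready, pattern_check].inspect
```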

#run(delay: nil, timeout: nil) ⇒ Object

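Start all processes and wait for them to terminate.

The given options are passed to #start and #wait. #start is only called if partition is zero; with a positive partition the processes are run through run_partition instead.

If a timeout is given, the processes are terminated using #stop when the timeout error is raised. #stop is also called when the run finishes normally.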



# File 'lib/multi_process/group.rb', line 117

def run(delay: nil, timeout: nil)
  if partition.positive?
    run_partition(&:run)
  else
    start(delay: delay)
    wait(timeout: timeout)
  end
ensure
  stop
end
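The `ensure` clause is what guarantees the stop on timeout. The sketch below mimics only the start/wait/ensure-stop shape; `SketchGroup` is a hypothetical class that sleeps instead of managing real processes, and it is not the MultiProcess implementation.

```ruby
require "timeout"

# Hypothetical stand-in: records the order of events instead of running processes.
class SketchGroup
  attr_reader :events

  def initialize(work_seconds)
    @work_seconds = work_seconds
    @events = []
  end

  def run(timeout: nil)
    @events << :start
    # Timeout.timeout(nil) runs the block without a time limit.
    Timeout.timeout(timeout) { sleep @work_seconds }
    @events << :finished
  ensure
    # Runs on normal completion and when Timeout::Error is raised.
    @events << :stop
  end
end

quick = SketchGroup.new(0.01)
quick.run(timeout: 1)

stuck = SketchGroup.new(5)
begin
  stuck.run(timeout: 0.1)
rescue Timeout::Error
  # The error still reaches the caller after the ensure block has run.
end

puts quick.events.inspect
puts stuck.events.inspect
```

In both cases the last recorded event is the stop, which matches the behavior described for a timed-out run.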

#run!(delay: nil, timeout: nil) ⇒ Object

Start all processes and wait for them to terminate.

The given options are passed to #start and #wait!. #start is only called if partition is zero; with a positive partition the processes are run through run_partition instead.

If a timeout is given, the processes are terminated using #stop when the timeout error is raised. #stop is also called when the run finishes normally.

An error will be raised if any process exits unsuccessfully.



# File 'lib/multi_process/group.rb', line 138

def run!(delay: nil, timeout: nil)
  if partition.positive?
    run_partition(&:run!)
  else
    start(delay: delay)
    wait!(timeout: timeout)
  end
ensure
  stop
end

#start(delay: nil) ⇒ Object

Start all processes in the group.

The call blocks until all processes are started. Processes that are already started are skipped.

Parameters:

  • delay (Integer) (defaults to: nil)

    Delay in seconds between starting processes.



# File 'lib/multi_process/group.rb', line 57

def start(delay: nil)
  processes.each do |process|
    next if process.started?

    process.start
    sleep delay if delay
  end
end
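The start loop skips processes that are already running and pauses after each start when a delay is given. A minimal sketch of that loop follows, using a hypothetical `SketchProcess` class and `start_all` helper rather than real MultiProcess objects.

```ruby
# Hypothetical stand-in that only records whether and how often it was started.
class SketchProcess
  attr_reader :start_count

  def initialize(started: false)
    @started = started
    @start_count = 0
  end

  def started?
    @started
  end

  def start
    @started = true
    @start_count += 1
  end
end

# Same loop shape as Group#start: skip running processes, sleep after each start.
def start_all(processes, delay: nil)
  processes.each do |process|
    next if process.started?

    process.start
    sleep delay if delay
  end
end

running = SketchProcess.new(started: true)
fresh   = SketchProcess.new

start_all([running, fresh], delay: 0.01)

puts [running.start_count, fresh.start_count].inspect
```

Calling `start_all` a second time would start nothing, since every process now reports `started?`.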

#started? ⇒ Boolean

Check if group was already started.

Returns:

  • (Boolean)

    True if group was already started.



# File 'lib/multi_process/group.rb', line 70

def started?
  processes.any?(&:started?)
end

#stop ⇒ Object

Stop all processes.



# File 'lib/multi_process/group.rb', line 76

def stop
  processes.each(&:stop)
end

#wait(timeout: nil) ⇒ Object

Wait until all processes have terminated.

Parameters:

  • timeout (defaults to: nil)

    Timeout in seconds for the whole group. No timeout is applied when nil.



# File 'lib/multi_process/group.rb', line 86

def wait(timeout: nil)
  if timeout
    ::Timeout.timeout(timeout) { wait }
  else
    processes.each(&:wait)
  end
end
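The timeout wraps the whole wait, so it bounds the group as a unit rather than each process separately. A rough sketch of the same recursive shape, where `wait_all` is a hypothetical helper and the lambdas stand in for processes:

```ruby
require "timeout"

# Hypothetical helper with the same shape as Group#wait:
# with a timeout, re-enter without one inside Timeout.timeout.
def wait_all(waiters, timeout: nil)
  if timeout
    Timeout.timeout(timeout) { wait_all(waiters) }
  else
    waiters.each(&:call)
  end
end

fast = -> { sleep 0.01 }
slow = -> { sleep 5 }

# Finishes well within the limit, so no error is raised.
wait_all([fast, fast], timeout: 1)

# The slow waiter exceeds the shared limit and Timeout::Error is raised.
timed_out =
  begin
    wait_all([fast, slow], timeout: 0.1)
    false
  rescue Timeout::Error
    true
  end

puts timed_out
```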

#wait!(timeout: nil) ⇒ Object

Wait until all processes have terminated.

Raise an error if a process exits unsuccessfully.

Parameters:

  • timeout (defaults to: nil)

    Timeout in seconds for the whole group. No timeout is applied when nil.



# File 'lib/multi_process/group.rb', line 101

def wait!(timeout: nil)
  if timeout
    ::Timeout.timeout(timeout) { wait! }
  else
    processes.each(&:wait!)
  end
end