Class: MultiProcess::Group

Inherits:
Object
  • Object
Defined in:
lib/multi_process/group.rb

Overview

Store and run a group of processes.

Constructor Details

#initialize(receiver: nil, partition: nil) ⇒ Group

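Create a new process group.

Parameters:

  • receiver (defaults to: nil)

    Receiver assigned to processes added to the group. Falls back to MultiProcess::Logger.global when nil.

  • partition (defaults to: nil)

    Partition size, converted with #to_i. Falls back to 0 when nil.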



# File 'lib/multi_process/group.rb', line 25

def initialize(receiver: nil, partition: nil)
  @processes = []
  @receiver  = receiver ? receiver : MultiProcess::Logger.global
  @partition = partition ? partition.to_i : 0
  @mutex     = Mutex.new
end
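
The partition handling in the constructor can be tried in isolation. The following is a minimal sketch, not part of the library: `normalize_partition` is a hypothetical helper that only repeats the `partition ? partition.to_i : 0` expression shown above.

```ruby
# Hypothetical helper (not part of MultiProcess) that repeats the
# normalisation expression used in Group#initialize.
def normalize_partition(partition)
  partition ? partition.to_i : 0
end

from_nil    = normalize_partition(nil)  # nil falls back to 0
from_string = normalize_partition("4")  # strings are converted with #to_i
from_float  = normalize_partition(2.9)  # floats are truncated by #to_i
```

A partition of 0 selects the plain start-and-wait path in #run, so omitting the option leaves partitioning off.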

Instance Attribute Details

#partition ⇒ Object (readonly)

Partition size.



# File 'lib/multi_process/group.rb', line 17

def partition
  @partition
end

#processes ⇒ Object (readonly)

Return list of processes.



# File 'lib/multi_process/group.rb', line 8

def processes
  @processes
end

#receiver ⇒ Object

The receiver that all processes in the group should use.

Changing it only affects processes added afterwards.



# File 'lib/multi_process/group.rb', line 14

def receiver
  @receiver
end

Instance Method Details

#<<(procs) ⇒ Object

Add a new process or a list of processes.

If the group was already started, added processes are started as well.

Parameters:

  • procs

    A single process or a (possibly nested) list of processes.



# File 'lib/multi_process/group.rb', line 38

def <<(procs)
  Array(procs).flatten.each do |process|
    processes << process
    process.receiver = receiver

    # Start the process itself; Group#start only accepts the delay: keyword.
    process.start if started?
  end
end
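
The `Array(procs).flatten` call is what lets #<< accept one process, a list, or a nested list. The sketch below illustrates that normalisation on its own; `StubProcess` and `normalize` are hypothetical stand-ins, not library names. A plain class is used on purpose, because `Array()` would unpack a `Struct` into its fields.

```ruby
# Hypothetical stand-in for a process object (not the library's class).
class StubProcess
  attr_reader :name
  attr_accessor :receiver

  def initialize(name)
    @name = name
  end
end

# Hypothetical helper repeating the normalisation used by Group#<<.
def normalize(procs)
  Array(procs).flatten
end

a = StubProcess.new("a")
b = StubProcess.new("b")
c = StubProcess.new("c")

single = normalize(a)            # one process becomes a one-element list
nested = normalize([a, [b, c]])  # nested lists are flattened
empty  = normalize(nil)          # nil becomes an empty list
```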

#alive? ⇒ Boolean

Check if the group is alive, i.e. if at least one process is alive.

Returns:

  • (Boolean)

    True if group is alive.



# File 'lib/multi_process/group.rb', line 119

def alive?
  processes.any?(&:alive?)
end

#available!(timeout: MultiProcess::DEFAULT_TIMEOUT) ⇒ Object

Wait until the group is available, i.e. until all processes in the group are available.

Processes are not stopped if the timeout expires.

Parameters:

  • timeout (defaults to: MultiProcess::DEFAULT_TIMEOUT)

    Maximum time in seconds to wait for the whole group.



# File 'lib/multi_process/group.rb', line 139

def available!(timeout: MultiProcess::DEFAULT_TIMEOUT)
  Timeout.timeout timeout do
    processes.each(&:available!)
  end
end

#available? ⇒ Boolean

Check if group is available. The group is available if all processes are available.

Returns:

  • (Boolean)

    True if all processes are available.


# File 'lib/multi_process/group.rb', line 126

def available?
  processes.all?(&:available?)
end
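
For a check like this, the form of the argument to `all?` matters. With `&:available?` the method is called on every element; a bare symbol is treated as a pattern and compared to each element with `===`, which never matches a process object. The sketch below demonstrates both forms using a hypothetical `ProbeProcess` stand-in.

```ruby
# Hypothetical stand-in exposing only #available? (not the library's class).
class ProbeProcess
  def initialize(available)
    @available = available
  end

  def available?
    @available
  end
end

ready   = [ProbeProcess.new(true), ProbeProcess.new(true)]
partial = [ProbeProcess.new(true), ProbeProcess.new(false)]

all_ready    = ready.all?(&:available?)    # calls #available? on each element
some_missing = partial.all?(&:available?)  # one element is not available
empty_group  = [].all?(&:available?)       # vacuously true for an empty list
pattern_form = ready.all?(:available?)     # compares :available? === element
```

An empty group therefore counts as available, while #alive? and #started?, which use `any?`, are false for an empty group.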

#run(delay: nil, timeout: nil) ⇒ Object

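Start all processes and wait for them to terminate.

If partition is zero, the given options are passed to #start and #wait. If partition is greater than zero, that many threads repeatedly take the next process and run it; delay and timeout are not used in that mode.

All processes are stopped with #stop when the method returns, including when a timeout error is raised.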



# File 'lib/multi_process/group.rb', line 98

def run(delay: nil, timeout: nil)
  if partition > 0
    partition.times.map do
      Thread.new do
        while (process = next_process)
          process.run
        end
      end
    end.each(&:join)
  else
    start delay: delay
    wait timeout: timeout
  end
ensure
  stop
end
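
The partitioned branch of #run is a small worker pool: a fixed number of threads pull processes until none remain. The sketch below shows that pattern in isolation. `next_process` is not shown on this page, so the version here is an assumption: it hands out each pending process once under a mutex. `RecordingProcess` is likewise a hypothetical stand-in.

```ruby
# Hypothetical stand-in: "running" it records its name in a shared log.
class RecordingProcess
  attr_reader :name

  def initialize(name, log, log_mutex)
    @name = name
    @log = log
    @log_mutex = log_mutex
  end

  def run
    @log_mutex.synchronize { @log << name }
  end
end

# Assumed behaviour of next_process: return each pending process once,
# then nil. Array#shift returns nil on an empty array.
def next_process(pending, mutex)
  mutex.synchronize { pending.shift }
end

log       = []
log_mutex = Mutex.new
mutex     = Mutex.new
pending   = %w[a b c d e].map { |n| RecordingProcess.new(n, log, log_mutex) }
partition = 2

# Same shape as the partition > 0 branch of Group#run.
partition.times.map do
  Thread.new do
    while (process = next_process(pending, mutex))
      process.run
    end
  end
end.each(&:join)

finished = log.sort
```

With this shape at most `partition` processes run at the same time, and the order in which they finish is not deterministic.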

#start(delay: nil) ⇒ Object

Start all processes in the group. Processes that are already started are skipped.

The call blocks until all processes are started.

Parameters:

  • delay (Integer) (defaults to: nil)

    Delay in seconds between starting processes.



# File 'lib/multi_process/group.rb', line 53

def start(delay: nil)
  processes.each do |process|
    next if process.started?

    process.start
    sleep delay if delay
  end
end
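
The start loop can be sketched as a free-standing function to show how already-started processes are skipped and how the delay is applied. `StartableProcess` and `start_all` are hypothetical names used only for illustration.

```ruby
# Hypothetical stand-in modelling only #started? and #start.
class StartableProcess
  attr_reader :name

  def initialize(name, started: false)
    @name = name
    @started = started
  end

  def started?
    @started
  end

  def start
    @started = true
  end
end

# Hypothetical free-standing version of the loop in Group#start.
# Returns the names of the processes it actually started.
def start_all(processes, delay: nil)
  launched = []
  processes.each do |process|
    next if process.started?

    process.start
    launched << process.name
    sleep delay if delay
  end
  launched
end

procs = [
  StartableProcess.new("web"),
  StartableProcess.new("db", started: true),
  StartableProcess.new("worker")
]

launched = start_all(procs, delay: 0.01)
```

Because `sleep` runs after every started process, the call takes roughly `delay` times the number of newly started processes.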

#started? ⇒ Boolean

Check if group was already started.

Returns:

  • (Boolean)

    True if group was already started.



# File 'lib/multi_process/group.rb', line 66

def started?
  processes.any?(&:started?)
end

#stop ⇒ Object

Stop all processes.



# File 'lib/multi_process/group.rb', line 72

def stop
  processes.each(&:stop)
end

#wait(timeout: nil) ⇒ Object

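Wait until all processes have terminated.

Parameters:

  • timeout (defaults to: nil)

    Maximum time in seconds to wait for the whole group. No time limit is applied when nil.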



# File 'lib/multi_process/group.rb', line 82

def wait(timeout: nil)
  if timeout
    ::Timeout.timeout(timeout) { wait }
  else
    processes.each(&:wait)
  end
end
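
The timeout wraps the whole sequential wait, so it is a budget for the group rather than for each process. The sketch below reproduces the pattern with Ruby's standard `Timeout` module; `SleepyProcess` and `wait_all` are hypothetical stand-ins, with `sleep` standing in for waiting on a real process.

```ruby
require "timeout"

# Hypothetical stand-in whose #wait simply sleeps for a fixed time.
class SleepyProcess
  def initialize(seconds)
    @seconds = seconds
  end

  def wait
    sleep @seconds
  end
end

# Hypothetical free-standing version of Group#wait: with a timeout it
# calls itself again without one, inside Timeout.timeout.
def wait_all(processes, timeout: nil)
  if timeout
    Timeout.timeout(timeout) { wait_all(processes) }
  else
    processes.each(&:wait)
  end
end

fast = [SleepyProcess.new(0.01), SleepyProcess.new(0.01)]
slow = [SleepyProcess.new(5)]

fast_result = wait_all(fast, timeout: 1) # completes within the budget

timed_out = begin
  wait_all(slow, timeout: 0.05)
  false
rescue Timeout::Error
  true
end
```

A caller that needs cleanup after a timeout has to rescue `Timeout::Error` itself, as #wait does not stop any process.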