Class: MultiProcess::Group

Inherits:
Object
Defined in:
lib/multi_process/group.rb

Overview

Store and run a group of processes.

Constructor Details

#initialize(opts = {}) ⇒ Group

Create a new process group.

Parameters:

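  • opts (Hash) (defaults to: {})

    Options.

Options Hash (opts):

  • :receiver

    Receiver assigned to processes added to the group. Defaults to MultiProcess::Logger.global.

  • :partition (Integer)

    Partition size, converted with #to_i. Defaults to 0.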



# File 'lib/multi_process/group.rb', line 25

def initialize(opts = {})
  @processes = []
  @receiver  = opts[:receiver] ? opts[:receiver] : MultiProcess::Logger.global
  @partition = opts[:partition] ? opts[:partition].to_i : 0
  @mutex     = Mutex.new
end
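
The option handling in the constructor can be tried in isolation. The following is a minimal sketch, not the library's code: `group_options` is a hypothetical helper and `DEFAULT_RECEIVER` is a placeholder standing in for `MultiProcess::Logger.global`.

```ruby
# Placeholder standing in for MultiProcess::Logger.global (assumption).
DEFAULT_RECEIVER = :global_logger

# Hypothetical helper mirroring the defaulting logic of the constructor.
def group_options(opts = {})
  {
    receiver:  opts[:receiver] ? opts[:receiver] : DEFAULT_RECEIVER,
    partition: opts[:partition] ? opts[:partition].to_i : 0
  }
end

defaults = group_options
custom   = group_options(receiver: :my_receiver, partition: '4')
```

A missing or nil :receiver falls back to the default, and :partition is coerced with #to_i, so the string '4' becomes the integer 4.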

Instance Attribute Details

#partition ⇒ Object (readonly)

Partition size.



# File 'lib/multi_process/group.rb', line 17

def partition
  @partition
end

#processes ⇒ Object (readonly)

Return list of processes.



# File 'lib/multi_process/group.rb', line 8

def processes
  @processes
end

#receiver ⇒ Object

Receiver that all processes in the group should use.

Changing it only affects processes added afterwards.



# File 'lib/multi_process/group.rb', line 14

def receiver
  @receiver
end

Instance Method Details

#<<(process) ⇒ Object

Add a new process or a list of processes.

If the group has already been started, the added processes are started as well.

Parameters:

  • process

    A process or a (possibly nested) list of processes.



# File 'lib/multi_process/group.rb', line 38

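def <<(process)
  # Accept a single process as well as (nested) lists of processes.
  Array(process).flatten.each do |p|
    processes << p
    p.receiver = receiver

    # Start the new process directly; #start expects an options hash.
    p.start if started?
  end
end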
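
The normalization step `Array(process).flatten` is what lets `#<<` accept one process, a list, or a nested list. A minimal sketch of that step alone follows; `FakeProcess` is a hypothetical stand-in for a real process object.

```ruby
# Hypothetical stand-in for a process object. It is a plain class on
# purpose: Kernel#Array would expand an object that responds to #to_a.
class FakeProcess
  attr_reader :name

  def initialize(name)
    @name = name
  end
end

web    = FakeProcess.new('web')
worker = FakeProcess.new('worker')
clock  = FakeProcess.new('clock')

single = Array(web).flatten                       # one process
list   = Array([web, worker]).flatten             # flat list
nested = Array([web, [worker, [clock]]]).flatten  # nested list
```

All three calls produce a flat array of process objects, so the loop body can treat every input shape the same way.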

#alive? ⇒ Boolean

Check if the group is alive, i.e. if at least one process is alive.

Returns:

  • (Boolean)

    True if group is alive.



# File 'lib/multi_process/group.rb', line 126

def alive?
  processes.any?(&:alive?)
end

#available!(opts = {}) ⇒ Object

Wait until group is available. This implies waiting until all processes in group are available.

If the timeout expires, Timeout::Error is raised and the processes are not stopped.

Parameters:

  • opts (Hash) (defaults to: {})

    Options.

Options Hash (opts):

  • :timeout (Integer)

    Timeout in seconds to wait for processes to become available. Defaults to DEFAULT_TIMEOUT.



# File 'lib/multi_process/group.rb', line 146

def available!(opts = {})
  timeout = opts[:timeout] ? opts[:timeout].to_i : MultiProcess::DEFAULT_TIMEOUT

  Timeout.timeout timeout do
    processes.each{|p| p.available! }
  end
end
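
Because a single `Timeout.timeout` block wraps the whole loop, the timeout is a budget shared by all processes rather than a per-process limit. The sketch below illustrates this under assumptions: `SlowProcess` and `wait_available` are hypothetical, and the `rescue` exists only to make the outcome visible (the method above lets Timeout::Error propagate).

```ruby
require 'timeout'

# Hypothetical process that needs a fixed time to become available.
class SlowProcess
  def initialize(seconds)
    @seconds = seconds
  end

  def available!
    sleep @seconds
  end
end

# Same structure as the method above: one timeout around all waits.
def wait_available(processes, timeout)
  Timeout.timeout(timeout) do
    processes.each(&:available!)
  end
  :available
rescue Timeout::Error
  :timed_out
end

processes = [SlowProcess.new(0.2), SlowProcess.new(0.2)]

within_budget = wait_available(processes, 1)   # 0.4 s total, 1 s allowed
over_budget   = wait_available(processes, 0.3) # each fits, the sum does not
```

In the second call each process would be ready within 0.3 seconds on its own, but the combined wait exceeds the budget, so the call times out.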

#available? ⇒ Boolean

Check if group is available. The group is available if all processes are available.

Returns:

  • (Boolean)


# File 'lib/multi_process/group.rb', line 133

def available?
  !processes.any?{|p| !p.available? }
end
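
The double negation in this check is equivalent to asking whether all processes are available, which also means that a group without processes counts as available. A small sketch with a hypothetical `ProbeProcess` stand-in shows both points.

```ruby
# Hypothetical stand-in reporting a fixed availability.
class ProbeProcess
  def initialize(available)
    @available = available
  end

  def available?
    @available
  end
end

# Same expression as in the method above.
def group_available?(processes)
  !processes.any? { |p| !p.available? }
end

up   = ProbeProcess.new(true)
down = ProbeProcess.new(false)

empty_group = group_available?([])
all_up      = group_available?([up, up])
one_down    = group_available?([up, down])

# The check agrees with Enumerable#all? for every case tried here.
same_as_all = [[], [up, up], [up, down]].all? do |list|
  group_available?(list) == list.all?(&:available?)
end
```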

#run(opts = {}) ⇒ Object

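Start all processes and wait for them to terminate.

If partition is zero, the given options are passed to #start and #wait. If partition is greater than zero, #start and #wait are not called and the options are ignored; instead, that many threads are spawned, and each thread calls #run on the processes it obtains from next_process until none are left.

#stop is always called when #run returns or raises, so processes are also stopped when a timeout error is raised.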



# File 'lib/multi_process/group.rb', line 103

def run(opts = {})
  if partition > 0
    threads = Array.new
    partition.times do
      threads << Thread.new do
        while (process = next_process)
          process.run
        end
      end
    end
    threads.each(&:join)
  else
    start opts
    wait opts
  end
ensure
  stop
end
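
The partitioned branch is a small worker pool: a fixed number of threads pull processes one at a time until none remain. The sketch below shows that pattern in isolation. `next_process` is not shown on this page, so its implementation here (a mutex-guarded `shift`) is an assumption, and `PartitionedRunner` and `RecordingProcess` are hypothetical names.

```ruby
# Hypothetical process that records its name when run.
class RecordingProcess
  attr_reader :name

  def initialize(name, log)
    @name = name
    @log  = log
  end

  def run
    @log << name
  end
end

# Hypothetical runner mirroring the partitioned branch of #run.
class PartitionedRunner
  def initialize(processes, partition)
    @pending   = processes.dup
    @partition = partition
    @mutex     = Mutex.new
  end

  def run
    threads = Array.new(@partition) do
      Thread.new do
        while (process = next_process)
          process.run
        end
      end
    end
    threads.each(&:join)
  end

  private

  # Assumed behavior: hand out each pending process exactly once.
  def next_process
    @mutex.synchronize { @pending.shift }
  end
end

log       = Queue.new # thread-safe collector for finished names
processes = %w[a b c d e].map { |name| RecordingProcess.new(name, log) }

PartitionedRunner.new(processes, 2).run
finished = Array.new(log.size) { log.pop }.sort
```

With a partition of 2, at most two processes run at the same time, and every process is run exactly once regardless of which thread picks it up.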

#start(opts = {}) ⇒ Object

Start all processes in the group.

The call blocks until all processes are started.

Parameters:

  • opts (Hash) (defaults to: {})

    Options.

Options Hash (opts):

  • :delay (Integer)

    Delay in seconds between starting processes.



# File 'lib/multi_process/group.rb', line 56

def start(opts = {})
  processes.each do |process|
    unless process.started?
      process.start
      sleep opts[:delay] if opts[:delay]
    end
  end
end
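
Since processes that are already started are skipped, calling `#start` repeatedly is harmless. The following sketch reproduces the loop with a hypothetical `CountingProcess` stand-in and a free-standing `start_all` helper; neither name is part of the library.

```ruby
# Hypothetical stand-in that counts how often it was started.
class CountingProcess
  attr_reader :starts

  def initialize(started = false)
    @started = started
    @starts  = 0
  end

  def started?
    @started
  end

  def start
    @started = true
    @starts += 1
  end
end

# Same loop as the method above, written as a free-standing helper.
def start_all(processes, opts = {})
  processes.each do |process|
    next if process.started?

    process.start
    sleep opts[:delay] if opts[:delay]
  end
end

running = CountingProcess.new(true)
fresh   = CountingProcess.new

start_all([running, fresh])
start_all([running, fresh]) # second call finds nothing left to start
```

Note that, as the loop is written, the :delay sleep also happens after the last process that gets started.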

#started? ⇒ Boolean

Check if group was already started.

Returns:

  • (Boolean)

    True if at least one process in the group has been started.



# File 'lib/multi_process/group.rb', line 69

def started?
  processes.any?(&:started?)
end

#stop ⇒ Object

Stop all processes.



# File 'lib/multi_process/group.rb', line 75

def stop
  processes.each do |process|
    process.stop
  end
end

#wait(opts = {}) ⇒ Object

Wait until all processes have terminated.

Parameters:

  • opts (Hash) (defaults to: {})

    Options.

Options Hash (opts):

  • :timeout (Integer)

    Timeout in seconds to wait before raising Timeout::Error. Defaults to 30.



# File 'lib/multi_process/group.rb', line 87

def wait(opts = {})
  opts[:timeout] ||= 30

  ::Timeout::timeout(opts[:timeout]) do
    processes.each{|p| p.wait}
  end
end
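
One detail of the default handling is easy to miss: `opts[:timeout] ||= 30` writes the default into the hash that was passed in, so the caller's hash is modified. A minimal sketch of just that line follows; `wait_options` is a hypothetical helper.

```ruby
# Hypothetical helper isolating the defaulting line of #wait.
def wait_options(opts = {})
  opts[:timeout] ||= 30
  opts
end

caller_opts = {}
wait_options(caller_opts)              # caller_opts now carries the default

explicit = wait_options(timeout: 5)    # an explicit value is kept
```

Callers that reuse an options hash across calls may want to pass a copy (for example `opts.dup`) if they rely on the hash staying unchanged.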