Class: Process::Group

Inherits:
Object
  • Object
show all
Defined in:
lib/process/group.rb,
lib/process/group/version.rb

Overview

A group of tasks which can be run asynchrnously using fibers. Someone must call Group#wait to ensure that all fibers eventually resume.

Defined Under Namespace

Classes: Command, Fork

Constant Summary collapse

VERSION =
"0.1.3"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Group

Create a new process group. Can specify ‘options` which limits the maximum number of concurrent processes.



76
77
78
79
80
81
82
83
84
# File 'lib/process/group.rb', line 76

def initialize(options = {})
  @queue = []
  @limit = options[:limit]

  @running = {}
  @fiber = nil

  @pgid = nil
end

Instance Attribute Details

#limitObject

The maximum number of processes to run concurrently, or zero



90
91
92
# File 'lib/process/group.rb', line 90

def limit
  @limit
end

#runningObject (readonly)

A table of currently running processes.



87
88
89
# File 'lib/process/group.rb', line 87

def running
  @running
end

Instance Method Details

#available?Boolean

Whether not not calling run would be scheduled immediately.

Returns:

  • (Boolean)


124
125
126
127
128
129
130
# File 'lib/process/group.rb', line 124

def available?
  if @limit
    @running.size < @limit
  else
    true
  end
end

#blocking?Boolean

Whether or not calling run would block the caller.

Returns:

  • (Boolean)


133
134
135
# File 'lib/process/group.rb', line 133

def blocking?
  not available?
end

#fork(options = {}, &block) ⇒ Object



119
120
121
# File 'lib/process/group.rb', line 119

def fork(options = {}, &block)
  append! Fork.new(block, options)
end

#idObject

The id of the process group, only valid if processes are currently running.

Raises:

  • (RuntimeError)


93
94
95
96
97
# File 'lib/process/group.rb', line 93

def id
  raise RuntimeError.new("No processes in group, no group id available.") if @running.size == 0
  
  -@pgid
end

#kill(signal) ⇒ Object

Send a signal to all processes.



166
167
168
169
170
# File 'lib/process/group.rb', line 166

def kill(signal)
  if running?
    Process.kill(signal, id)
  end
end

#run(*arguments) ⇒ Object

Run a process, arguments have same meaning as Process#spawn.



104
105
106
107
108
109
110
# File 'lib/process/group.rb', line 104

def run(*arguments)
  Fiber.new do
    exit_status = self.spawn(*arguments)
    
    yield exit_status if block_given?
  end.resume
end

#running?Boolean

Returns:

  • (Boolean)


99
100
101
# File 'lib/process/group.rb', line 99

def running?
  @running.size > 0
end

#spawn(*arguments) ⇒ Object



112
113
114
115
116
117
# File 'lib/process/group.rb', line 112

def spawn(*arguments)
  # Could be nice to use ** splat, but excludes ruby < 2.0.
  options = Hash === arguments.last ? arguments.pop : {}
  
  append! Command.new(arguments, options)
end

#waitObject

Wait for all processes to finish, naturally would schedule any fibers which are currently blocked.



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/process/group.rb', line 138

def wait
  while running?
    process, status = wait_one
    
    schedule!
    
    process.resume(status)
  end
  
  # No processes, process group is no longer valid:
  @pgid = nil
rescue Interrupt
  # If the user interrupts the wait, interrupt the process group and wait for them to finish:
  self.kill(:INT)
  
  # If user presses Ctrl-C again (or something else goes wrong), we will come out and kill(:TERM) in the ensure below:
  wait_all
  
  raise
ensure
  # You'd only get here with running processes if some unexpected error was thrown:
  self.kill(:TERM)
  
  # Clean up zombie processes - if user presses Ctrl-C or for some reason something else blows up, exception would propagate back to caller:
  wait_all
end