Class: InParallel

Inherits:
Object
  • Object
show all
Defined in:
lib/in_parallel.rb

Defined Under Namespace

Classes: BlankBindingParallelProxy

Constant Summary collapse

@@supported =
Process.respond_to?(:fork)
@@outs =
[]
@@background_objs =
[]
@@result_id =
0

Class Method Summary collapse

Class Method Details

._execute_in_parallel(method_sym, obj = self, &block) ⇒ Object

private method to execute some code in a separate process and store the STDOUT and STDERR for later retrieval



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/in_parallel.rb', line 122

def self._execute_in_parallel(method_sym, obj = self, &block)
  ret_val = nil
  # Communicate the return value of the method or block
  read_result, write_result = IO.pipe
  # Store the STDOUT and STDERR of the method or block
  read_io, write_io = IO.pipe
  pid = fork do
    STDOUT.reopen(write_io)
    STDERR.reopen(write_io)
    # Need to store this for the case of run_in_background in _execute
    @@result_writer = write_result
    begin
      # close subprocess's copy of read_io since it only needs to write
      read_io.close
      read_result.close
      ret_val = obj.instance_eval(&block)
      # Write the result to the write_result IO stream.
      # Have to serialize the value so it can be transmitted via IO
      Marshal.dump(ret_val, write_result)
    rescue SystemCallError => err
      puts "error: #{err.message}"
      write_io.write('.')
      exit 1
    ensure
      write_io.close
      write_result.close
    end
  end
  write_io.close
  write_result.close
  # store the IO object with the STDOUT for each pid
  out = { :pid => pid,
          :method_sym => method_sym,
          :std_out => read_io,
          :result => read_result,
          :tmp_result => "unresolved_parallel_result_#{@@result_id}" }
  @@outs.push(out)
  @@result_id += 1
  out
end

.get_background_resultsObject



81
82
83
84
85
86
87
88
89
# File 'lib/in_parallel.rb', line 81

def self.get_background_results
  results_map = wait_for_processes
  # pass in the 'self' from the block.binding which is the instance of the class
  # that contains the initial binding call.
  # This gives us access to the local and instance variables from that context.
  @@background_objs.each {|obj|
    return result_lookup(obj[:proxy], obj[:target], results_map)
  }
end

.outsObject



6
7
8
# File 'lib/in_parallel.rb', line 6

def self.outs
  @@outs
end

.run_in_background(ignore_result = true, &block) ⇒ Object

Example - Will spawn a process in the background to run puppet agent on two agents and return immediately: Parallel.run_in_background

@result = on agents[0], 'puppet agent -t'
@result_2 = on agents[1], 'puppet agent -t'

# Do something else here before waiting for the process to complete

# Optionally wait for the processes to complete before continuing. # Otherwise use run_in_background(true) to clean up the process status and output immediately. Parrallel.get_background_results(self) NOTE: must call get_background_results to allow instance variables in calling object to be set, otherwise @result will evaluate to “unresolved_parallel_result_0”



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/in_parallel.rb', line 61

def self.run_in_background(ignore_result = true, &block)
  if @@supported
    proxy = BlankBindingParallelProxy.new(self)
    proxy.instance_eval(&block)

    if ignore_result
      Process.detach(@@outs.last[:pid])
      @@outs.pop
    else
      @@background_objs << {:proxy => proxy, :target => eval("self", block.binding)}
      return outs.last[:tmp_result]
    end
    return
  end
  puts 'Warning: Fork is not supported on this OS, executing block normally'
  result = block.call
  return nil if ignore_result
  result
end

.run_in_parallel(&block) ⇒ Object

Example - will spawn 2 processes, (1 for each method) wait until they both complete, and log STDOUT: InParallel.run_in_parallel

@result_1 = on agents[0], 'puppet agent -t'
@result_2 = on agents[1], 'puppet agent -t'

NOTE: Only supports assigning instance variables within the block, not local variables



19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/in_parallel.rb', line 19

def self.run_in_parallel(&block)
  if @@supported
    proxy = BlankBindingParallelProxy.new(self)
    proxy.instance_eval(&block)
    results_map = wait_for_processes
    # pass in the 'self' from the block.binding which is the instance of the class
    # that contains the initial binding call.
    # This gives us access to the local and instance variables from that context.
    return result_lookup(proxy, eval("self", block.binding), results_map)
  end
  puts 'Warning: Fork is not supported on this OS, executing block normally'
  block.call
end

.wait_for_processesObject

Waits for all processes to complete and logs STDOUT and STDERR in chunks from any processes that were triggered from this Parallel class



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/in_parallel.rb', line 93

def self.wait_for_processes
  return unless @@supported
  # Wait for all processes to complete
  statuses = Process.waitall

  results_map = {}
  # Print the STDOUT and STDERR for each process with signals for start and end
  while !@@outs.empty? do
    out = @@outs.shift
    begin
      puts "\n------ Begin output for #{out[:method_sym]} - #{out[:pid]}\n"
      puts out[:std_out].readlines
      puts "------ Completed output for #{out[:method_sym]} - #{out[:pid]}\n"
      results_map[out[:tmp_result]] = Marshal.load(out[:result].read)
    ensure
      # close the read end pipes
      out[:std_out].close unless out[:std_out].closed?
      out[:result].close unless out[:result].closed?
    end
  end

  statuses.each { |status|
    raise("Parallel process with PID '#{status[0]}' failed: #{status[1]}") unless status[1].success?
  }

  return results_map
end