Module: PPipe::Methods

Includes:
Log, Reader
Included in:
PPipe, Controller
Defined in:
lib/parallelpipes.rb

Overview

Quick Links: PPipe, parallelpipes.rb, PPipe::Methods, PPipe::Controller, PPipe::Message

The primary purpose of this module is to provide the methods for the class PPipe. Therefore all the method documentation specifies having a PPipe as the receiver. However, it is perfectly possible to use this module as a module in its own right; see the section Modules in parallelpipes.rb

Instance Attribute Summary collapse

Attributes included from Log

#verbosity

Instance Method Summary collapse

Methods included from Log

clean_up, io=, #log, log_file, log_file=

Methods included from Reader

#configure_reader, #get_line, #read_pipe

Instance Attribute Details

#is_rootObject (readonly)

is the ppipe the root ppipe (the first one to be created)?



1030
1031
1032
# File 'lib/parallelpipes.rb', line 1030

def is_root
  @is_root
end

#mpnObject (readonly)

my pipe number - the pipe number of the PPipe



1026
1027
1028
# File 'lib/parallelpipes.rb', line 1026

def mpn
  @mpn
end

#redirectObject (readonly)

see set_up_pipes



1033
1034
1035
# File 'lib/parallelpipes.rb', line 1033

def redirect
  @redirect
end

#thread_safeObject (readonly)

see set_up_pipes



1033
1034
1035
# File 'lib/parallelpipes.rb', line 1033

def thread_safe
  @thread_safe
end

#tp_requiredObject (readonly)

see set_up_pipes



1033
1034
1035
# File 'lib/parallelpipes.rb', line 1033

def tp_required
  @tp_required
end

#tt_requiredObject (readonly)

see set_up_pipes



1033
1034
1035
# File 'lib/parallelpipes.rb', line 1033

def tt_required
  @tt_required
end

Instance Method Details

#assigned?(pipe_no) ⇒ Boolean

When a new process is created, there is a delay before that new process has a pipe assigned to it in every other process. This method checks whether the new process (with pipe number equal to pipe_no) has had its pipe assigned in the calling process.

pipe_no can be the pipe number of the calling process

Returns:

  • (Boolean)

Raises:



1262
1263
1264
1265
1266
# File 'lib/parallelpipes.rb', line 1262

def assigned?(pipe_no)
  raise PPipeFatal.new("tried to call assigned? for pipe #{pipe_no} when the highest pipe number is #{@pipes.size - 1} (remember pipe numbers are 0 based)") if pipe_no >= @pipes.size  
  check_messages
  return @pipe_counter > pipe_no
end

#dieObject

Close all pipes and terminate all communication with every other process. Send a message to every other process letting them know this process has terminated communication.

It’s good practice to call this in every process before the process exits.

NB If you pass a block to Methods#fork, it will call die automatically when the block is finished, so die only needs to be called if the process exits midway through the block (consider using Methods#exit instead of Kernel.exit in this case)

Raises:



1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
# File 'lib/parallelpipes.rb', line 1746

def die
  raise DeadParallelPipe unless @alive
  i_send(:pipe_finished, @mpn, {evaluate: true, tc: true})
  @alive = false
  @pipes.each do |pipe|
    next unless pipe
    begin
#         $stderr.puts 'closing',  i
      if pipe.class == IO
        pipe.close
      else
        pipe[0].close; pipe[1].close
      end
    rescue IOError, NoMethodError
      next   

    end
  end
  @pipes = nil
  @mutexes = nil
  @pid=nil
end

#exitObject

Calls Methods#die (if the ppipe is still alive) then calls Kernel.exit



1771
1772
1773
1774
# File 'lib/parallelpipes.rb', line 1771

def exit
  die if @alive
  Kernel.exit
end

#finishObject

Calls Methods#waitall, stops the controller if there is a controller and then calls Methods#die



1778
1779
1780
1781
1782
1783
# File 'lib/parallelpipes.rb', line 1778

def finish
  waitall
  stop_controller if @controller
  die
#     return statuses
end

#fork(num_forks = nil, &block) ⇒ Object

Create a new process. If num_forks is unspecified, creates one new process, and returns the pipe number of that process. If num_forks is specified, creates num_forks new processes and returns an array of pipe numbers.

If a block is given, the child process(es) will execute the block and then exit

If there is no controller, only the root PPipe can fork. (Otherwise there is no way to prevent two processes forking at the same time).

Raises:



1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
# File 'lib/parallelpipes.rb', line 1116

def fork(num_forks=nil, &block)
    ArgumentError.check([:num_forks, num_forks, [NilClass, Fixnum]])
  raise DeadParallelPipe unless @alive
  raise PPipeFatal.new("Only the root parallel pipe can fork (you can test this with .is_root), unless there is a controller. Otherwise it is impossible to synchronize forking, and to provide unique ids to all new processes.") unless @is_root or @controller
  forks = []
  lock(:ppipe_fork_mutex) if @controller 
  (num_forks or 1).times do
    log 'fpv', :fork
      log 'v9', 'about to check messages before forking'
    check_messages
    raise PPipeFatal.new("insufficient pipes") if @pipe_counter >= @pipes.size
      log 'v9', 'checked messages before forking'
    pipe_counter  = @pipe_counter
    new_pipe = IO.pipe
    @pipes[@mpn] = new_pipe[1]
    cur_pid = Process.pid
    begin
      Thread.ppipe_join
    rescue => err
      (unlock(:ppipe_fork_mutex) if @controller; raise err)
    end
    Thread.list.each{|th| (unlock(:ppipe_fork_mutex) if @controller; raise ThreadingError.new("Must have only one live thread when calling fork, if PPipe.thread_safe == true (suggest you join all other threads before calling). PPipe.thread_safe can be set to false, but this can lead to many errors if forks are made with multiple live threads.")) if th.alive? and not th == Thread.current} if @thread_check
    pid = Process.fork
    if not pid
      Thread.list.each{|th| th.kill if th.alive? and not th == Thread.current} if @thread_safe
      @is_root = false
      @mpn = pipe_counter
      log 'pv3', :forked
      log 'v9', "My New Pipe No is: #{pipe_counter}"
      @pipes[pipe_counter][1].close
      @my_pipes = [[cur_pid, @pipes[pipe_counter][0]]] 
      @pipes[pipe_counter] = nil #@pipes[pipe_counter][0] 
      @user_end, @my_end = IO.pipe
      if @redirect
        $stdout = @pipes[0]
        $stdin = self #@user_end
      end
        log 'v9', 'about to yield', @mpn
      @thread_mutexes = {}
      @shared_resource_mutexes = {}
      @messages = {}
      @old_controllers = {}
      @message_counter = 0
      (block.call; exit) if block
      return nil
    end
#         @my_pipes.each{|(pid, pipe)| pipe.close}
#         @pipes.slice(0...@mpn).each{|pipe| next unless pipe; pipe.close}
    @my_pipes.push [pid, new_pipe[0]]
    new_pipe[1].close
    i_send(:add_pid, [pipe_counter, pid], {evaluate: true, tc: true})
    i_send(:increase_pipe_counter, true, {evaluate: true, tc: true})
    forks.push pipe_counter
  end
  unlock(:ppipe_fork_mutex) if @controller 
  return num_forks ? forks : forks[0]
end

#gets(sep = $/) ⇒ Object



1625
1626
1627
1628
1629
1630
1631
1632
# File 'lib/parallelpipes.rb', line 1625

def gets(sep = $/)
  check_messages
  until ans = get_line(@user_end, $/)
    check_messages
    sleep 0.001
  end
  return ans
end

#i_recv(label, options = {}) ⇒ Object

Immediate Receive

Receive a message with the given label. Returns a PPipe::Message immediately. The message has an internal thread which keeps checking if the message has arrived.

You can find out if the message has arrived by calling Message#arrived?. You can wait for the message to arrive by calling Message#join. You can stop the message checking by calling Message#kill.

NB You cannot fork (in this process) until the message has arrived (or been killed) - otherwise two processes will be checking for the same message.

See t_recv for possible options.



1590
1591
1592
1593
1594
1595
1596
1597
1598
# File 'lib/parallelpipes.rb', line 1590

def i_recv(label, options={})
  th = Thread.ppipe_new do
    thread = Thread.current
    thread[:ppipe_message] = true
    thread[:label] = label
    thread[:message]= w_recv(label, options)
  end
  return Message.with_listening_thread(th)
end

#i_send(*args) ⇒ Object

<tt>i_send(label, contents, options={})

i_send(message, options={})</tt>

Immediate Send

Send a message and don’t wait for the destination pipe(s) to confirm they have received it (i.e. this call will return immediately)

  • label must be a Symbol, String or Integer

  • contents can be any object where eval(contents.inspect) == contents, except for nil or false

  • message must be a PPipe::Message

Possible options are:

One of:

  • tp (Integer or Array): the pipe to send the message to or a list of pipes to send the message to

  • ep (Integer or Array): a pipe not to send the message to or a list of pipes not to send the message to

Note: if you don’t specify tp, the message will be sent to every currently assigned pipe (excluding pipes in ep if it is specified) (think broadcast in standard MPI) . However, be warned: a new process that you have just created may not yet be known about yet in every other process. Therefore, if you send a message to every pipe just after you have called fork in a different process, the newly created process might not get the message. See Methods#wait_till_assigned.

One of:

  • tt (Integer or Array): the thread id to send the message to or a list of thread ids to send the message to

  • et (Integer or Array): a thread id not to send the message to or a list of thread ids not to send the message to #

Note: if you specify neither, the message will be sent to every thread

Any of:

  • reject_unassigned (true or false, default false): when a new process is created, its existence is not known immediately to every other process (a message is send out automatically and is processed at some point). If you send a message to a pipe that is currently unassigned to a process, PPipe will assume that you know such a process is about to be created, and wait till it has been informed that the new process has been created. If that process is never created, i_send will hang. You can change this behaviour by setting reject_unassigned to true. If you do, PPipe will return an error if the pipe is unassigned.

  • blocking (true or false, default false): wait until the other process(es) or thread(s) have received the message. To make your code more clear, it is recommended that you do not use this option, and instead use Methods#w_send.

  • tc (true or false, default false): If no tp or tps is specified, the message will be sent to every pipe, except the controller - unless tc == true. Not needed for normal use (used mostly for internal messages).

e.g. i_send(:trees, [‘birch’, ‘pine’], tp: 5)

i_send(:heroes, [‘David’, ‘Perseus’], tp: [2,5,21], tt: 6346023, reject_unassigned: true)

i_send(:greeting, ‘Hello every other pipe’)

Raises:



1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
# File 'lib/parallelpipes.rb', line 1331

def i_send(*args)
  log 'fpv', :i_send
  @message_counter += 1
  message = nil
  case args.size
  when 3
    
    message = Message.new(*args)
  when 2
    case args[0].class
    when PPipe::Message
      message = args[0]
      message.options[:fp] = nil; message.options[:ft] = nil
      args[1].each{|key, value| message.options[key] = value}
    else
      message = Message.new(*args)
    end
  when 1
    raise ArgumentError.new("Argument to i_send must be a PPipe::Message if only one argument provided") unless args[0].is_a_ppipe_message?
    message = args[0]
    message.options[:fp] = nil; message.options[:ft] = nil
  when 0
    raise ArgumentError.new("No argument supplied to i_send")
  end
  raise ArgumentError.new("Specifying fp or ft when sending messages is redundant; did you mean to specify tp or tt?") if message.fp or message.ft 
  raise DeadParallelPipe unless @alive
  raise ArgumentError.new("Contents cannot be nil or false; label #{label}; pipe_no #{pipe_no}") unless message.contents
  log 'v9', "To pipe_no: " + message.tp.to_s
  broadcast = message.tps ? false : true

#     to_pipes = message.tp ? (message.tp.class == Array ? message.tp : [message.tp]) :  ([email protected]).to_a
  message.options[:fp] = @mpn #fp = from pipe number
  message.options[:ft] = tid #ft = from thread

#     $stderr.puts 'calling broadcast' if broadcast
  to_pipes = (message.pop_tps or ((0...@pipes.size).to_a - (message.pop_eps or [])))
  to_pipes.each do |pipe_no|
    next if @controller and pipe_no == @controller and broadcast and not message.options[:tc]
    if pipe_no == @mpn #tp = to pipe number
      if message.options[:evaluate]
        log 'v9', 'evaluating locally...'
        send(message.label, message.contents) if !message.tt or message.tt == tid
      else
        
        to_threads = (message.tts or (Thread.ppipe_list_live_ids - (message.ets or [])))
          to_threads.each do |id|
          @messages[id] ||= {}
          @messages[id][message.label] ||= {}
          @messages[id][message.label][@mpn] ||= {} 

          @messages[id][message.label][@mpn][message.ft] ||= [] 
          @messages[id][message.label][@mpn][message.ft].push message
        end
      end
    elsif @pipes[pipe_no].class == IO
      begin
#           $stderr.puts message.class
        (message.options[:re] = @envelope; @envelope+=1) if message.blocking #re = return envelope
        log 'v2', "#@mpn->#{pipe_no}: #{message.to_transmission}"
        @pipes[pipe_no].print message.to_transmission
        if message.blocking
          if message.tts 
            message.tts.each do |thread_id| 
              w_recv(message.label + message.options[:re].to_s.to_sym, fp: pipe_no, ft: thread_id, timeout: message.options[:timeout])
            end
          else
            w_recv(message.label + message.options[:re].to_s.to_sym, fp: pipe_no, timeout: message.options[:timeout])
          end
        
#           @messages[tid].delete(@messages[tid].key(label + no.to_s))
        end
          
      rescue Errno::EPIPE
        pipe_finished(pipe_no)
#           i_send(:pipe_finished, pipe_no, evaluate: true, tp: @mpn) 
        i_send(:pipe_finished, pipe_no, evaluate: true, tc: true) 
#           @pipes[pipe_no] = nil; @pids.delete(pipe_no)
        raise DeadPipe.new("This pipe: #{pipe_no} is dead; probably the process it corresponds to has terminated or called die") unless broadcast

      end
    elsif pipe_no >= @pipes.size
      raise PPipeFatal.new("Pipe #@mpn tried to send a message to pipe #{pipe_no} out of only #{@pipes.size} pipes")
    elsif @finished_pipes[pipe_no]
      raise DeadPipe.new("This pipe: #{pipe_no} is finished; probably the process it corresponds to has terminated or called die") unless broadcast
    elsif !@pipes[pipe_no]
      pipe_finished(pipe_no)
      i_send(:pipe_finished, pipe_no, evaluate: true, tc: true)
      raise DeadPipe.new("This pipe: #{pipe_no} is dead; probably the process it corresponds to has terminated") unless broadcast
    elsif @pipes[pipe_no].class == Array
      if message.options[:reject_unassigned]
        raise UnassignedPipe.new("Pipe #@mpn attempted to write to a pipe that has not yet been assigned to a process") unless broadcast 
      elsif !broadcast
        sleep 0.001
        check_messages
#           $stderr.puts 'Houston, we would have had a problem'; exit
        redo
      end
    else
      raise PPipeFatal.new("Unknown error in i_send from pipe #@mpn: @pipes evaluted at the requested pipe_no is neither a pipe, nil or an array")
    end
  end
  log 'fpcv6', :i_send
end

#kill_all(kill_controller = @controller) ⇒ Object

Suddenly and violently terminate every process except the current one. Messy and unpredictable. An emergency measure only.

Doesn’t kill the controller if kill_controller is set to false.

Raises:



1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
# File 'lib/parallelpipes.rb', line 1791

def kill_all(kill_controller=@controller) #(force=false)
#     stop_controller if @controller and not force
  raise NoController.new("Tried to kill controller but no controller") if kill_controller and not @controller
  @pids.each do |pipe_no, pid|
    next if @controller and pipe_no == @controller
    begin
      kill_pipe(pipe_no) unless pid == Process.pid
    rescue DeadPipe
      next
    end
  end
  Process.kill('TERM', @pids[@controller]) if kill_controller
#     die
end

#kill_pipe(pipe_no, signal = 'TERM') ⇒ Object

Suddenly terminate the process corresponding to pipe_no.

Raises:



1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
# File 'lib/parallelpipes.rb', line 1808

def kill_pipe(pipe_no, signal='TERM')
  raise ControllerError.new("Can't kill controller: use stop_controller instead") if @controller and pipe_no == @controller
    log '', 'killing pipe: ', pipe_no
  begin
    @pipes[pipe_no].puts
  rescue PPipe::DeadPipe
    raise DeadPipe.new("This pipe (#{pipe_no}) is already dead")
  end
  begin 
    Process.kill(signal, @pids[pipe_no])
  rescue Errno::ESRCH
    raise DeadPipe.new("The process corresponding to this pipe (#{pipe_no}) is already dead")
  end
  pipe = @pipes[pipe_no] 
  begin
#         $stderr.puts 'closing',  i
    if pipe.class == IO
      pipe.close
    else
      pipe[0].close; pipe[1].close
    end
  rescue IOError, NoMethodError
  end
  @pipes[pipe_no] = nil
  @pids.delete(pipe_no)
end

#pidsObject



1663
1664
1665
# File 'lib/parallelpipes.rb', line 1663

def pids
  return @pids.dup
end

#puts(pipe_no, *args) ⇒ Object

Send something to the $stdin of the child process with pipe number pipe_no (if redirect is on). If pipe_no = nil call Kernel.puts. Can be called by any other process.

Raises:



1602
1603
1604
1605
1606
# File 'lib/parallelpipes.rb', line 1602

def puts(pipe_no, *args)
  raise PPipeFatal.new("calling this doesn't make any sense unless redirect is on") unless @redirect
  return Kernel.puts(*args) unless pipe_no
  @pipes[pipe_no].puts(*args)
end

#read_all(pipe = @user_end) ⇒ Object



1615
1616
1617
1618
# File 'lib/parallelpipes.rb', line 1615

def read_all(pipe=@user_end)
  check_messages
  return read_pipe(pipe)
end

#set_up_pipes(num_pipes, use_controller, options = {}) ⇒ Object

Raises:



1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
# File 'lib/parallelpipes.rb', line 1061

def set_up_pipes(num_pipes, use_controller, options={})
  ArgumentError.check([:num_pipes, num_pipes, Fixnum], [:options, options, Hash], [:use_controller, use_controller, [TrueClass, FalseClass]])
  raise PPipeFatal.new("Only the root PPipe can set up pipes (otherwise chaos would ensue!)") unless @is_root == true or @is_root == nil
  raise PPipeFatal.new("Attempting to set up pipes when this ppipe is already set up (call die or finish first if you want to set up a bunch of new pipes)") if @alive
  
  @alive = true
  @pipes = [nil] + (1...num_pipes).to_a.map{|a| IO.pipe}
  @pipe_counter = 1 # 0 is the root process
  @my_pipes = []
  @mutexes = {}
  @messages = {}
  @user_end, @my_end = IO.pipe
  @is_root = true
  @mpn = 0
  @max_mutex_id =  0
  @envelope = 0
  @pids = {}
  @redirect = true unless options[:redirect] == false
  @controller_asleep = false
  @thread_mutexes = {}
  @shared_resource_mutexes = {}
  @weak_synchronization = options[:weak_synchronization]
  @check_messages_mutex = Mutex.new
  @thread_safe = options[:thread_safe] 
  @thread_check = true unless options[:thread_check] == false
#     @print_messages = options[:print_messages] 
  @tt_required = options[:tt_required] 
  @tp_required = options[:tp_required] 
#     @fvb = 6
  @last_segment = ""
  @verbosity = (options[:verbosity] or $log_verbosity or 0)
  @log_defaults = {}
  @log_defaults[[:f, :p].sort] = 7
  @log_defaults[[:f, :p, :c].sort] = 7
  @message_counter = 0
  @finished_pipes = {}

  #These parts must happen after initialization of variables
  configure_reader
  begin
    controller_refresh = (options[:controller_refresh] or 0.001)
    start_controller(controller_refresh) if use_controller
  rescue NoMethodError => err
    log 'v3', err
    log 'v3', err.class, err.backtrace
    raise PPipeFatal.new("Tried to use a controller without including the Controller module")
  end
end

#t_recv(label, options = {}) ⇒ Object

> nil or PPipe::Message

Raises:



1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
# File 'lib/parallelpipes.rb', line 1496

def t_recv(label, options={}) # => nil or PPipe::Message
  ArgumentError.check([:label, label, [Symbol, String, Fixnum]], [:options, options, Hash])
  
  raise DeadParallelPipe unless @alive
#     $stderr.puts options.class
  raise ArgumentError.new("Specifying tp or tt when receiving messages is redundant; did you mean to specify fp or ft?") if options[:tt] or options[:tp]
  if options[:fp]
    raise Ambiguity.new("specified fp and fps simultaneously") if options[:fps]
    fps = [options[:fp]]
  elsif options[:fps]
    fps = options[:fps]
  else
    fps = nil
  end
  if options[:ft]
    raise Ambiguity.new("specified fp and fps simultaneously") if options[:fts]
    fts = [options[:ft]]
  elsif options[:fts]
    fts = options[:fts]
  else
    fts = nil
  end

  message = nil
  index = nil

  check_messages
#       $stderr.puts @messages.inspect
  return nil unless @messages[tid] and @messages[tid][label] 
  search_by_pipe = fps ? @messages[tid][label].values_at(*fps).compact : @messages[tid][label].values.compact
  search_by_pipe.each do |list|
    next unless list
    search_by_thread = fts ? list.values_at(*fts).compact : list.values.compact
    search_by_thread.each{|th_list| message = th_list.shift if th_list.size > 0}
    break if message        
  end


  if message and message.blocking
#       $stderr.puts 'found blocking message', message.label
    loop do
      begin
        i_send((message.label+message.options[:re].to_s).to_sym, true, {tt: message.ft, tp: message.fp})
        break
      rescue UnassignedPipe
        log 'v3', 'rescued unassigned pipe in w_recv, retrying...'
        check_messages
      end
    end
  end
  return message
end

#tidObject

returns Thread.current.object_id



1254
1255
1256
# File 'lib/parallelpipes.rb', line 1254

def tid
  return Thread.current.object_id
end

#user_endObject

If redirect is on, the $stdout of all other processes is connected to the root process. Any output from these processes ends up in a pipe called @user_end. So if this is the root ppipe then this method provides access to that output. This method also causes PPipe to process input, flushing any input that is not a PPipe message into @user_end.

Notes:

  • Only the root ppipe can call this function.

  • This function will raise an error if redirect == false

  • The pipe @user_end contains output from every other process, in no order

If the PPipe has not yet processed the output from the child pipe, it will be stored internally in PPipe, and not in @user_end. Therefore a call such as

ppipe.user_end.gets

may block forever.

It is recommended that instead something is written like: while input = ppipe.read_all(ppipe.user_end) # read_all never blocks #… process input end

Raises:



1653
1654
1655
1656
1657
1658
# File 'lib/parallelpipes.rb', line 1653

def user_end
  raise PPipeFatal.new("redirect is off - calling user_end makes no sense") unless @redirect
  raise PPipeFatal.new("Only the root pipe can call user_end") unless @is_root
  check_messages
  @user_end
end

#w_recv(label, options = {}) ⇒ Object

Wait Receive

Receive a message with the given label. Does not return until the message has been received.

Possible options

  • timeout (seconds): raise a PPipe::RecvTimeout error if the message has not arrived after the give number of seconds

  • refresh_time (seconds, default is 0.001): The time between rechecking to see if the message has arrived.

For other possible options, see t_recv .

NB: all this does is keep calling t_recv in a loop!



1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
# File 'lib/parallelpipes.rb', line 1563

def w_recv(label, options={})
  ArgumentError.check([:label, label, [Symbol, String, Fixnum]], [:options, options, Hash])
  log 'fpv', :w_recv
  time = Time.now.to_f
    message = nil
  loop do
#       $stderr.puts "trying to get #{label.inspect} from #{options[:fp]}" 
    message = t_recv(label, options)
    break if message
    raise RecvTimeout.new("breaking w_recv on timeout") if options[:timeout] and (Time.now.to_f > time + options[:timeout])
    sleep (0.001 or options[:refresh_time])
  end
#     $stderr.puts "got message #{message}"
  return message
end

#w_send(*args) ⇒ Object

Wait Send

Calls i_send with blocking set to true.

The call will not return until the other process(es) or thread(s) has received the message. Thus, doing a w_send, w_recv is a way of ensuring that two processes are at a given place at the same time.

If you specify more than one pipe for a blocking message to be sent to, it will wait till all the destination processes have confirmed they have received it. (NB Specifying ep or neither ep or tp (see i_send) means that you are potentially sending the message to more than one process).

With threads, it is slightly different. If you specify tt, it will wait till all the threads you specified have confirmed receiving the message. However, if you specify et or neither et or tt, it will return when the first thread confirms it has received it. This is so one doesn’t have to specify tt every time a blocking message is sent (which would be tedious!).



1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
# File 'lib/parallelpipes.rb', line 1445

def w_send(*args)
  message = nil
  case args.size
  when 3
    message = Message.new(*args)
  when 2
    case args[0].class
    when PPipe::Message
      message = args[0]
      args[1].each{|key, value| message.options[key] = value}
    else
      message = Message.new(*args)
    end
  when 1
    raise ArgumentError.new("Argument to w_send must be a PPipe::Message if only one argument provided") unless args[0].is_a_ppipe_message?
    message = args[0]
  when 0
    raise ArgumentError.new("No argument supplied to w_send")
  else 
    raise ArgumentError.new("Number of arguments supplied to w_send should be 1, 2 or 3")
  end
  message.options[:blocking] = true
#     $stderr.puts message.inspect, message.class
  i_send(message)
end

#wait(pipe_no) ⇒ Object

Wait for another process (whose pipe number is pipe_no) to either call Methods#die or to exit (whichever is sooner).

If the process did not call die before it finished, then it sees if the process is dead by checking if its pipe is broken. However this may not work on all operating systems (so it’s much better to make every process call die before it exits).

Raises:



1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
# File 'lib/parallelpipes.rb', line 1694

def wait(pipe_no) #, wait_remote_child=false, *args)
  raise DeadParallelPipe unless @alive
  raise ControllerError.new("Attempted to wait for the controller (would never return). Use stop_controller if you want the controller to finish") if @controller and pipe_no == @controller
  raise SelfReference.new("Attempted to wait for self") if pipe_no == @mpn
  check_messages
  return if @finished_pipes[pipe_no]
  unless @pipes[pipe_no]
#       @finished_pipes.push[pipe_no]
#       @pids.delete(pipe_no)
    i_send(:pipe_finished, pipe_no, {evaluate: true, tc: true})
    return
  end
  begin
    loop do
      check_messages
      return  if @finished_pipes[pipe_no]   
      sleep 0.4
      @pipes[pipe_no].puts
    end
  rescue Errno::EPIPE
    # If the pipe is broken we deduce that the corresponding process has exited or called die
    i_send(:pipe_finished, pipe_no, evaluate: true, tc: true)
#       @finished_pipes.push[pipe_no]
    return
  end
end

#wait_till_assigned(*args) ⇒ Object

args is a list of pipe numbers. Wait till Methods#assigned?(pipe_no) is true for each pipe_no.

e.g.

wait_till_assigned(2,5,9)

or

ppipe = PPipe.new(7, false) pipe_numbers = ppipe.fork(6) # pipe_numbers is [1,2,3,4,5,6] wait_till_assigned(*pipe_numbers)



1281
1282
1283
1284
1285
1286
1287
# File 'lib/parallelpipes.rb', line 1281

def wait_till_assigned(*args)
  args.each do |pipe_no|
    until assigned?(pipe_no)
      sleep 0.01
    end
  end
end

#waitallObject

Wait for all other processes to either finish or call ppipe.die - see wait

NB If more than one process calls waitall, both will wait for each other and hang forever!

Raises:



1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
# File 'lib/parallelpipes.rb', line 1671

def waitall #(wait_remote_child=false, *args)
  log 'fpv', 'waitall'
  raise DeadParallelPipe unless @alive
#     raise PPipeFatal.new("Only the root process (ppipe.is_root == true) can call waitall if there is no controller") unless @controller or @is_root
#     stop_controller if @controller
  loop do
    @pids.keys.each do |pipe_no|
      log 'v5', 'waiting for pipe_no', pipe_no
      must_wait = !(pipe_no == @mpn or (@controller and pipe_no == @controller))
      wait(pipe_no) if must_wait
#         must_wait
#         arr.push status if status 
    end
    log 'iv8', '@pids = ', @pids
    check_messages
    break unless @pids.keys.find{|pipe_no| !(pipe_no == @mpn) and (!@controller or !(pipe_no == @controller))}
  end
end