Module: PPipe::Methods
Overview
Quick Links: PPipe, parallelpipes.rb, PPipe::Methods, PPipe::Controller, PPipe::Message
The primary purpose of this module is to provide the methods for the class PPipe. Therefore all the method documentation specifies having a PPipe as the receiver. However, it is perfectly possible to use this module as a module in its own right; see the section Modules in parallelpipes.rb
Instance Attribute Summary collapse
-
#is_root ⇒ Object
readonly
is the ppipe the root ppipe (the first one to be created)?.
-
#mpn ⇒ Object
readonly
my pipe number - the pipe number of the PPipe.
-
#redirect ⇒ Object
readonly
see set_up_pipes.
-
#thread_safe ⇒ Object
readonly
see set_up_pipes.
-
#tp_required ⇒ Object
readonly
see set_up_pipes.
-
#tt_required ⇒ Object
readonly
see set_up_pipes.
Attributes included from Log
Instance Method Summary collapse
-
#assigned?(pipe_no) ⇒ Boolean
When a new process is created, there is a delay before that new process has a pipe assigned to it in every other process.
-
#die ⇒ Object
Close all pipes and terminate all communication with every other process.
-
#exit ⇒ Object
Calls Methods#die (if the ppipe is still alive) then calls Kernel.exit.
-
#finish ⇒ Object
Calls Methods#waitall, stops the controller if there is a controller and then calls Methods#die.
-
#fork(num_forks = nil, &block) ⇒ Object
Create a new process.
- #gets(sep = $/) ⇒ Object
-
#i_recv(label, options = {}) ⇒ Object
Immediate Receive.
-
#i_send(*args) ⇒ Object
<tt>i_send(label, contents, options={}).
-
#kill_all(kill_controller = @controller) ⇒ Object
Suddenly and violently terminate every process except the current one.
-
#kill_pipe(pipe_no, signal = 'TERM') ⇒ Object
Suddenly terminate the process corresponding to pipe_no.
- #pids ⇒ Object
-
#puts(pipe_no, *args) ⇒ Object
Send something to the $stdin of the child process with pipe number pipe_no (if redirect is on).
- #read_all(pipe = @user_end) ⇒ Object
- #set_up_pipes(num_pipes, use_controller, options = {}) ⇒ Object
-
#t_recv(label, options = {}) ⇒ Object
> nil or PPipe::Message.
-
#tid ⇒ Object
returns Thread.current.object_id.
-
#user_end ⇒ Object
If redirect is on, the $stdout of all other processes is connected to the root process.
-
#w_recv(label, options = {}) ⇒ Object
Wait Receive.
-
#w_send(*args) ⇒ Object
Wait Send.
-
#wait(pipe_no) ⇒ Object
Wait for another process (whose pipe number is pipe_no) to either call Methods#die or to exit (whichever is sooner).
-
#wait_till_assigned(*args) ⇒ Object
args is a list of pipe numbers.
-
#waitall ⇒ Object
Wait for all other processes to either finish or call ppipe.die - see wait.
Methods included from Log
clean_up, io=, #log, log_file, log_file=
Methods included from Reader
#configure_reader, #get_line, #read_pipe
Instance Attribute Details
#is_root ⇒ Object (readonly)
is the ppipe the root ppipe (the first one to be created)?
1030 1031 1032 |
# File 'lib/parallelpipes.rb', line 1030 def is_root @is_root end |
#mpn ⇒ Object (readonly)
my pipe number - the pipe number of the PPipe
1026 1027 1028 |
# File 'lib/parallelpipes.rb', line 1026 def mpn @mpn end |
#redirect ⇒ Object (readonly)
see set_up_pipes
1033 1034 1035 |
# File 'lib/parallelpipes.rb', line 1033 def redirect @redirect end |
#thread_safe ⇒ Object (readonly)
see set_up_pipes
1033 1034 1035 |
# File 'lib/parallelpipes.rb', line 1033 def thread_safe @thread_safe end |
#tp_required ⇒ Object (readonly)
see set_up_pipes
1033 1034 1035 |
# File 'lib/parallelpipes.rb', line 1033 def tp_required @tp_required end |
#tt_required ⇒ Object (readonly)
see set_up_pipes
1033 1034 1035 |
# File 'lib/parallelpipes.rb', line 1033 def tt_required @tt_required end |
Instance Method Details
#assigned?(pipe_no) ⇒ Boolean
When a new process is created, there is a delay before that new process has a pipe assigned to it in every other process. This method checks whether the new process (with pipe number equal to pipe_no) has had its pipe assigned in the calling process.
pipe_no can be the pipe number of the calling process
1262 1263 1264 1265 1266 |
# File 'lib/parallelpipes.rb', line 1262 def assigned?(pipe_no) raise PPipeFatal.new("tried to call assigned? for pipe #{pipe_no} when the highest pipe number is #{@pipes.size - 1} (remember pipe numbers are 0 based)") if pipe_no >= @pipes.size return @pipe_counter > pipe_no end |
#die ⇒ Object
Close all pipes and terminate all communication with every other process. Send a message to every other process letting them know this process has terminated communication.
It’s good practice to call this in every process before the process exits.
NB If you pass a block to Methods#fork, it will call die automatically when the block is finished, so die only needs to be called if the process exits midway through the block (consider using Methods#exit instead of Kernel.exit in this case)
1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 |
# File 'lib/parallelpipes.rb', line 1746 def die raise DeadParallelPipe unless @alive i_send(:pipe_finished, @mpn, {evaluate: true, tc: true}) @alive = false @pipes.each do |pipe| next unless pipe begin # $stderr.puts 'closing', i if pipe.class == IO pipe.close else pipe[0].close; pipe[1].close end rescue IOError, NoMethodError next end end @pipes = nil @mutexes = nil @pid=nil end |
#exit ⇒ Object
Calls Methods#die (if the ppipe is still alive) then calls Kernel.exit
1771 1772 1773 1774 |
# File 'lib/parallelpipes.rb', line 1771 def exit die if @alive Kernel.exit end |
#finish ⇒ Object
Calls Methods#waitall, stops the controller if there is a controller and then calls Methods#die
1778 1779 1780 1781 1782 1783 |
# File 'lib/parallelpipes.rb', line 1778 def finish waitall stop_controller if @controller die # return statuses end |
#fork(num_forks = nil, &block) ⇒ Object
Create a new process. If num_forks is unspecified, creates one new process, and returns the pipe number of that process. If num_forks is specified, creates num_forks new processes and returns an array of pipe numbers.
If a block is given, the child process(es) will execute the block and then exit
If there is no controller, only the root PPipe can fork. (Otherwise there is no way to prevent two processes forking at the same time).
1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 |
# File 'lib/parallelpipes.rb', line 1116 def fork(num_forks=nil, &block) ArgumentError.check([:num_forks, num_forks, [NilClass, Fixnum]]) raise DeadParallelPipe unless @alive raise PPipeFatal.new("Only the root parallel pipe can fork (you can test this with .is_root), unless there is a controller. Otherwise it is impossible to synchronize forking, and to provide unique ids to all new processes.") unless @is_root or @controller forks = [] lock(:ppipe_fork_mutex) if @controller (num_forks or 1).times do log 'fpv', :fork log 'v9', 'about to check messages before forking' raise PPipeFatal.new("insufficient pipes") if @pipe_counter >= @pipes.size log 'v9', 'checked messages before forking' pipe_counter = @pipe_counter new_pipe = IO.pipe @pipes[@mpn] = new_pipe[1] cur_pid = Process.pid begin Thread.ppipe_join rescue => err (unlock(:ppipe_fork_mutex) if @controller; raise err) end Thread.list.each{|th| (unlock(:ppipe_fork_mutex) if @controller; raise ThreadingError.new("Must have only one live thread when calling fork, if PPipe.thread_safe == true (suggest you join all other threads before calling). PPipe.thread_safe can be set to false, but this can lead to many errors if forks are made with multiple live threads.")) if th.alive? and not th == Thread.current} if @thread_check pid = Process.fork if not pid Thread.list.each{|th| th.kill if th.alive? and not th == Thread.current} if @thread_safe @is_root = false @mpn = pipe_counter log 'pv3', :forked log 'v9', "My New Pipe No is: #{pipe_counter}" @pipes[pipe_counter][1].close @my_pipes = [[cur_pid, @pipes[pipe_counter][0]]] @pipes[pipe_counter] = nil #@pipes[pipe_counter][0] @user_end, @my_end = IO.pipe if @redirect $stdout = @pipes[0] $stdin = self #@user_end end log 'v9', 'about to yield', @mpn @thread_mutexes = {} @shared_resource_mutexes = {} = {} @old_controllers = {} = 0 (block.call; exit) if block return nil end # @my_pipes.each{|(pid, pipe)| pipe.close} # @pipes.slice(0...@mpn).each{|pipe| next unless pipe; pipe.close} @my_pipes.push [pid, new_pipe[0]] new_pipe[1].close i_send(:add_pid, [pipe_counter, pid], {evaluate: true, tc: true}) i_send(:increase_pipe_counter, true, {evaluate: true, tc: true}) forks.push pipe_counter end unlock(:ppipe_fork_mutex) if @controller return num_forks ? forks : forks[0] end |
#gets(sep = $/) ⇒ Object
1625 1626 1627 1628 1629 1630 1631 1632 |
# File 'lib/parallelpipes.rb', line 1625 def gets(sep = $/) until ans = get_line(@user_end, $/) sleep 0.001 end return ans end |
#i_recv(label, options = {}) ⇒ Object
Immediate Receive
Receive a message with the given label. Returns a PPipe::Message immediately. The message has an internal thread which keeps checking if the message has arrived.
You can find out if the message has arrived by calling Message#arrived?. You can wait for the message to arrive by calling Message#join. You can stop the message checking by calling Message#kill.
NB You cannot fork (in this process) until the message has arrived (or been killed) - otherwise two processes will be checking for the same message.
See t_recv for possible options.
1590 1591 1592 1593 1594 1595 1596 1597 1598 |
# File 'lib/parallelpipes.rb', line 1590 def i_recv(label, ={}) th = Thread.ppipe_new do thread = Thread.current thread[:ppipe_message] = true thread[:label] = label thread[:message]= w_recv(label, ) end return Message.with_listening_thread(th) end |
#i_send(*args) ⇒ Object
<tt>i_send(label, contents, options={})
i_send(message, options={})</tt>
Immediate Send
Send a message and don’t wait for the destination pipe(s) to confirm they have received it (i.e. this call will return immediately)
-
label must be a Symbol, String or Integer
-
contents can be any object where
eval(contents.inspect) == contents, except fornilorfalse -
message must be a PPipe::Message
Possible options are:
One of:
-
tp (Integer or Array): the pipe to send the message to or a list of pipes to send the message to
-
ep (Integer or Array): a pipe not to send the message to or a list of pipes not to send the message to
Note: if you don’t specify tp, the message will be sent to every currently assigned pipe (excluding pipes in ep if it is specified) (think broadcast in standard MPI) . However, be warned: a new process that you have just created may not yet be known about yet in every other process. Therefore, if you send a message to every pipe just after you have called fork in a different process, the newly created process might not get the message. See Methods#wait_till_assigned.
One of:
-
tt (Integer or Array): the thread id to send the message to or a list of thread ids to send the message to
-
et (Integer or Array): a thread id not to send the message to or a list of thread ids not to send the message to #
Note: if you specify neither, the message will be sent to every thread
Any of:
-
reject_unassigned (true or false, default false): when a new process is created, its existence is not known immediately to every other process (a message is send out automatically and is processed at some point). If you send a message to a pipe that is currently unassigned to a process, PPipe will assume that you know such a process is about to be created, and wait till it has been informed that the new process has been created. If that process is never created, i_send will hang. You can change this behaviour by setting reject_unassigned to true. If you do, PPipe will return an error if the pipe is unassigned.
-
blocking (true or false, default false): wait until the other process(es) or thread(s) have received the message. To make your code more clear, it is recommended that you do not use this option, and instead use Methods#w_send.
-
tc (true or false, default false): If no tp or tps is specified, the message will be sent to every pipe, except the controller - unless tc == true. Not needed for normal use (used mostly for internal messages).
e.g. i_send(:trees, [‘birch’, ‘pine’], tp: 5)
i_send(:heroes, [‘David’, ‘Perseus’], tp: [2,5,21], tt: 6346023, reject_unassigned: true)
i_send(:greeting, ‘Hello every other pipe’)
1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 |
# File 'lib/parallelpipes.rb', line 1331 def i_send(*args) log 'fpv', :i_send += 1 = nil case args.size when 3 = Message.new(*args) when 2 case args[0].class when PPipe::Message = args[0] .[:fp] = nil; .[:ft] = nil args[1].each{|key, value| .[key] = value} else = Message.new(*args) end when 1 raise ArgumentError.new("Argument to i_send must be a PPipe::Message if only one argument provided") unless args[0]. = args[0] .[:fp] = nil; .[:ft] = nil when 0 raise ArgumentError.new("No argument supplied to i_send") end raise ArgumentError.new("Specifying fp or ft when sending messages is redundant; did you mean to specify tp or tt?") if .fp or .ft raise DeadParallelPipe unless @alive raise ArgumentError.new("Contents cannot be nil or false; label #{label}; pipe_no #{pipe_no}") unless .contents log 'v9', "To pipe_no: " + .tp.to_s broadcast = .tps ? false : true # to_pipes = message.tp ? (message.tp.class == Array ? message.tp : [message.tp]) : ([email protected]).to_a .[:fp] = @mpn #fp = from pipe number .[:ft] = tid #ft = from thread # $stderr.puts 'calling broadcast' if broadcast to_pipes = (.pop_tps or ((0...@pipes.size).to_a - (.pop_eps or []))) to_pipes.each do |pipe_no| next if @controller and pipe_no == @controller and broadcast and not .[:tc] if pipe_no == @mpn #tp = to pipe number if .[:evaluate] log 'v9', 'evaluating locally...' send(.label, .contents) if !.tt or .tt == tid else to_threads = (.tts or (Thread.ppipe_list_live_ids - (.ets or []))) to_threads.each do |id| [id] ||= {} [id][.label] ||= {} [id][.label][@mpn] ||= {} [id][.label][@mpn][.ft] ||= [] [id][.label][@mpn][.ft].push end end elsif @pipes[pipe_no].class == IO begin # $stderr.puts message.class (.[:re] = @envelope; @envelope+=1) if .blocking #re = return envelope log 'v2', "#@mpn->#{pipe_no}: #{message.to_transmission}" @pipes[pipe_no].print .to_transmission if .blocking if .tts .tts.each do |thread_id| w_recv(.label + .[:re].to_s.to_sym, fp: pipe_no, ft: thread_id, timeout: .[:timeout]) end else w_recv(.label + .[:re].to_s.to_sym, fp: pipe_no, timeout: .[:timeout]) end # @messages[tid].delete(@messages[tid].key(label + no.to_s)) end rescue Errno::EPIPE pipe_finished(pipe_no) # i_send(:pipe_finished, pipe_no, evaluate: true, tp: @mpn) i_send(:pipe_finished, pipe_no, evaluate: true, tc: true) # @pipes[pipe_no] = nil; @pids.delete(pipe_no) raise DeadPipe.new("This pipe: #{pipe_no} is dead; probably the process it corresponds to has terminated or called die") unless broadcast end elsif pipe_no >= @pipes.size raise PPipeFatal.new("Pipe #@mpn tried to send a message to pipe #{pipe_no} out of only #{@pipes.size} pipes") elsif @finished_pipes[pipe_no] raise DeadPipe.new("This pipe: #{pipe_no} is finished; probably the process it corresponds to has terminated or called die") unless broadcast elsif !@pipes[pipe_no] pipe_finished(pipe_no) i_send(:pipe_finished, pipe_no, evaluate: true, tc: true) raise DeadPipe.new("This pipe: #{pipe_no} is dead; probably the process it corresponds to has terminated") unless broadcast elsif @pipes[pipe_no].class == Array if .[:reject_unassigned] raise UnassignedPipe.new("Pipe #@mpn attempted to write to a pipe that has not yet been assigned to a process") unless broadcast elsif !broadcast sleep 0.001 # $stderr.puts 'Houston, we would have had a problem'; exit redo end else raise PPipeFatal.new("Unknown error in i_send from pipe #@mpn: @pipes evaluted at the requested pipe_no is neither a pipe, nil or an array") end end log 'fpcv6', :i_send end |
#kill_all(kill_controller = @controller) ⇒ Object
Suddenly and violently terminate every process except the current one. Messy and unpredictable. An emergency measure only.
Doesn’t kill the controller if kill_controller is set to false.
1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 |
# File 'lib/parallelpipes.rb', line 1791 def kill_all(kill_controller=@controller) #(force=false) # stop_controller if @controller and not force raise NoController.new("Tried to kill controller but no controller") if kill_controller and not @controller @pids.each do |pipe_no, pid| next if @controller and pipe_no == @controller begin kill_pipe(pipe_no) unless pid == Process.pid rescue DeadPipe next end end Process.kill('TERM', @pids[@controller]) if kill_controller # die end |
#kill_pipe(pipe_no, signal = 'TERM') ⇒ Object
Suddenly terminate the process corresponding to pipe_no.
1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 |
# File 'lib/parallelpipes.rb', line 1808 def kill_pipe(pipe_no, signal='TERM') raise ControllerError.new("Can't kill controller: use stop_controller instead") if @controller and pipe_no == @controller log '', 'killing pipe: ', pipe_no begin @pipes[pipe_no].puts rescue PPipe::DeadPipe raise DeadPipe.new("This pipe (#{pipe_no}) is already dead") end begin Process.kill(signal, @pids[pipe_no]) rescue Errno::ESRCH raise DeadPipe.new("The process corresponding to this pipe (#{pipe_no}) is already dead") end pipe = @pipes[pipe_no] begin # $stderr.puts 'closing', i if pipe.class == IO pipe.close else pipe[0].close; pipe[1].close end rescue IOError, NoMethodError end @pipes[pipe_no] = nil @pids.delete(pipe_no) end |
#pids ⇒ Object
1663 1664 1665 |
# File 'lib/parallelpipes.rb', line 1663 def pids return @pids.dup end |
#puts(pipe_no, *args) ⇒ Object
Send something to the $stdin of the child process with pipe number pipe_no (if redirect is on). If pipe_no = nil call Kernel.puts. Can be called by any other process.
1602 1603 1604 1605 1606 |
# File 'lib/parallelpipes.rb', line 1602 def puts(pipe_no, *args) raise PPipeFatal.new("calling this doesn't make any sense unless redirect is on") unless @redirect return Kernel.puts(*args) unless pipe_no @pipes[pipe_no].puts(*args) end |
#read_all(pipe = @user_end) ⇒ Object
1615 1616 1617 1618 |
# File 'lib/parallelpipes.rb', line 1615 def read_all(pipe=@user_end) return read_pipe(pipe) end |
#set_up_pipes(num_pipes, use_controller, options = {}) ⇒ Object
1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 |
# File 'lib/parallelpipes.rb', line 1061 def set_up_pipes(num_pipes, use_controller, ={}) ArgumentError.check([:num_pipes, num_pipes, Fixnum], [:options, , Hash], [:use_controller, use_controller, [TrueClass, FalseClass]]) raise PPipeFatal.new("Only the root PPipe can set up pipes (otherwise chaos would ensue!)") unless @is_root == true or @is_root == nil raise PPipeFatal.new("Attempting to set up pipes when this ppipe is already set up (call die or finish first if you want to set up a bunch of new pipes)") if @alive @alive = true @pipes = [nil] + (1...num_pipes).to_a.map{|a| IO.pipe} @pipe_counter = 1 # 0 is the root process @my_pipes = [] @mutexes = {} = {} @user_end, @my_end = IO.pipe @is_root = true @mpn = 0 @max_mutex_id = 0 @envelope = 0 @pids = {} @redirect = true unless [:redirect] == false @controller_asleep = false @thread_mutexes = {} @shared_resource_mutexes = {} @weak_synchronization = [:weak_synchronization] = Mutex.new @thread_safe = [:thread_safe] @thread_check = true unless [:thread_check] == false # @print_messages = options[:print_messages] @tt_required = [:tt_required] @tp_required = [:tp_required] # @fvb = 6 @last_segment = "" @verbosity = ([:verbosity] or $log_verbosity or 0) @log_defaults = {} @log_defaults[[:f, :p].sort] = 7 @log_defaults[[:f, :p, :c].sort] = 7 = 0 @finished_pipes = {} #These parts must happen after initialization of variables configure_reader begin controller_refresh = ([:controller_refresh] or 0.001) start_controller(controller_refresh) if use_controller rescue NoMethodError => err log 'v3', err log 'v3', err.class, err.backtrace raise PPipeFatal.new("Tried to use a controller without including the Controller module") end end |
#t_recv(label, options = {}) ⇒ Object
> nil or PPipe::Message
1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 |
# File 'lib/parallelpipes.rb', line 1496 def t_recv(label, ={}) # => nil or PPipe::Message ArgumentError.check([:label, label, [Symbol, String, Fixnum]], [:options, , Hash]) raise DeadParallelPipe unless @alive # $stderr.puts options.class raise ArgumentError.new("Specifying tp or tt when receiving messages is redundant; did you mean to specify fp or ft?") if [:tt] or [:tp] if [:fp] raise Ambiguity.new("specified fp and fps simultaneously") if [:fps] fps = [[:fp]] elsif [:fps] fps = [:fps] else fps = nil end if [:ft] raise Ambiguity.new("specified fp and fps simultaneously") if [:fts] fts = [[:ft]] elsif [:fts] fts = [:fts] else fts = nil end = nil index = nil # $stderr.puts @messages.inspect return nil unless [tid] and [tid][label] search_by_pipe = fps ? [tid][label].values_at(*fps).compact : [tid][label].values.compact search_by_pipe.each do |list| next unless list search_by_thread = fts ? list.values_at(*fts).compact : list.values.compact search_by_thread.each{|th_list| = th_list.shift if th_list.size > 0} break if end if and .blocking # $stderr.puts 'found blocking message', message.label loop do begin i_send((.label+.[:re].to_s).to_sym, true, {tt: .ft, tp: .fp}) break rescue UnassignedPipe log 'v3', 'rescued unassigned pipe in w_recv, retrying...' end end end return end |
#tid ⇒ Object
returns Thread.current.object_id
1254 1255 1256 |
# File 'lib/parallelpipes.rb', line 1254 def tid return Thread.current.object_id end |
#user_end ⇒ Object
If redirect is on, the $stdout of all other processes is connected to the root process. Any output from these processes ends up in a pipe called @user_end. So if this is the root ppipe then this method provides access to that output. This method also causes PPipe to process input, flushing any input that is not a PPipe message into @user_end.
Notes:
-
Only the root ppipe can call this function.
-
This function will raise an error if redirect == false
-
The pipe @user_end contains output from every other process, in no order
If the PPipe has not yet processed the output from the child pipe, it will be stored internally in PPipe, and not in @user_end. Therefore a call such as
ppipe.user_end.gets
may block forever.
It is recommended that instead something is written like: while input = ppipe.read_all(ppipe.user_end) # read_all never blocks #… process input end
1653 1654 1655 1656 1657 1658 |
# File 'lib/parallelpipes.rb', line 1653 def user_end raise PPipeFatal.new("redirect is off - calling user_end makes no sense") unless @redirect raise PPipeFatal.new("Only the root pipe can call user_end") unless @is_root @user_end end |
#w_recv(label, options = {}) ⇒ Object
Wait Receive
Receive a message with the given label. Does not return until the message has been received.
Possible options
-
timeout (seconds): raise a PPipe::RecvTimeout error if the message has not arrived after the give number of seconds
-
refresh_time (seconds, default is 0.001): The time between rechecking to see if the message has arrived.
For other possible options, see t_recv .
NB: all this does is keep calling t_recv in a loop!
1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 |
# File 'lib/parallelpipes.rb', line 1563 def w_recv(label, ={}) ArgumentError.check([:label, label, [Symbol, String, Fixnum]], [:options, , Hash]) log 'fpv', :w_recv time = Time.now.to_f = nil loop do # $stderr.puts "trying to get #{label.inspect} from #{options[:fp]}" = t_recv(label, ) break if raise RecvTimeout.new("breaking w_recv on timeout") if [:timeout] and (Time.now.to_f > time + [:timeout]) sleep (0.001 or [:refresh_time]) end # $stderr.puts "got message #{message}" return end |
#w_send(*args) ⇒ Object
Wait Send
Calls i_send with blocking set to true.
The call will not return until the other process(es) or thread(s) has received the message. Thus, doing a w_send, w_recv is a way of ensuring that two processes are at a given place at the same time.
If you specify more than one pipe for a blocking message to be sent to, it will wait till all the destination processes have confirmed they have received it. (NB Specifying ep or neither ep or tp (see i_send) means that you are potentially sending the message to more than one process).
With threads, it is slightly different. If you specify tt, it will wait till all the threads you specified have confirmed receiving the message. However, if you specify et or neither et or tt, it will return when the first thread confirms it has received it. This is so one doesn’t have to specify tt every time a blocking message is sent (which would be tedious!).
1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 |
# File 'lib/parallelpipes.rb', line 1445 def w_send(*args) = nil case args.size when 3 = Message.new(*args) when 2 case args[0].class when PPipe::Message = args[0] args[1].each{|key, value| .[key] = value} else = Message.new(*args) end when 1 raise ArgumentError.new("Argument to w_send must be a PPipe::Message if only one argument provided") unless args[0]. = args[0] when 0 raise ArgumentError.new("No argument supplied to w_send") else raise ArgumentError.new("Number of arguments supplied to w_send should be 1, 2 or 3") end .[:blocking] = true # $stderr.puts message.inspect, message.class i_send() end |
#wait(pipe_no) ⇒ Object
Wait for another process (whose pipe number is pipe_no) to either call Methods#die or to exit (whichever is sooner).
If the process did not call die before it finished, then it sees if the process is dead by checking if its pipe is broken. However this may not work on all operating systems (so it’s much better to make every process call die before it exits).
1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 |
# File 'lib/parallelpipes.rb', line 1694 def wait(pipe_no) #, wait_remote_child=false, *args) raise DeadParallelPipe unless @alive raise ControllerError.new("Attempted to wait for the controller (would never return). Use stop_controller if you want the controller to finish") if @controller and pipe_no == @controller raise SelfReference.new("Attempted to wait for self") if pipe_no == @mpn return if @finished_pipes[pipe_no] unless @pipes[pipe_no] # @finished_pipes.push[pipe_no] # @pids.delete(pipe_no) i_send(:pipe_finished, pipe_no, {evaluate: true, tc: true}) return end begin loop do return if @finished_pipes[pipe_no] sleep 0.4 @pipes[pipe_no].puts end rescue Errno::EPIPE # If the pipe is broken we deduce that the corresponding process has exited or called die i_send(:pipe_finished, pipe_no, evaluate: true, tc: true) # @finished_pipes.push[pipe_no] return end end |
#wait_till_assigned(*args) ⇒ Object
args is a list of pipe numbers. Wait till Methods#assigned?(pipe_no) is true for each pipe_no.
e.g.
wait_till_assigned(2,5,9)
or
ppipe = PPipe.new(7, false) pipe_numbers = ppipe.fork(6) # pipe_numbers is [1,2,3,4,5,6] wait_till_assigned(*pipe_numbers)
1281 1282 1283 1284 1285 1286 1287 |
# File 'lib/parallelpipes.rb', line 1281 def wait_till_assigned(*args) args.each do |pipe_no| until assigned?(pipe_no) sleep 0.01 end end end |
#waitall ⇒ Object
Wait for all other processes to either finish or call ppipe.die - see wait
NB If more than one process calls waitall, both will wait for each other and hang forever!
1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 |
# File 'lib/parallelpipes.rb', line 1671 def waitall #(wait_remote_child=false, *args) log 'fpv', 'waitall' raise DeadParallelPipe unless @alive # raise PPipeFatal.new("Only the root process (ppipe.is_root == true) can call waitall if there is no controller") unless @controller or @is_root # stop_controller if @controller loop do @pids.keys.each do |pipe_no| log 'v5', 'waiting for pipe_no', pipe_no must_wait = !(pipe_no == @mpn or (@controller and pipe_no == @controller)) wait(pipe_no) if must_wait # must_wait # arr.push status if status end log 'iv8', '@pids = ', @pids break unless @pids.keys.find{|pipe_no| !(pipe_no == @mpn) and (!@controller or !(pipe_no == @controller))} end end |