Module: ConcurrentStream
- Defined in:
- lib/rbbt/util/misc/concurrent_stream.rb
Instance Attribute Summary collapse
-
#abort_callback ⇒ Object
Returns the value of attribute abort_callback.
-
#aborted ⇒ Object
Returns the value of attribute aborted.
-
#autojoin ⇒ Object
Returns the value of attribute autojoin.
-
#callback ⇒ Object
Returns the value of attribute callback.
-
#filename ⇒ Object
Returns the value of attribute filename.
-
#joined ⇒ Object
Returns the value of attribute joined.
-
#pids ⇒ Object
Returns the value of attribute pids.
-
#threads ⇒ Object
Returns the value of attribute threads.
Class Method Summary collapse
Instance Method Summary collapse
- #abort ⇒ Object
- #abort_pids ⇒ Object
- #abort_threads ⇒ Object
- #aborted? ⇒ Boolean
- #annotate(stream) ⇒ Object
- #clear ⇒ Object
- #join ⇒ Object
- #join_callback ⇒ Object
- #join_pids ⇒ Object
- #join_threads ⇒ Object
- #joined? ⇒ Boolean
- #super(*args) ⇒ Object
Instance Attribute Details
#abort_callback ⇒ Object
Returns the value of attribute abort_callback.
2 3 4 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2 def abort_callback @abort_callback end |
#aborted ⇒ Object
Returns the value of attribute aborted.
2 3 4 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2 def aborted @aborted end |
#autojoin ⇒ Object
Returns the value of attribute autojoin.
2 3 4 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2 def autojoin @autojoin end |
#callback ⇒ Object
Returns the value of attribute callback.
2 3 4 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2 def callback @callback end |
#filename ⇒ Object
Returns the value of attribute filename.
2 3 4 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2 def filename @filename end |
#joined ⇒ Object
Returns the value of attribute joined.
2 3 4 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2 def joined @joined end |
#pids ⇒ Object
Returns the value of attribute pids.
2 3 4 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2 def pids @pids end |
#threads ⇒ Object
Returns the value of attribute threads.
2 3 4 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2 def threads @threads end |
Class Method Details
.setup(stream, options = {}, &block) ⇒ Object
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 4 def self.setup(stream, = {}, &block) threads, pids, callback, filename, autojoin = Misc. , :threads, :pids, :callback, :filename, :autojoin stream.extend ConcurrentStream unless ConcurrentStream === stream stream.threads ||= [] stream.pids ||= [] stream.threads.concat(Array === threads ? threads : [threads]) unless threads.nil? stream.pids.concat(Array === pids ? pids : [pids]) unless pids.nil? or pids.empty? stream.autojoin = autojoin callback = block if block_given? if stream.callback and callback old_callback = stream.callback stream.callback = Proc.new do old_callback.call callback.call end else stream.callback = callback end stream.filename = filename unless filename.nil? stream end |
Instance Method Details
#abort ⇒ Object
105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 105 def abort Log.warn "Aborting stream #{Misc.fingerprint self} -- #{@abort_callback} [#{@aborted}]" return if @aborted @aborted = true begin @callback = nil @abort_callback.call if @abort_callback @abort_callback = nil close unless closed? abort_threads abort_pids end end |
#abort_pids ⇒ Object
100 101 102 103 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 100 def abort_pids @pids.each{|pid| Process.kill :INT, pid } if @pids @pids = [] end |
#abort_threads ⇒ Object
96 97 98 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 96 def abort_threads @threads.each{|t| t.raise Aborted.new unless t == Thread.current } if @threads end |
#aborted? ⇒ Boolean
49 50 51 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 49 def aborted? @aborted end |
#annotate(stream) ⇒ Object
30 31 32 33 34 35 36 37 38 39 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 30 def annotate(stream) ConcurrentStream.setup stream stream.threads = threads stream.pids = pids stream.callback = callback stream.abort_callback = abort_callback stream.filename = filename stream.autojoin = autojoin stream.joined = joined end |
#clear ⇒ Object
41 42 43 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 41 def clear threads, pids, callback, abort_callback = nil end |
#join ⇒ Object
86 87 88 89 90 91 92 93 94 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 86 def join join_threads join_pids join_callback @joined = true close unless closed? end |
#join_callback ⇒ Object
79 80 81 82 83 84 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 79 def join_callback if @callback and not joined? @callback.call @callback = nil end end |
#join_pids ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 66 def join_pids if @pids and @pids.any? @pids.each do |pid| begin Process.waitpid(pid, Process::WUNTRACED) raise ProcessFailed.new "Error joining process #{pid} in #{self.inspect}" unless $?.success? rescue Errno::ECHILD end end @pids = [] end end |
#join_threads ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 53 def join_threads if @threads and @threads.any? @threads.each do |t| t.join unless t == Thread.current #begin #ensure # t.join unless t == Thread.current #end end @threads = [] end end |
#joined? ⇒ Boolean
45 46 47 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 45 def joined? @joined end |
#super(*args) ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 120 def super(*args) if autojoin begin super(*args) rescue Log.exception $! self.abort self.join raise $! ensure self.join if self.closed? or self.eof? end else super(*args) end end |