Module: ConcurrentStream

Defined in:
lib/rbbt/util/misc/concurrent_stream.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#abort_callbackObject

Returns the value of attribute abort_callback.



2
3
4
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2

def abort_callback
  @abort_callback
end

#abortedObject

Returns the value of attribute aborted.



2
3
4
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2

def aborted
  @aborted
end

#autojoinObject

Returns the value of attribute autojoin.



2
3
4
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2

def autojoin
  @autojoin
end

#callbackObject

Returns the value of attribute callback.



2
3
4
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2

def callback
  @callback
end

#filenameObject

Returns the value of attribute filename.



2
3
4
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2

def filename
  @filename
end

#joinedObject

Returns the value of attribute joined.



2
3
4
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2

def joined
  @joined
end

#pidsObject

Returns the value of attribute pids.



2
3
4
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2

def pids
  @pids
end

#threadsObject

Returns the value of attribute threads.



2
3
4
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2

def threads
  @threads
end

Class Method Details

.setup(stream, options = {}, &block) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 4

def self.setup(stream, options = {}, &block)
  threads, pids, callback, filename, autojoin = Misc.process_options options, :threads, :pids, :callback, :filename, :autojoin
  stream.extend ConcurrentStream unless ConcurrentStream === stream

  stream.threads ||= []
  stream.pids ||= []
  stream.threads.concat(Array === threads ? threads : [threads]) unless threads.nil? 
  stream.pids.concat(Array === pids ? pids : [pids]) unless pids.nil? or pids.empty?
  stream.autojoin = autojoin

  callback = block if block_given?
  if stream.callback and callback
    old_callback = stream.callback
    stream.callback = Proc.new do
      old_callback.call
      callback.call
    end
  else
    stream.callback = callback
  end

  stream.filename = filename unless filename.nil?

  stream
end

Instance Method Details

#abortObject



105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 105

def abort
  Log.warn "Aborting stream #{Misc.fingerprint self} -- #{@abort_callback} [#{@aborted}]"
  return if @aborted
  @aborted = true
  begin
    @callback = nil
    @abort_callback.call if @abort_callback
    @abort_callback = nil
    close unless closed?

    abort_threads
    abort_pids
  end
end

#abort_pidsObject



100
101
102
103
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 100

def abort_pids
  @pids.each{|pid| Process.kill :INT, pid } if @pids
  @pids = []
end

#abort_threadsObject



96
97
98
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 96

def abort_threads
  @threads.each{|t| t.raise Aborted.new unless t == Thread.current } if @threads
end

#aborted?Boolean

Returns:

  • (Boolean)


49
50
51
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 49

def aborted?
  @aborted
end

#annotate(stream) ⇒ Object



30
31
32
33
34
35
36
37
38
39
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 30

def annotate(stream)
  ConcurrentStream.setup stream
  stream.threads = threads
  stream.pids = pids
  stream.callback = callback
  stream.abort_callback = abort_callback
  stream.filename = filename
  stream.autojoin = autojoin
  stream.joined = joined
end

#clearObject



41
42
43
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 41

def clear
  threads, pids, callback, abort_callback = nil
end

#joinObject



86
87
88
89
90
91
92
93
94
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 86

def join
  join_threads
  join_pids

  join_callback

  @joined = true
  close unless closed?
end

#join_callbackObject



79
80
81
82
83
84
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 79

def join_callback
  if @callback and not joined?
    @callback.call
    @callback = nil
  end
end

#join_pidsObject



66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 66

def join_pids
  if @pids and @pids.any?
    @pids.each do |pid| 
      begin
        Process.waitpid(pid, Process::WUNTRACED)
        raise ProcessFailed.new "Error joining process #{pid} in #{self.inspect}" unless $?.success?
      rescue Errno::ECHILD
      end
    end 
    @pids = []
  end
end

#join_threadsObject



53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 53

def join_threads
  if @threads and @threads.any?
    @threads.each do |t| 
      t.join unless t == Thread.current
      #begin
      #ensure
      #  t.join unless t == Thread.current
      #end
    end
    @threads = []
  end
end

#joined?Boolean

Returns:

  • (Boolean)


45
46
47
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 45

def joined?
  @joined
end

#super(*args) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 120

def super(*args)
  if autojoin
    begin
      super(*args)
    rescue
      Log.exception $!
      self.abort
      self.join 
      raise $!
    ensure
      self.join if self.closed? or self.eof? 
    end
  else
    super(*args)
  end
end