Module: ConcurrentStream

Defined in:
lib/rbbt/util/misc/concurrent_stream.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#abort_callback ⇒ Object

Returns the value of attribute abort_callback.



2
3
4
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2

# Reader for the +abort_callback+ attribute (the object #abort invokes via
# +call+ before tearing the stream down).
#
# @return [Object, nil] current value of @abort_callback
def abort_callback; @abort_callback; end

#aborted ⇒ Object

Returns the value of attribute aborted.



2
3
4
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2

# Reader for the +aborted+ attribute (set to true by #abort).
#
# @return [Object, nil] current value of @aborted
def aborted; @aborted; end

#autojoin ⇒ Object

Returns the value of attribute autojoin.



2
3
4
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2

# Reader for the +autojoin+ attribute (assigned by ConcurrentStream.setup).
#
# @return [Object, nil] current value of @autojoin
def autojoin; @autojoin; end

#callback ⇒ Object

Returns the value of attribute callback.



2
3
4
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2

# Reader for the +callback+ attribute (the object #join_callback invokes via
# +call+).
#
# @return [Object, nil] current value of @callback
def callback; @callback; end

#filename ⇒ Object

Returns the value of attribute filename.



2
3
4
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2

# Reader for the +filename+ attribute (assigned by ConcurrentStream.setup
# when a :filename option is given).
#
# @return [Object, nil] current value of @filename
def filename; @filename; end

#joined ⇒ Object

Returns the value of attribute joined.



2
3
4
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2

# Reader for the +joined+ attribute (set to true by #join).
#
# @return [Object, nil] current value of @joined
def joined; @joined; end

#lockfile ⇒ Object

Returns the value of attribute lockfile.



2
3
4
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2

# Reader for the +lockfile+ attribute (unlocked by #abort and #join when it
# reports being locked).
#
# @return [Object, nil] current value of @lockfile
def lockfile; @lockfile; end

#no_fail ⇒ Object

Returns the value of attribute no_fail.



2
3
4
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2

# Reader for the +no_fail+ attribute (when truthy, #join_pids does not raise
# for processes that exit unsuccessfully).
#
# @return [Object, nil] current value of @no_fail
def no_fail; @no_fail; end

#pids ⇒ Object

Returns the value of attribute pids.



2
3
4
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2

# Reader for the +pids+ attribute (process ids waited on by #join_pids and
# signalled by #abort_pids).
#
# @return [Array, nil] current value of @pids
def pids; @pids; end

#threads ⇒ Object

Returns the value of attribute threads.



2
3
4
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 2

# Reader for the +threads+ attribute (threads joined by #join_threads and
# interrupted by #abort_threads).
#
# @return [Array, nil] current value of @threads
def threads; @threads; end

Class Method Details

.setup(stream, options = {}, &block) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 4

# Turns +stream+ into a ConcurrentStream (extending it when necessary) and
# records the threads, processes and callbacks that belong to it.
#
# Options, extracted through Misc.process_options:
# :threads, :pids, :callback, :abort_callback, :filename, :autojoin,
# :lockfile and :no_fail. :threads and :pids accept one item or an Array and
# are appended to whatever the stream already tracks. :autojoin and :no_fail
# are always (re)assigned, even when absent from +options+.
#
# Callbacks given here are chained after any callback already registered on
# the stream; omitting a callback leaves the registered one untouched.
#
# @param stream [Object] the object to decorate
# @param options [Hash] see above
# @yield optional block used as the callback; takes precedence over :callback
# @return [Object] +stream+ itself
def self.setup(stream, options = {}, &block)
  threads, pids, callback, abort_callback, filename, autojoin, lockfile, no_fail =
    Misc.process_options options, :threads, :pids, :callback, :abort_callback, :filename, :autojoin, :lockfile, :no_fail
  stream.extend ConcurrentStream unless ConcurrentStream === stream

  stream.threads ||= []
  stream.pids ||= []
  stream.threads.concat(Array === threads ? threads : [threads]) unless threads.nil?
  # No #empty? check here: a lone pid is an Integer, which does not respond to
  # it, and concatenating an empty Array is harmless anyway.
  stream.pids.concat(Array === pids ? pids : [pids]) unless pids.nil?
  stream.autojoin = autojoin
  stream.no_fail = no_fail

  callback = block if block_given?
  if callback
    previous = stream.callback
    stream.callback = previous ? Proc.new { previous.call; callback.call } : callback
  end

  # :abort_callback is what #annotate forwards; chain it like the callback.
  if abort_callback
    previous_abort = stream.abort_callback
    stream.abort_callback = previous_abort ? Proc.new { previous_abort.call; abort_callback.call } : abort_callback
  end

  stream.filename = filename unless filename.nil?

  stream.lockfile = lockfile if lockfile

  stream
end

Instance Method Details

#abort(exception = nil) ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 131

# Aborts the stream: runs the abort callback, closes the stream and stops the
# threads and processes attached to it. Does nothing when @aborted is already
# set. The lockfile, if present and locked, is released even when one of the
# steps raises.
#
# @param exception [Exception, nil] forwarded to #abort_threads
def abort(exception = nil)
  return if @aborted
  Log.medium "Aborting stream #{Misc.fingerprint self} -- #{@abort_callback} [#{@aborted}]"
  @aborted = true 
  begin
    # Drop the regular callback so a later #join_callback finds nothing to run.
    @callback = nil
    @abort_callback.call if @abort_callback
    # Cleared after the call, so the abort callback runs at most once.
    @abort_callback = nil
    close unless closed?

    abort_threads(exception)
    abort_pids
  ensure
    lockfile.unlock if lockfile and lockfile.locked?
  end
  # @abort_callback has been cleared by now, so it is logged as empty here.
  Log.medium "Aborted stream #{Misc.fingerprint self} -- #{@abort_callback} [#{@aborted}]"
end

#abort_pids ⇒ Object



121
122
123
124
125
126
127
128
129
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 121

# Sends SIGINT to every tracked process and then forgets them all.
# Processes that no longer exist are skipped silently.
#
# @return [Array] the new, empty pid list
def abort_pids
  (@pids || []).each do |pid|
    begin
      Process.kill :INT, pid
    rescue Errno::ESRCH
      # the process is already gone; nothing to interrupt
    end
  end
  @pids = []
end

#abort_threads(exception) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 91

# Interrupts every thread attached to the stream except the calling one,
# then kills the ones still alive and joins them all.
#
# A stream whose @threads was never assigned is treated as having no threads.
#
# @param exception [Exception, nil] raised inside each thread; a fresh
#   Aborted instance is used when nil
def abort_threads(exception)
  threads = @threads || []
  Log.medium "Aborting threads (#{Thread.current.inspect}) #{threads.collect{|t| t.inspect } * ", "}"

  threads.each do |t|
    if t == Thread.current
      # Called from one of the stream's own threads: the aborted flag is reset.
      # NOTE(review): presumably so the abort can be completed elsewhere — confirm.
      @aborted = false
      next
    end
    Log.medium "Aborting thread #{t.inspect}"
    t.raise exception ? exception : Aborted.new
  end

  sleeped = false
  threads.each do |t|
    next if t == Thread.current
    if t.alive?
      # A single one-second grace period, shared by all threads, before killing.
      sleep 1 unless sleeped
      sleeped = true
      Log.medium "Kill thread #{t.inspect}"
      t.kill
    end
    begin
      Log.medium "Join thread #{t.inspect}"
      t.join
    rescue Aborted
      # expected outcome of the interruption above
    rescue Exception
      # best effort: whatever else the thread died with is logged, not re-raised
      Log.exception $!
    end
  end
  Log.medium "Aborted threads (#{Thread.current.inspect}) #{threads.collect{|t| t.inspect } * ", "}"
end

#aborted? ⇒ Boolean

Returns:

  • (Boolean)


46
47
48
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 46

# Tells whether the stream has been flagged as aborted.
#
# @return [Object, nil] the raw value of @aborted (truthy once #abort has run)
def aborted?; @aborted; end

#annotate(stream) ⇒ Object



33
34
35
36
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 33

# Copies this stream's concurrency bookkeeping onto +stream+ by handing the
# current attribute values to ConcurrentStream.setup.
#
# @param stream [Object] the object to set up
# @return [Object] +stream+
def annotate(stream)
  ConcurrentStream.setup(
    stream,
    threads: threads,
    pids: pids,
    callback: callback,
    abort_callback: abort_callback,
    filename: filename,
    autojoin: autojoin,
    lockfile: lockfile
  )
  stream
end

#clear ⇒ Object



38
39
40
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 38

# Forgets the threads, processes and callbacks attached to the stream and
# resets the joined flag.
#
# The instance variables are assigned directly: bare names on the left of an
# assignment would only create method-local variables and change nothing.
# Thread and pid lists are reset to empty Arrays, the same "nothing tracked"
# state the join/abort helpers leave behind.
#
# @return [nil]
def clear
  @threads = []
  @pids = []
  @callback = @abort_callback = @joined = nil
end

#join ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 79

# Waits for the stream's threads and processes, runs the completion callback,
# then releases the lockfile and closes the stream.
#
# The joined flag is raised only after #join_callback has had its chance to
# run: that method skips the callback once the stream counts as joined, so
# flagging first would silently disable it. The flag is still set when one of
# the join steps raises.
def join
  begin
    join_threads
    join_pids

    join_callback
  ensure
    @joined = true
  end

  lockfile.unlock if lockfile and lockfile.locked?
  close unless closed?
end

#join_callback ⇒ Object



72
73
74
75
76
77
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 72

# Runs the completion callback once, provided one is registered and the
# stream has not been flagged as joined; the callback is discarded afterwards.
#
# @return [nil]
def join_callback
  return unless @callback && !joined?

  @callback.call
  @callback = nil
end

#join_pids ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 59

# Waits for every tracked process and then empties the pid list.
# Pids that cannot be waited on (Errno::ECHILD) are skipped.
#
# @return [Array, nil] the new empty pid list, or nil when nothing was tracked
# @raise [ProcessFailed] when a process did not finish successfully and
#   +no_fail+ is not set
def join_pids
  return unless @pids and @pids.any?

  @pids.each do |pid|
    begin
      Process.waitpid(pid, Process::WUNTRACED)
      next if $?.success? or no_fail
      raise ProcessFailed.new "Error joining process #{pid} in #{self.inspect}"
    rescue Errno::ECHILD
      # not a child of this process (or already reaped): nothing to wait for
    end
  end
  @pids = []
end

#join_threads ⇒ Object



50
51
52
53
54
55
56
57
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 50

# Joins every tracked thread except the calling one, then empties the list.
#
# @return [Array, nil] the new empty thread list, or nil when nothing was tracked
def join_threads
  return unless @threads and @threads.any?

  current = Thread.current
  @threads.each { |thread| thread.join unless thread == current }
  @threads = []
end

#joined? ⇒ Boolean

Returns:

  • (Boolean)


42
43
44
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 42

# Tells whether the stream has been flagged as joined.
#
# @return [Object, nil] the raw value of @joined (truthy once #join has run)
def joined?; @joined; end

#super(*args) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/rbbt/util/misc/concurrent_stream.rb', line 149

# Wraps the inherited method of the same name. Without +autojoin+ it is a
# plain pass-through; with +autojoin+ it also aborts and joins the stream when
# the call fails, and joins it once the stream is closed or at end of file.
# NOTE(review): the method name +super+ looks like a documentation-generator
# artifact for a wrapped stream method — confirm against the source file.
#
# @param args [Array] forwarded unchanged to the inherited implementation
# @return [Object] whatever the inherited implementation returns
# @raise re-raises the error rescued from the inherited call, after aborting
#   and joining the stream
def super(*args)
  if autojoin
    begin
      super(*args)
    rescue
      Log.exception $!
      self.abort
      self.join 
      raise $!
    ensure
      # Runs on success and on failure: join once the stream is closed or exhausted.
      self.join if self.closed? or self.eof? 
    end
  else
    super(*args)
  end
end