Class: Threadded_enumerator

Inherits:
Object
  • Object
show all
Defined in:
lib/threadded_enumerator.rb

Defined Under Namespace

Classes: Yielder

Constant Summary collapse

@@threads =
Tsafe::MonHash.new

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(args = {}, &block) ⇒ Threadded_enumerator

Returns a new instance of Threadded_enumerator.



70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/threadded_enumerator.rb', line 70

def initialize(args = {}, &block)
  @args = {
    :cache => 0,
    :block => block,
    :id => self.__id__
  }.merge(args)
  
  @debug = @args[:debug]
  @yielder = Threadded_enumerator::Yielder.new(@args)
  
  #We use this to kill the block-thread, execute any ensures and release references to any objects within.
  ObjectSpace.define_finalizer(self, Threadded_enumerator.method(:finalizer))
end

Class Method Details

.block_runner(args) ⇒ Object

Starts a thread which fills the yielder. Its done by this method to allowed GC’ing of ‘Threadded_enumerator’-objects.



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/threadded_enumerator.rb', line 42

def self.block_runner(args)
  @@threads[args[:id]] = Thread.new do
    args[:yielder].thread = Thread.current
    
    begin
      if args[:block]
        args[:block].call(args[:yielder])
        args[:yielder].done = true
      elsif enum = args[:enum]
        begin
          loop do
            args[:yielder] << enum.next
          end
        rescue StopIteration
          #ignore.
        end
      else
        raise "Dont know what to do?"
      end
    rescue => e
      $stderr.puts e.inspect
      $stderr.puts e.backtrace
    ensure
      args[:yielder].done = true
    end
  end
end

.finalizer(id) ⇒ Object

Kills waiting threads when ‘Threadded_enumerator’-objects are garbage-collected. This makes ensures being executed, objects GC’ed and so on.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/threadded_enumerator.rb', line 8

def self.finalizer(id)
  begin
    Timeout.timeout(3) do
      thread = @@threads[id]
      
      #The thread is not always started, if the loop is never called... The thread might not exist for this reason.
      return nil if !thread
      
      #Remove reference.
      @@threads.delete(id)
      
      #Thread is already dead - ignore.
      return nil if !thread.alive?
      
      #Kill thread to release references to objects within and make it execute any ensures within.
      thread.kill
      
      #Check that the thread is actually killed by joining - else the timeout would have no effect, since 'kill' doesnt block. If the thread is never killed, this will be properly be a memory leak scenario, which we report in stderr!
      thread.join
      
      #This will make all sleeps and thread-stops be ignored. Commented out until further... Maybe this is good?
      #thread.run while thread.alive?
    end
  rescue Timeout::Error
    $stderr.puts "Couldnt kill thread #{id} for 'Threadded_enumerator' - possible memory leak detected!"
  rescue Exception => e
    $stderr.puts "Error while killing 'Threadded_enumerator'-thread."
    $stderr.puts e.inspect
    $stderr.puts e.backtrace
    raise e
  end
end

Instance Method Details

#each(&block) ⇒ Object

Loops over each result and yields it or returns an enumerator if no block is given.



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/threadded_enumerator.rb', line 91

def each(&block)
  enum = Enumerator.new do |yielder|
    begin
      loop do
        next_res = self.next
        print "Nex: #{next_res}\n" if @debug
        yielder << next_res
      end
    rescue StopIteration
      STDOUT.print "StopIteration!\n" if @debug
      #ignore
    end
    
    print "Done?\n" if @debug
  end
  
  if block
    enum.each(&block)
    return nil
  else
    return enum
  end
end

#nextObject

Returns the next result.



85
86
87
88
# File 'lib/threadded_enumerator.rb', line 85

def next
  block_start if !@block_started
  return @yielder.get_result
end