Class: Threadded_enumerator
- Inherits: Object
- Hierarchy: Object, Threadded_enumerator
- Defined in: lib/threadded_enumerator.rb
Defined Under Namespace
Classes: Yielder
Constant Summary
- @@threads = Tsafe::MonHash.new
Class Method Summary
- .block_runner(args) ⇒ Object
  Starts a thread which fills the yielder.
- .finalizer(id) ⇒ Object
  Kills waiting threads when ‘Threadded_enumerator’-objects are garbage-collected.
Instance Method Summary
- #each(&block) ⇒ Object
  Loops over each result and yields it or returns an enumerator if no block is given.
- #initialize(args = {}, &block) ⇒ Threadded_enumerator (constructor)
  A new instance of Threadded_enumerator.
- #next ⇒ Object
  Returns the next result.
Constructor Details
#initialize(args = {}, &block) ⇒ Threadded_enumerator
Returns a new instance of Threadded_enumerator.
    # File 'lib/threadded_enumerator.rb', line 70
    def initialize(args = {}, &block)
      @args = {
        :cache => 0,
        :block => block,
        :id => self.__id__
      }.merge(args)

      @debug = @args[:debug]
      @yielder = Threadded_enumerator::Yielder.new(@args)

      #We use this to kill the block-thread, execute any ensures and release references to any objects within.
      ObjectSpace.define_finalizer(self, Threadded_enumerator.method(:finalizer))
    end
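The constructor registers a class-level method as the finalizer, so the finalizer itself holds no reference to the instance. The following is a minimal sketch of that pattern only; `TrackedResource` and its `released` list are hypothetical names used for illustration and are not part of this library.

```ruby
# Hypothetical class (not part of the library): shows the finalizer pattern only.
class TrackedResource
  @@released = []

  # A finalizer receives the object id, never the object itself.
  def self.finalizer(id)
    @@released << id
  end

  def self.released
    @@released
  end

  def initialize
    # A Method object bound to the class keeps no reference to this instance,
    # so the instance remains eligible for garbage collection.
    ObjectSpace.define_finalizer(self, TrackedResource.method(:finalizer))
  end
end

res = TrackedResource.new
id = res.__id__

# Calling the finalizer by hand mirrors what Ruby does after collecting the object.
TrackedResource.finalizer(id)
```

Passing a block that closes over `self` instead would keep the object reachable, and the finalizer would not run until the process exits.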
Class Method Details
.block_runner(args) ⇒ Object
Starts a thread which fills the yielder. It is done by this class method to allow ‘Threadded_enumerator’-objects to be garbage-collected.
    # File 'lib/threadded_enumerator.rb', line 42
    def self.block_runner(args)
      @@threads[args[:id]] = Thread.new do
        args[:yielder].thread = Thread.current

        begin
          if args[:block]
            args[:block].call(args[:yielder])
            args[:yielder].done = true
          elsif enum = args[:enum]
            begin
              loop do
                args[:yielder] << enum.next
              end
            rescue StopIteration
              #ignore.
            end
          else
            raise "Dont know what to do?"
          end
        rescue => e
          $stderr.puts e.inspect
          $stderr.puts e.backtrace
        ensure
          args[:yielder].done = true
        end
      end
    end
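The producer-thread idea can be sketched with Ruby's standard `SizedQueue` standing in for the yielder. This is not how `Threadded_enumerator::Yielder` is implemented (its internals are not shown here); `start_producer` and the `DONE` sentinel are hypothetical names chosen for the example.

```ruby
require "thread"

# Hypothetical sentinel marking the end of the stream.
DONE = Object.new

# Hypothetical helper: drains an Enumerator into a queue from a background thread.
def start_producer(enum, queue)
  Thread.new do
    begin
      # Kernel#loop rescues StopIteration and exits quietly when enum is exhausted.
      loop { queue << enum.next }
    ensure
      # Always signal completion, even if the producer raised an error.
      queue << DONE
    end
  end
end

queue = SizedQueue.new(2) # the producer blocks once two items are waiting
producer = start_producer([1, 2, 3].each, queue)

results = []
while (item = queue.pop) != DONE
  results << item
end
producer.join
```

The bounded queue plays a role similar to a small cache: the producer runs ahead of the consumer by at most a fixed number of items.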
.finalizer(id) ⇒ Object
Kills waiting threads when ‘Threadded_enumerator’-objects are garbage-collected. This causes any ensure-blocks to be executed, objects to be GC’ed and so on.
    # File 'lib/threadded_enumerator.rb', line 8
    def self.finalizer(id)
      begin
        Timeout.timeout(3) do
          thread = @@threads[id]

          #The thread is not always started, if the loop is never called... The thread might not exist for this reason.
          return nil if !thread

          #Remove reference.
          @@threads.delete(id)

          #Thread is already dead - ignore.
          return nil if !thread.alive?

          #Kill thread to release references to objects within and make it execute any ensures within.
          thread.kill

          #Check that the thread is actually killed by joining - else the timeout would have no effect, since 'kill' doesn't block. If the thread is never killed, this will probably be a memory leak scenario, which we report in stderr!
          thread.join

          #This will make all sleeps and thread-stops be ignored. Commented out until further... Maybe this is good?
          #thread.run while thread.alive?
        end
      rescue Timeout::Error
        $stderr.puts "Couldnt kill thread #{id} for 'Threadded_enumerator' - possible memory leak detected!"
      rescue Exception => e
        $stderr.puts "Error while killing 'Threadded_enumerator'-thread."
        $stderr.puts e.inspect
        $stderr.puts e.backtrace
        raise e
      end
    end
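The kill-then-join sequence can be tried in isolation. The sketch below uses only `Thread` and `Timeout` from the standard library; the `worker` thread and the `cleaned_up` flag are hypothetical and stand in for a producer thread blocked inside a user block.

```ruby
require "timeout"

cleaned_up = false

worker = Thread.new do
  begin
    sleep # waits forever, like a producer blocked on a full buffer
  ensure
    cleaned_up = true # ensure-blocks still run when the thread is killed
  end
end

# Wait until the worker is actually sleeping inside the begin-block.
sleep 0.05 until worker.status == "sleep"

begin
  Timeout.timeout(3) do
    worker.kill # only requests termination, it does not wait for it
    worker.join # blocks until the thread has really finished
  end
rescue Timeout::Error
  $stderr.puts "Worker did not stop within 3 seconds"
end
```

Without the `join`, the timeout would wrap only the non-blocking `kill` call and could never fire, which is why the finalizer joins inside the timeout.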
Instance Method Details
#each(&block) ⇒ Object
Loops over each result and yields it or returns an enumerator if no block is given.
    # File 'lib/threadded_enumerator.rb', line 91
    def each(&block)
      enum = Enumerator.new do |yielder|
        begin
          loop do
            next_res = self.next
            print "Nex: #{next_res}\n" if @debug
            yielder << next_res
          end
        rescue StopIteration
          STDOUT.print "StopIteration!\n" if @debug
          #ignore
        end

        print "Done?\n" if @debug
      end

      if block
        enum.each(&block)
        return nil
      else
        return enum
      end
    end
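The same block-or-enumerator convention can be shown without any threads. In this sketch `NextWrapper` is a hypothetical class that wraps any object whose `next` raises `StopIteration` at the end; it is an illustration of the pattern, not the library's class.

```ruby
# Hypothetical class: builds #each on top of a #next-style source.
class NextWrapper
  def initialize(source)
    @source = source
  end

  def next
    @source.next
  end

  def each(&block)
    enum = Enumerator.new do |yielder|
      # Kernel#loop stops quietly once self.next raises StopIteration.
      loop { yielder << self.next }
    end

    return enum unless block # no block given: hand back the enumerator
    enum.each(&block)        # block given: iterate immediately
    nil
  end
end

doubled = []
NextWrapper.new([1, 2, 3].each).each { |n| doubled << n * 2 }

as_array = NextWrapper.new([4, 5].each).each.to_a
```

Returning an `Enumerator` when no block is given lets callers chain methods such as `to_a` or `map` on the result.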
#next ⇒ Object
Returns the next result.
    # File 'lib/threadded_enumerator.rb', line 85
    def next
      block_start if !@block_started
      return @yielder.get_result
    end