Class: Eurydice::ConcurrentColumnEnumerator
- Inherits: ColumnEnumeratorBase
  - Object
  - ColumnEnumeratorBase
  - Eurydice::ConcurrentColumnEnumerator
- Defined in: lib/eurydice/column_enumerator.rb
Instance Method Summary
- #initialize(*args) ⇒ ConcurrentColumnEnumerator (constructor): A new instance of ConcurrentColumnEnumerator.
- #next ⇒ Object
- #rewind ⇒ Object
Methods inherited from ColumnEnumeratorBase
Constructor Details
#initialize(*args) ⇒ ConcurrentColumnEnumerator
Returns a new instance of ConcurrentColumnEnumerator.
# File 'lib/eurydice/column_enumerator.rb', line 67

def initialize(*args)
  super
  @standard_enumerator = ColumnEnumerator.new(*args)
  @queue = ArrayBlockingQueue.new(@options[:max_column_count] * 2)
  @fetch_pool = Executors.new_single_thread_executor(DaemonThreadFactory.new(self.class.name))
  @exhausted = true
  rewind
end
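The constructor sizes the queue at `@options[:max_column_count] * 2`, which bounds how far the fetch thread can run ahead of the consumer. The following is a minimal sketch of that sizing in plain Ruby, assuming the standard library's `SizedQueue` as a stand-in for Java's `ArrayBlockingQueue`; the helper `build_prefetch_queue` and the value `3` are hypothetical and not part of Eurydice.

```ruby
# Plain-Ruby stand-in: SizedQueue plays the role of ArrayBlockingQueue.
# build_prefetch_queue and the value 3 are illustrative assumptions.
def build_prefetch_queue(max_column_count)
  SizedQueue.new(max_column_count * 2)
end

queue = build_prefetch_queue(3)

# Fill the queue to its capacity of 6 entries.
6.times { |i| queue.push(["col#{i}", i]) }

begin
  # A non-blocking push on a full queue raises ThreadError.
  queue.push(["overflow", 6], true)
  overflowed = false
rescue ThreadError
  # A blocking push would instead wait here until a consumer pops.
  overflowed = true
end
```

A blocking `push` on the full queue is what holds a producer back until the consumer catches up.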
Instance Method Details
#next ⇒ Object
# File 'lib/eurydice/column_enumerator.rb', line 76

def next
  unless @running.get_and_set(true)
    @fetch_pool.execute do
      begin
        while @running.get
          pair = @standard_enumerator.next
          @queue.put(pair)
        end
      rescue StopIteration
      end
      @queue.put(:stop_iteration)
    end
  end

  raise StopIteration if @exhausted

  value = @queue.take

  if value == :stop_iteration
    @exhausted = true
    raise StopIteration
  end

  value
end
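The method above follows a producer/consumer pattern: the first call starts a background task that copies values from the wrapped enumerator into a bounded queue, and a `:stop_iteration` sentinel marks the end of the stream. A rough plain-Ruby analogue is sketched below, assuming `Thread` and `SizedQueue` in place of the Java executor and `ArrayBlockingQueue`. `ArraySource` and `PrefetchingEnumerator` are hypothetical names used only for illustration.

```ruby
# Hypothetical source: any object whose #next raises StopIteration at the end.
class ArraySource
  def initialize(items)
    @items = items
    @index = 0
  end

  def next
    raise StopIteration if @index >= @items.size
    item = @items[@index]
    @index += 1
    item
  end
end

# Hypothetical analogue of the prefetching pattern, not the Eurydice class.
class PrefetchingEnumerator
  STOP = :stop_iteration # sentinel marking the end of the stream

  def initialize(source, capacity)
    @source = source
    @queue = SizedQueue.new(capacity)
    @exhausted = false
    @producer = nil
  end

  def next
    start_producer unless @producer
    raise StopIteration if @exhausted

    value = @queue.pop # blocks until the producer delivers something
    if value == STOP
      @exhausted = true
      raise StopIteration
    end
    value
  end

  private

  def start_producer
    @producer = Thread.new do
      # Kernel#loop ends quietly when the source raises StopIteration.
      loop { @queue.push(@source.next) }
      @queue.push(STOP)
    end
  end
end

columns = PrefetchingEnumerator.new(ArraySource.new([["a", 1], ["b", 2], ["c", 3]]), 2)
seen = []
loop { seen << columns.next }
```

Because the sentinel travels through the same queue as the data, the consumer needs no separate signal to learn that the producer has finished.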
#rewind ⇒ Object
# File 'lib/eurydice/column_enumerator.rb', line 101

def rewind
  @running = AtomicBoolean.new(false)
  unless @exhausted
    until @queue.take == :stop_iteration; end
  end
  @standard_enumerator.rewind
  @exhausted = false
end
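When the enumerator is not yet exhausted, `rewind` clears the running flag and then discards queued values until it sees the `:stop_iteration` sentinel. The sketch below illustrates that drain step in plain Ruby, assuming `Thread` and `SizedQueue` as stand-ins for the Java classes; `drain_until_stop` is a hypothetical helper. Judging from the code as shown, the drain can only finish once a fetch task has been started, since only that task enqueues the sentinel, so a `rewind` issued before any `next` on a non-exhausted enumerator appears to block on the empty queue.

```ruby
# Hypothetical helper: discards queued items up to and including the
# sentinel and returns how many real items were thrown away.
def drain_until_stop(queue, stop = :stop_iteration)
  discarded = 0
  discarded += 1 until queue.pop == stop
  discarded
end

running = true
queue = SizedQueue.new(2)

# Producer: keeps pushing integers until asked to stop, then pushes the sentinel.
producer = Thread.new do
  i = 0
  while running
    queue.push(i += 1)
  end
  queue.push(:stop_iteration)
end

first = queue.pop                   # the consumer reads one value
running = false                     # ask the producer to stop, like resetting @running
discarded = drain_until_stop(queue) # throw away whatever was prefetched
producer.join
```

The number of discarded items depends on how far the producer had run ahead, so it is not fixed from run to run.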