Class: EventSource::Iterator
- Inherits:
- Object
- Inheritance hierarchy: Object → EventSource::Iterator
- Includes:
- Log::Dependency
- Defined in:
- lib/event_source/iterator.rb
Instance Attribute Summary collapse
- #batch ⇒ Object
Returns the value of attribute batch.
- #batch_position ⇒ Object
- #position ⇒ Object
Returns the value of attribute position.
- #stream_offset ⇒ Object
Class Method Summary collapse
- .build(get, stream_name, position: nil, cycle: nil) ⇒ Object
- .configure(receiver, get, stream_name, attr_name: nil, position: nil, cycle: nil) ⇒ Object
Instance Method Summary collapse
- #advance_positions ⇒ Object
- #get_batch ⇒ Object
- #next ⇒ Object
- #reset(batch) ⇒ Object
- #resupply(batch) ⇒ Object
Instance Attribute Details
#batch ⇒ Object
Returns the value of attribute batch.
8 9 10 |
# File 'lib/event_source/iterator.rb', line 8 def batch @batch end |
#batch_position ⇒ Object
10 11 12 |
# File 'lib/event_source/iterator.rb', line 10
# Index into the current batch, defaulting to 0 the first time it is read.
#
# @return [Object] the stored batch position (0 when none has been set yet)
def batch_position
  # Lazily seed the position so a fresh iterator starts at the batch head.
  @batch_position = 0 unless @batch_position
  @batch_position
end
#position ⇒ Object
Returns the value of attribute position.
7 8 9 |
# File 'lib/event_source/iterator.rb', line 7 def position @position end |
#stream_offset ⇒ Object
15 16 17 |
# File 'lib/event_source/iterator.rb', line 15
# Offset passed to `get` when a batch is fetched. On first read it is seeded
# from `position`, falling back to 0 when no position was set.
#
# @return [Object] the memoised stream offset
def stream_offset
  return @stream_offset if @stream_offset

  @stream_offset = position || 0
end
Class Method Details
.build(get, stream_name, position: nil, cycle: nil) ⇒ Object
22 23 24 25 26 27 |
# File 'lib/event_source/iterator.rb', line 22
# Constructs an iterator for the given getter and stream, assigns its starting
# position, and hands it to Cycle.configure together with the cycle argument.
#
# @param get [Object] passed through to `new`
# @param stream_name [Object] passed through to `new`
# @param position [Object, nil] assigned to the new instance's `position`
# @param cycle [Object, nil] forwarded to Cycle.configure
# @return [EventSource::Iterator] the newly built instance
def self.build(get, stream_name, position: nil, cycle: nil)
  iterator = new(get, stream_name)
  iterator.position = position
  Cycle.configure(iterator, cycle: cycle)
  iterator
end
.configure(receiver, get, stream_name, attr_name: nil, position: nil, cycle: nil) ⇒ Object
29 30 31 32 33 |
# File 'lib/event_source/iterator.rb', line 29
# Builds an iterator and assigns it to the receiver through the
# `<attr_name>=` writer (the attribute name defaults to :iterator).
#
# @param receiver [Object] object that gets the iterator assigned
# @param get [Object] forwarded to `build`
# @param stream_name [Object] forwarded to `build`
# @param attr_name [Symbol, String, nil] receiver attribute to assign
# @param position [Object, nil] forwarded to `build`
# @param cycle [Object, nil] forwarded to `build`
# @return [Object] whatever the receiver's writer call returns
def self.configure(receiver, get, stream_name, attr_name: nil, position: nil, cycle: nil)
  writer = "#{attr_name || :iterator}="
  iterator = build(get, stream_name, position: position, cycle: cycle)
  receiver.public_send(writer, iterator)
end
Instance Method Details
#advance_positions ⇒ Object
95 96 97 98 99 100 |
# File 'lib/event_source/iterator.rb', line 95
# Moves both cursors forward by one: the index into the current batch and the
# offset used for the next fetch. Logs the values before and after.
def advance_positions
  logger.trace { "Advancing positions (Batch Position: #{batch_position}, Stream Offset: #{stream_offset})" }

  self.batch_position = batch_position + 1
  self.stream_offset = stream_offset + 1

  logger.debug { "Advanced positions (Batch Position: #{batch_position}, Stream Offset: #{stream_offset})" }
end
#get_batch ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/event_source/iterator.rb', line 82
# Fetches a batch by invoking `get` with the stream name and the current
# stream offset. The fetch runs inside the block given to `cycle`.
#
# @return [Object, nil] the last batch assigned inside the cycle block, or nil
#   if the block never produced one
def get_batch
  logger.trace { "Getting batch" }

  # Declared outside the block so the value assigned inside it survives.
  batch = nil
  cycle.() do
    batch = get.(stream_name, position: stream_offset)
  end

  # Guard the count: without it a nil batch raises NoMethodError, and only
  # when the log level causes this block to be evaluated.
  logger.debug { "Finished getting batch (Count: #{batch.nil? ? '<none>' : batch.length})" }

  batch
end
#next ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/event_source/iterator.rb', line 35
# Returns the event data at the current batch position, fetching a new batch
# first when the current one is nil or fully consumed (see `resupply`).
#
# Positions advance only when an event was actually read. If they advanced on
# a nil result, `batch_position` would move past `batch.length`, so `resupply`
# would never see the batch as depleted again, and `stream_offset` would drift
# past positions that have not been read.
#
# @return [Object, nil] the next event data, or nil when the batch has no
#   entry at the current position
def next
  logger.trace { "Getting next event data (Batch Length: #{batch.nil? ? '<none>' : batch.length}, Batch Position: #{batch_position}, Stream Offset: #{stream_offset})" }

  resupply(batch)

  event_data = batch[batch_position]

  logger.debug { "Finished getting next event data (Batch Length: #{batch.nil? ? '<none>' : batch.length}, Batch Position: #{batch_position}, Stream Offset: #{stream_offset})" }
  logger.debug(tags: [:data, :event_data]) { "Event Data: #{event_data.pretty_inspect}" }

  # Nothing was consumed when event_data is nil, so leave both cursors alone.
  advance_positions unless event_data.nil?

  event_data
end
#reset(batch) ⇒ Object
71 72 73 74 75 76 77 78 79 80 |
# File 'lib/event_source/iterator.rb', line 71
# Installs the given batch as the current one and rewinds the batch position
# to 0, logging the new batch and position.
#
# @param batch [Object] the batch to iterate from now on
def reset(batch)
  logger.trace { "Resetting batch" }

  self.batch_position = 0
  self.batch = batch

  logger.debug { "Reset batch" }

  data_tags = [:data, :batch]
  logger.debug(tags: data_tags) { "Batch: #{batch.pretty_inspect}" }
  logger.debug(tags: data_tags) { "Batch Position: #{batch_position.inspect}" }
end
#resupply(batch) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/event_source/iterator.rb', line 50
# Replaces the current batch when it is depleted. A batch counts as depleted
# when it is nil or when the batch position equals its length; in that case a
# new batch is fetched with `get_batch` and installed with `reset`.
#
# @param batch [Object, nil] the batch to check for depletion
def resupply(batch)
  logger.trace { "Resupplying batch" }

  batch_log_text = batch.nil? ? "Batch: #{batch.inspect}" : "Batch Length: #{batch.length}"
  depleted = batch.nil? || batch_position == batch.length

  if depleted
    logger.debug { "Current batch is depleted (#{batch_log_text}, Batch Position: #{batch_position})" }
    reset(get_batch)
  else
    logger.debug { "Current batch not depleted (#{batch_log_text}, Batch Position: #{batch_position})" }
  end

  logger.debug { "Finished resupplying batch" }
end