Class: EventSource::Iterator

Inherits:
Object
  • Object
show all
Includes:
Log::Dependency
Defined in:
lib/event_source/iterator.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#batch ⇒ Object

Returns the value of attribute batch.



8
9
10
# File 'lib/event_source/iterator.rb', line 8

# Reader for the batch attribute.
#
# @return [Object, nil] the value stored in @batch; nil when it has not been assigned
def batch; @batch; end

#batch_position ⇒ Object



10
11
12
# File 'lib/event_source/iterator.rb', line 10

# Index into the current batch, lazily defaulted to 0 the first time it is read.
#
# @return [Object] the stored batch position (0 when none was stored yet)
def batch_position
  @batch_position = 0 unless @batch_position
  @batch_position
end

#position ⇒ Object

Returns the value of attribute position.



7
8
9
# File 'lib/event_source/iterator.rb', line 7

# Reader for the position attribute.
#
# @return [Object, nil] the value stored in @position; nil when it has not been assigned
def position; @position; end

#stream_offset ⇒ Object



15
16
17
# File 'lib/event_source/iterator.rb', line 15

# Offset passed to the getter when fetching a batch. On first read it is
# seeded from +position+, falling back to 0, and the result is memoized.
#
# @return [Object] the memoized stream offset
def stream_offset
  return @stream_offset if @stream_offset

  @stream_offset = position || 0
end

Class Method Details

.build(get, stream_name, position: nil, cycle: nil) ⇒ Object



22
23
24
25
26
27
# File 'lib/event_source/iterator.rb', line 22

# Constructs an instance from +get+ and +stream_name+, assigns the optional
# starting position, and hands the instance to Cycle.configure.
#
# @param get [Object] passed through to +new+
# @param stream_name [Object] passed through to +new+
# @param position [Object, nil] assigned to the instance's +position+
# @param cycle [Object, nil] forwarded to Cycle.configure
# @return [Object] the newly built instance
def self.build(get, stream_name, position: nil, cycle: nil)
  instance = new(get, stream_name)
  instance.position = position
  Cycle.configure(instance, cycle: cycle)
  instance
end

.configure(receiver, get, stream_name, attr_name: nil, position: nil, cycle: nil) ⇒ Object



29
30
31
32
33
# File 'lib/event_source/iterator.rb', line 29

# Builds an iterator and assigns it to +receiver+ through a public writer.
#
# @param receiver [Object] object that gets the iterator assigned
# @param get [Object] forwarded to +build+
# @param stream_name [Object] forwarded to +build+
# @param attr_name [Symbol, String, nil] attribute to assign; :iterator when omitted
# @param position [Object, nil] forwarded to +build+
# @param cycle [Object, nil] forwarded to +build+
# @return [Object] whatever the receiver's writer method returns
def self.configure(receiver, get, stream_name, attr_name: nil, position: nil, cycle: nil)
  writer = "#{attr_name || :iterator}="
  receiver.public_send(writer, build(get, stream_name, position: position, cycle: cycle))
end

Instance Method Details

#advance_positions ⇒ Object



95
96
97
98
99
100
# File 'lib/event_source/iterator.rb', line 95

# Moves both cursors forward by one: the index into the current batch and
# the offset into the stream. Logs the values before and after.
def advance_positions
  logger.trace { "Advancing positions (Batch Position: #{batch_position}, Stream Offset: #{stream_offset})" }

  self.batch_position = batch_position + 1
  self.stream_offset = stream_offset + 1

  logger.debug { "Advanced positions (Batch Position: #{batch_position}, Stream Offset: #{stream_offset})" }
end

#get_batch ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/event_source/iterator.rb', line 82

# Fetches a batch by invoking +get+ with the stream name and the current
# stream offset. The fetch runs inside the block given to +cycle+, and the
# value assigned by the last run of that block is what gets returned.
#
# @return [Object] the batch produced by +get+
def get_batch
  logger.trace "Getting batch"

  fetched = nil
  cycle.call { fetched = get.call(stream_name, position: stream_offset) }

  logger.debug { "Finished getting batch (Count: #{fetched.length})" }

  fetched
end

#next ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/event_source/iterator.rb', line 35

# Returns the entry at the current batch position, first asking +resupply+
# to replace the batch if it is missing or used up.
#
# The batch position and stream offset are advanced only when an entry was
# actually read. When there is nothing at the current position (for example
# the fetched batch is empty), both cursors are left untouched so that a
# later call fetches from the same stream offset instead of skipping one.
#
# @return [Object, nil] the event data at the current position, or nil when
#   the batch has no entry there
def next
  logger.trace { "Getting next event data (Batch Length: #{batch.nil? ? '<none>' : batch.length}, Batch Position: #{batch_position}, Stream Offset: #{stream_offset})" }

  resupply(batch)

  event_data = batch[batch_position]

  logger.debug { "Finished getting next event data (Batch Length: #{batch.nil? ? '<none>' : batch.length}, Batch Position: #{batch_position}, Stream Offset: #{stream_offset})" }
  logger.debug(tags: [:data, :event_data]) { "Event Data: #{event_data.pretty_inspect}" }

  # Advancing on a miss would push the stream offset past an unread event and
  # move the batch position beyond the batch length, which the equality check
  # in resupply can never match again.
  advance_positions unless event_data.nil?

  event_data
end

#reset(batch) ⇒ Object



71
72
73
74
75
76
77
78
79
80
# File 'lib/event_source/iterator.rb', line 71

# Installs +batch+ as the current batch and rewinds the batch position to 0,
# logging the new state.
#
# @param batch [Object] the batch to iterate over from now on
def reset(batch)
  logger.trace { "Resetting batch" }

  self.batch, self.batch_position = batch, 0

  logger.debug { "Reset batch" }
  logger.debug(tags: [:data, :batch]) { "Batch: #{batch.pretty_inspect}" }
  logger.debug(tags: [:data, :batch]) { "Batch Position: #{batch_position.inspect}" }
end

#resupply(batch) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/event_source/iterator.rb', line 50

# Replaces the current batch when it is missing or fully consumed.
#
# A batch counts as depleted when it is nil or when the batch position equals
# its length. In that case a new batch is obtained from +get_batch+ and
# installed via +reset+; otherwise nothing changes.
#
# @param batch [Object, nil] the batch to inspect
def resupply(batch)
  logger.trace { "Resupplying batch" }

  batch_log_text = batch.nil? ? "Batch: #{batch.inspect}" : "Batch Length: #{batch.length}"
  depleted = batch.nil? || batch_position == batch.length

  if depleted
    logger.debug { "Current batch is depleted (#{batch_log_text}, Batch Position: #{batch_position})" }

    reset(get_batch)
  else
    logger.debug { "Current batch not depleted (#{batch_log_text}, Batch Position: #{batch_position})" }
  end

  logger.debug { "Finished resupplying batch" }
end