Module: MessageStore::Read::Iterator

Included in:
Controls::Iterator::Example, Substitute
Defined in:
lib/message_store/read/iterator.rb

Defined Under Namespace

Modules: Build, Configure; Classes: Substitute

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#batch ⇒ Object

Returns the value of attribute batch.



23
24
25
# File 'lib/message_store/read/iterator.rb', line 23

# Reader for the @batch instance variable.
#
# Returns whatever is currently stored in @batch, or nil when it has not
# been assigned. Other methods in this file check it with nil?, empty?,
# length and [] before use.
def batch
  @batch
end

#batch_index ⇒ Object



25
26
27
# File 'lib/message_store/read/iterator.rb', line 25

# Index of the next entry to read from the current batch.
#
# Reading the index when @batch_index is nil (or false) stores 0 in the
# instance variable and returns it; otherwise the stored value is returned
# untouched.
def batch_index
  return @batch_index if @batch_index

  @batch_index = 0
end

#starting_position ⇒ Object

Returns the value of attribute starting_position.



22
23
24
# File 'lib/message_store/read/iterator.rb', line 22

# Reader for the @starting_position instance variable.
#
# Returns whatever is currently stored in @starting_position, or nil when it
# has not been assigned. next_batch_starting_position returns this value
# when no batch has been loaded yet.
def starting_position
  @starting_position
end

Class Method Details

.included(cls) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/message_store/read/iterator.rb', line 4

# Hook that Ruby invokes when this module is included into +cls+.
#
# The block is evaluated with +cls+ as self (class_exec), so the mixins and
# macro calls below are applied to the including class rather than to this
# module.
#
# cls - the class or module that is including this module
def self.included(cls)
  cls.class_exec do
    # Instance-level mixins. The dependency / initializer / abstract macros
    # used further down are not defined in this file; presumably they come
    # from these modules — TODO confirm. Order is kept as-is because later
    # includes sit earlier in the ancestor chain.
    include Dependency
    include Initializer
    include Virtual
    include Log::Dependency

    # Class-level behavior added to the including class.
    extend Build
    extend Configure

    # Declares `get`; get_batch invokes it as get.(stream_name, position: ...).
    dependency :get, Get

    # Declares `stream_name`, which get_batch passes to `get`.
    initializer :stream_name

    # Declares `last_position` as abstract; next_batch_starting_position
    # calls it, so including classes are presumably expected to implement
    # it — verify against the Virtual library.
    abstract :last_position
  end
end

Instance Method Details

#advance_batch_index ⇒ Object



62
63
64
65
66
# File 'lib/message_store/read/iterator.rb', line 62

# Moves the batch index forward by one entry.
#
# The index is logged at trace level before the increment and at debug
# level after it. The return value is whatever the final logger call returns.
def advance_batch_index
  logger.trace do
    "Advancing batch index (Batch Index: #{batch_index})"
  end

  self.batch_index = batch_index + 1

  logger.debug do
    "Advanced batch index (Batch Index: #{batch_index})"
  end
end

#batch_depleted? ⇒ Boolean

Returns:

  • (Boolean)


68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/message_store/read/iterator.rb', line 68

# Reports whether the current batch has no more entries to read.
#
# The batch counts as depleted when it is nil, when it is empty, or when the
# batch index has reached or passed the batch length. Each depleted case is
# logged at debug level with its reason.
#
# Returns true when depleted, false otherwise.
def batch_depleted?
  if batch.nil?
    logger.debug { "Batch is depleted (Batch is nil)" }
    return true
  end

  if batch.empty?
    logger.debug { "Batch is depleted (Batch is empty)" }
    return true
  end

  # >= rather than ==: an index that has overshot the length must still be
  # reported as depleted, otherwise reads past the end would yield nil
  # without ever triggering a resupply.
  if batch_index >= batch.length
    logger.debug { "Batch is depleted (Batch Index: #{batch_index}, Batch Length: #{batch.length})" }
    return true
  end

  false
end

#get_batch ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/message_store/read/iterator.rb', line 96

# Retrieves the next batch for the stream.
#
# The position comes from next_batch_starting_position. The `get`
# dependency is invoked with stream_name and that position only when the
# position is nil or non-negative; for a negative position nothing is
# queried and an empty array is returned.
#
# Returns the result of `get`, or [] when the query is skipped.
def get_batch
  position = next_batch_starting_position

  # Block form so the message is only built when the logger asks for it,
  # matching every other log call in this file.
  logger.trace { "Getting batch (Position: #{position.inspect})" }

  # Named `messages` rather than `batch` so the local does not shadow the
  # batch reader.
  messages =
    if position.nil? || position >= 0
      get.(stream_name, position: position)
    else
      []
    end

  logger.debug { "Finished getting batch (Count: #{messages.length}, Position: #{position.inspect})" }

  messages
end

#next ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/message_store/read/iterator.rb', line 47

# Returns the entry at the current batch index and advances the index.
#
# When batch_depleted? reports true, resupply is called first. The entry is
# read with batch[batch_index], so the result is whatever that lookup
# returns for the (possibly freshly loaded) batch. The index is advanced
# after the entry has been read and logged.
def next
  logger.trace do
    "Getting next message data (Batch Length: #{(batch&.length).inspect}, Batch Index: #{batch_index})"
  end

  if batch_depleted?
    resupply
  end

  entry = batch[batch_index]

  logger.debug(tags: %i[data message_data]) do
    "Next message data: #{entry.pretty_inspect}"
  end
  logger.debug do
    "Done getting next message data (Batch Length: #{(batch&.length).inspect}, Batch Index: #{batch_index})"
  end

  advance_batch_index

  entry
end

#next_batch_starting_position ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/message_store/read/iterator.rb', line 111

# Determines the position at which the next batch should start.
#
# With a batch already loaded, the result is last_position + 1. With no
# batch loaded (batch is nil), the result is starting_position. Either
# outcome is logged at debug level.
def next_batch_starting_position
  unless batch.nil?
    prior = last_position
    following = prior + 1
    logger.debug { "End of batch (Next starting position: #{following}, Previous Position: #{prior})" }

    return following
  end

  logger.debug { "Batch is nil (Next batch starting position: #{starting_position.inspect})" }
  starting_position
end

#reset(batch) ⇒ Object



124
125
126
127
128
129
130
131
132
133
# File 'lib/message_store/read/iterator.rb', line 124

# Installs a new batch and rewinds the batch index to 0.
#
# batch - the value to store as the current batch
#
# The new batch and the rewound index are logged at debug level with the
# :data and :batch tags.
def reset(batch)
  logger.trace { "Resetting batch" }

  self.batch = batch
  self.batch_index = 0

  logger.debug(tags: %i[data batch]) do
    "Batch set to: \n#{batch.pretty_inspect}"
  end
  logger.debug(tags: %i[data batch]) do
    "Batch position set to: #{batch_index.inspect}"
  end
  logger.debug { "Done resetting batch" }
end

#resupply ⇒ Object



87
88
89
90
91
92
93
94
# File 'lib/message_store/read/iterator.rb', line 87

# Loads the next batch and makes it the current one.
#
# The batch comes from get_batch and is installed with reset. The length of
# the outgoing batch is logged at trace level before the fetch, and the
# length of the fetched batch at debug level afterwards.
def resupply
  logger.trace do
    "Resupplying batch (Current Batch Length: #{(batch&.length).inspect})"
  end

  fetched = get_batch
  reset(fetched)

  logger.debug do
    "Batch resupplied (Next Batch Length: #{(fetched&.length).inspect})"
  end
end