Module: MessageStore::Read::Iterator
Defined Under Namespace
Modules: Build, Configure
Classes: Substitute
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Instance Attribute Details
#batch ⇒ Object
Returns the value of attribute batch.
23
24
25
|
# File 'lib/message_store/read/iterator.rb', line 23
def batch
@batch
end
|
#batch_index ⇒ Object
25
26
27
|
# File 'lib/message_store/read/iterator.rb', line 25
def batch_index
@batch_index ||= 0
end
|
#starting_position ⇒ Object
Returns the value of attribute starting_position.
22
23
24
|
# File 'lib/message_store/read/iterator.rb', line 22
def starting_position
@starting_position
end
|
Class Method Details
.included(cls) ⇒ Object
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
# File 'lib/message_store/read/iterator.rb', line 4
def self.included(cls)
cls.class_exec do
include Dependency
include Initializer
include Virtual
include Log::Dependency
extend Build
extend Configure
dependency :get, Get
initializer :stream_name
abstract :last_position
end
end
|
Instance Method Details
#advance_batch_index ⇒ Object
62
63
64
65
66
|
# File 'lib/message_store/read/iterator.rb', line 62
def advance_batch_index
logger.trace { "Advancing batch index (Batch Index: #{batch_index})" }
self.batch_index += 1
logger.debug { "Advanced batch index (Batch Index: #{batch_index})" }
end
|
#batch_depleted? ⇒ Boolean
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
# File 'lib/message_store/read/iterator.rb', line 68
def batch_depleted?
if batch.nil?
logger.debug { "Batch is depleted (Batch is nil)" }
return true
end
if batch.empty?
logger.debug { "Batch is depleted (Batch is empty)" }
return true
end
if batch_index == batch.length
logger.debug { "Batch is depleted (Batch Index: #{batch_index}, Batch Length: #{batch.length})" }
return true
end
false
end
|
#get_batch ⇒ Object
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
# File 'lib/message_store/read/iterator.rb', line 96
def get_batch
position = next_batch_starting_position
logger.trace "Getting batch (Position: #{position.inspect})"
batch = []
if position.nil? || position >= 0
batch = get.(stream_name, position: position)
end
logger.debug { "Finished getting batch (Count: #{batch.length}, Position: #{position.inspect})" }
batch
end
|
#next ⇒ Object
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
# File 'lib/message_store/read/iterator.rb', line 47
def next
logger.trace { "Getting next message data (Batch Length: #{(batch &.length).inspect}, Batch Index: #{batch_index})" }
resupply if batch_depleted?
message_data = batch[batch_index]
logger.debug(tags: [:data, :message_data]) { "Next message data: #{message_data.pretty_inspect}" }
logger.debug { "Done getting next message data (Batch Length: #{(batch &.length).inspect}, Batch Index: #{batch_index})" }
advance_batch_index
message_data
end
|
#next_batch_starting_position ⇒ Object
111
112
113
114
115
116
117
118
119
120
121
122
|
# File 'lib/message_store/read/iterator.rb', line 111
def next_batch_starting_position
if batch.nil?
logger.debug { "Batch is nil (Next batch starting position: #{starting_position.inspect})" }
return starting_position
end
previous_position = last_position
next_position = previous_position + 1
logger.debug { "End of batch (Next starting position: #{next_position}, Previous Position: #{previous_position})" }
next_position
end
|
#reset(batch) ⇒ Object
124
125
126
127
128
129
130
131
132
133
|
# File 'lib/message_store/read/iterator.rb', line 124
def reset(batch)
logger.trace { "Resetting batch" }
self.batch = batch
self.batch_index = 0
logger.debug(tags: [:data, :batch]) { "Batch set to: \n#{batch.pretty_inspect}" }
logger.debug(tags: [:data, :batch]) { "Batch position set to: #{batch_index.inspect}" }
logger.debug { "Done resetting batch" }
end
|
#resupply ⇒ Object
87
88
89
90
91
92
93
94
|
# File 'lib/message_store/read/iterator.rb', line 87
def resupply
logger.trace { "Resupplying batch (Current Batch Length: #{(batch &.length).inspect})" }
batch = get_batch
reset(batch)
logger.debug { "Batch resupplied (Next Batch Length: #{(batch &.length).inspect})" }
end
|