Module: MessageStore::Read::Iterator
Defined Under Namespace
Modules: Build, Configure
Classes: Substitute
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Instance Attribute Details
#batch ⇒ Object
Returns the value of attribute batch.
20
21
22
|
# File 'lib/message_store/read/iterator.rb', line 20
def batch
@batch
end
|
#batch_index ⇒ Object
22
23
24
|
# File 'lib/message_store/read/iterator.rb', line 22
def batch_index
@batch_index ||= 0
end
|
#starting_position ⇒ Object
Returns the value of attribute starting_position.
19
20
21
|
# File 'lib/message_store/read/iterator.rb', line 19
def starting_position
@starting_position
end
|
Class Method Details
.included(cls) ⇒ Object
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
# File 'lib/message_store/read/iterator.rb', line 4
def self.included(cls)
cls.class_exec do
cls.extend Build
cls.extend Configure
include Log::Dependency
dependency :get, Get
initializer :stream_name
abstract :last_position
end
end
|
Instance Method Details
#advance_batch_index ⇒ Object
59
60
61
62
63
|
# File 'lib/message_store/read/iterator.rb', line 59
def advance_batch_index
logger.trace { "Advancing batch index (Batch Index: #{batch_index})" }
self.batch_index += 1
logger.debug { "Advanced batch index (Batch Index: #{batch_index})" }
end
|
#batch_depleted? ⇒ Boolean
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
# File 'lib/message_store/read/iterator.rb', line 65
def batch_depleted?
if batch.nil?
logger.debug { "Batch is depleted (Batch is nil)" }
return true
end
if batch.empty?
logger.debug { "Batch is depleted (Batch is empty)" }
return true
end
if batch_index == batch.length
logger.debug { "Batch is depleted (Batch Index: #{batch_index}, Batch Length: #{batch.length})" }
return true
end
false
end
|
#get_batch ⇒ Object
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
# File 'lib/message_store/read/iterator.rb', line 93
def get_batch
position = next_batch_starting_position
logger.trace "Getting batch (Position: #{position.inspect})"
batch = []
if position.nil? || position >= 0
batch = get.(stream_name, position: position)
end
logger.debug { "Finished getting batch (Count: #{batch.length}, Position: #{position.inspect})" }
batch
end
|
#next ⇒ Object
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
# File 'lib/message_store/read/iterator.rb', line 44
def next
logger.trace { "Getting next message data (Batch Length: #{(batch &.length).inspect}, Batch Index: #{batch_index})" }
resupply if batch_depleted?
message_data = batch[batch_index]
logger.debug(tags: [:data, :message_data]) { "Next message data: #{message_data.pretty_inspect}" }
logger.debug { "Done getting next message data (Batch Length: #{(batch &.length).inspect}, Batch Index: #{batch_index})" }
advance_batch_index
message_data
end
|
#next_batch_starting_position ⇒ Object
108
109
110
111
112
113
114
115
116
117
118
119
|
# File 'lib/message_store/read/iterator.rb', line 108
def next_batch_starting_position
if batch.nil?
logger.debug { "Batch is nil (Next batch starting position: #{starting_position.inspect})" }
return starting_position
end
previous_position = last_position
next_position = previous_position + 1
logger.debug { "End of batch (Next starting position: #{next_position}, Previous Position: #{previous_position})" }
next_position
end
|
#reset(batch) ⇒ Object
121
122
123
124
125
126
127
128
129
130
|
# File 'lib/message_store/read/iterator.rb', line 121
def reset(batch)
logger.trace { "Resetting batch" }
self.batch = batch
self.batch_index = 0
logger.debug(tags: [:data, :batch]) { "Batch set to: \n#{batch.pretty_inspect}" }
logger.debug(tags: [:data, :batch]) { "Batch position set to: #{batch_index.inspect}" }
logger.debug { "Done resetting batch" }
end
|
#resupply ⇒ Object
84
85
86
87
88
89
90
91
|
# File 'lib/message_store/read/iterator.rb', line 84
def resupply
logger.trace { "Resupplying batch (Current Batch Length: #{(batch &.length).inspect})" }
batch = get_batch
reset(batch)
logger.debug { "Batch resupplied (Next Batch Length: #{(batch &.length).inspect})" }
end
|