Class: Estore::Commands::ReadForward
Instance Attribute Summary
Attributes included from Command
#uuid
Instance Method Summary
collapse
#error, #read
Methods included from Command
#handle, #promise, #register!, #reject!, #remove!, #write
Constructor Details
#initialize(connection, stream, from, batch_size = nil, &block) ⇒ ReadForward
Returns a new instance of ReadForward.
9
10
11
12
13
14
15
16
17
|
# File 'lib/estore/commands/reads/forward.rb', line 9
def initialize(connection, stream, from, batch_size = nil, &block)
super(connection)
@stream = stream
@from = from
@batch_size = batch_size || 1000
@block = block
@events = []
end
|
Instance Method Details
#batch_completed(response) ⇒ Object
39
40
41
42
43
44
45
46
47
48
|
# File 'lib/estore/commands/reads/forward.rb', line 39
def batch_completed(response)
error = error(response)
if error
remove!
promise.reject ReadEventsError.new(error)
else
keep_reading(response)
end
end
|
#call ⇒ Object
19
20
21
22
23
|
# File 'lib/estore/commands/reads/forward.rb', line 19
def call
register!
read(@stream, @from, @batch_size)
promise
end
|
#keep_reading(response) ⇒ Object
25
26
27
28
29
30
31
32
33
34
35
36
37
|
# File 'lib/estore/commands/reads/forward.rb', line 25
def keep_reading(response)
events = Array(response.events)
@from += events.size
read(@stream, @from, @batch_size) unless response.is_end_of_stream
@block ? @block.call(events) : @events.push(*events)
if response.is_end_of_stream
remove!
promise.fulfill(@block ? nil : @events)
end
end
|