Class: Estore::Commands::ReadForward

Inherits:
Object
  • Estore::Commands::ReadForward
Includes:
Command, ReadStreamForward
Defined in:
lib/estore/commands/reads/forward.rb

Instance Attribute Summary

Attributes included from Command

#uuid

Instance Method Summary collapse

Methods included from ReadStreamForward

#error, #read

Methods included from Command

#handle, #promise, #register!, #reject!, #remove!, #write

Constructor Details

#initialize(connection, stream, from, batch_size = nil, &block) ⇒ ReadForward

Returns a new instance of ReadForward.



9
10
11
12
13
14
15
16
17
# File 'lib/estore/commands/reads/forward.rb', line 9

# Builds a forward-read command for a single stream.
#
# @param connection passed straight through to the superclass initializer
# @param stream stream identifier handed to +read+
# @param from position at which the first read starts
# @param batch_size [Integer, nil] events requested per read; falls back to
#   1000 when nil (or false) is given
# @param block optional callable invoked with each batch of events; when
#   omitted, events are accumulated in an internal array instead
def initialize(connection, stream, from, batch_size = nil, &block)
  super(connection)

  @stream, @from = stream, from
  @batch_size = batch_size || 1000
  @block = block
  @events = [] # buffer used only when no block was supplied
end

Instance Method Details

#batch_completed(response) ⇒ Object



39
40
41
42
43
44
45
46
47
48
# File 'lib/estore/commands/reads/forward.rb', line 39

# Handles the response for one read batch.
#
# When +error(response)+ returns a truthy value the command is removed and
# the promise is rejected with a ReadEventsError wrapping that value.
# Otherwise the response is handed to +keep_reading+.
#
# @param response the read response to inspect
# @return [Object] whatever +keep_reading+ or +promise.reject+ returns
def batch_completed(response)
  failure = error(response)
  return keep_reading(response) unless failure

  remove!
  promise.reject(ReadEventsError.new(failure))
end

#call ⇒ Object



19
20
21
22
23
# File 'lib/estore/commands/reads/forward.rb', line 19

# Starts the read.
#
# Calls +register!+, issues the first +read+ for @stream beginning at @from
# with @batch_size, and then returns +promise+.
#
# @return [Object] the value of +promise+; per +keep_reading+ it is fulfilled
#   with the collected events (or nil when a block was given) once the end of
#   the stream is reached
def call
  # NOTE(review): presumably registration must precede the first read so the
  # reply can be routed back to this command — confirm against Command.
  register!
  read(@stream, @from, @batch_size)
  promise
end

#keep_reading(response) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/estore/commands/reads/forward.rb', line 25

# Consumes one successful batch and either requests the next one or
# completes the command.
#
# The read position is advanced by the number of events received. Unless the
# response is flagged as end of stream, the next read is issued before the
# current batch is delivered. The batch is passed to the block when one was
# given, otherwise appended to the internal buffer. At end of stream the
# command is removed and the promise fulfilled with the buffer (or nil when a
# block was given).
#
# @param response object responding to +events+ and +is_end_of_stream+
# @return [Object, nil] the result of +promise.fulfill+ at end of stream,
#   nil otherwise
def keep_reading(response)
  batch = Array(response.events) # a nil events field becomes an empty batch
  @from += batch.size

  # Ask for the following batch before delivering the current one.
  read(@stream, @from, @batch_size) unless response.is_end_of_stream

  if @block
    @block.call(batch)
  else
    @events.concat(batch)
  end

  return unless response.is_end_of_stream

  remove!
  promise.fulfill(@block ? nil : @events)
end