Class: Estore::Commands::CatchUpSubscription

Inherits:
Object
  • Object
show all
Includes:
Command, Subscription
Defined in:
lib/estore/commands/subscriptions/catch_up.rb

Instance Attribute Summary

Attributes included from Command

#uuid

Instance Method Summary collapse

Methods included from Subscription

#call, #close, #enqueue, #on_event, #worker_loop

Methods included from Command

#handle, #promise, #register!, #reject!, #remove!, #write

Constructor Details

#initialize(connection, stream, from, options = {}) ⇒ CatchUpSubscription

Returns a new instance of CatchUpSubscription.



10
11
12
13
14
15
16
17
# File 'lib/estore/commands/subscriptions/catch_up.rb', line 10

def initialize(connection, stream, from, options = {})
  super(connection, stream, options)
  @from = from
  @batch = options[:batch_size]
  @mutex = Mutex.new
  @while_catching_up = []
  @caught_up = false
end

Instance Method Details

#event_appeared(response) ⇒ Object



42
43
44
45
46
47
48
49
50
# File 'lib/estore/commands/subscriptions/catch_up.rb', line 42

def event_appeared(response)
  unless @caught_up
    @mutex.synchronize do
      @while_catching_up << response.event unless @caught_up
    end
  end

  enqueue response.event if @caught_up
end

#startObject



19
20
21
22
23
24
25
26
27
28
29
# File 'lib/estore/commands/subscriptions/catch_up.rb', line 19

def start
  super

  # TODO: Think about doing something more clever?
  read = ReadForward.new(@connection, @stream, @from, @batch) do |events|
    enqueue events unless events.empty?
  end

  read.call.sync
  switch_to_live
end

#switch_to_liveObject



31
32
33
34
35
36
# File 'lib/estore/commands/subscriptions/catch_up.rb', line 31

def switch_to_live
  @mutex.synchronize do
    unprocessed_events.each { |event| enqueue event }
    @caught_up = true
  end
end

#unprocessed_eventsObject



38
39
40
# File 'lib/estore/commands/subscriptions/catch_up.rb', line 38

def unprocessed_events
  @while_catching_up.find_all { |event| event.original_event_number > @position }
end