Class: Estore::Commands::CatchUpSubscription
- Inherits:
-
Object
- Object
- Estore::Commands::CatchUpSubscription
show all
- Includes:
- Command, Subscription
- Defined in:
- lib/estore/commands/subscriptions/catch_up.rb
Instance Attribute Summary
Attributes included from Command
#uuid
Instance Method Summary
collapse
#call, #close, #enqueue, #on_event, #worker_loop
Methods included from Command
#handle, #promise, #register!, #reject!, #remove!, #write
Constructor Details
#initialize(connection, stream, from, options = {}) ⇒ CatchUpSubscription
Returns a new instance of CatchUpSubscription.
10
11
12
13
14
15
16
17
|
# File 'lib/estore/commands/subscriptions/catch_up.rb', line 10
def initialize(connection, stream, from, options = {})
super(connection, stream, options)
@from = from
@batch = options[:batch_size]
@mutex = Mutex.new
@while_catching_up = []
@caught_up = false
end
|
Instance Method Details
#event_appeared(response) ⇒ Object
42
43
44
45
46
47
48
49
50
|
# File 'lib/estore/commands/subscriptions/catch_up.rb', line 42
def event_appeared(response)
unless @caught_up
@mutex.synchronize do
@while_catching_up << response.event unless @caught_up
end
end
enqueue response.event if @caught_up
end
|
#start ⇒ Object
19
20
21
22
23
24
25
26
27
28
29
|
# File 'lib/estore/commands/subscriptions/catch_up.rb', line 19
def start
super
read = ReadForward.new(@connection, @stream, @from, @batch) do |events|
enqueue events unless events.empty?
end
read.call.sync
switch_to_live
end
|
#switch_to_live ⇒ Object
31
32
33
34
35
36
|
# File 'lib/estore/commands/subscriptions/catch_up.rb', line 31
def switch_to_live
@mutex.synchronize do
unprocessed_events.each { |event| enqueue event }
@caught_up = true
end
end
|
#unprocessed_events ⇒ Object
38
39
40
|
# File 'lib/estore/commands/subscriptions/catch_up.rb', line 38
def unprocessed_events
@while_catching_up.find_all { |event| event.original_event_number > @position }
end
|