Module: Estore::Commands::Subscription

Included in:
CatchUpSubscription, LiveSubscription
Defined in:
lib/estore/commands/subscription.rb

Instance Method Summary collapse

Instance Method Details

#call ⇒ Object



13
14
15
# File 'lib/estore/commands/subscription.rb', line 13

# Runs the command by delegating to #start.
#
# @return [Object] whatever #start returns
# @raise [RuntimeError] propagated from #start when no handler block is set
def call
  start
end

#close ⇒ Object



29
30
31
32
# File 'lib/estore/commands/subscription.rb', line 29

# Ends the subscription: sends an UnsubscribeFromStream message through
# #write and then calls #remove!.
#
# NOTE(review): nothing here stops the worker thread created in
# #initialize — confirm whether that is intentional.
#
# @return [Object] whatever #remove! returns
def close
  unsubscribe_request = UnsubscribeFromStream.new
  write('UnsubscribeFromStream', unsubscribe_request)
  remove!
end

#enqueue(events) ⇒ Object



38
39
40
41
42
# File 'lib/estore/commands/subscription.rb', line 38

# Records the number of the newest event as the current position and hands
# the batch to the worker queue for processing.
#
# @param events [Object, Array, nil] a single event or a collection of
#   events; normalized with Kernel#Array. Each event must respond to
#   #original_event_number.
# @return [Object] the worker queue (@worker_queue)
def enqueue(events)
  batch = Array(events)
  # An empty batch (nil or []) carries no position information; bail out
  # instead of calling #original_event_number on nil.
  return @worker_queue if batch.empty?

  @position = batch.last.original_event_number
  @worker_queue << batch
end

#initialize(connection, stream, options = {}) ⇒ Object



4
5
6
7
8
9
10
11
# File 'lib/estore/commands/subscription.rb', line 4

# Sets up subscription state and starts the background worker thread.
#
# @param connection [Object] forwarded unchanged to the superclass initializer
# @param stream [Object] stream identifier, later sent as event_stream_id
# @param options [Hash] optional settings
# @option options [Boolean] :resolve_link_tos (true) value sent as
#   resolve_link_tos when subscribing
def initialize(connection, stream, options = {})
  super(connection)
  @stream = stream
  @resolve_link_tos = options.fetch(:resolve_link_tos, true)
  @has_finished = false
  # The queue must exist before the worker starts popping from it.
  @worker_queue = Queue.new
  @worker = Thread.new do
    loop { worker_loop }
  end
end

#on_event(&block) ⇒ Object



34
35
36
# File 'lib/estore/commands/subscription.rb', line 34

# Stores the given block as the event handler. #start raises unless a
# handler is set, and #worker_loop calls the handler once per event.
#
# @yieldparam event [Object] an event taken from the worker queue
# @return [Proc, nil] the stored block (nil when called without a block)
def on_event(&block)
  @handler = block
end

#start ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
# File 'lib/estore/commands/subscription.rb', line 17

# Begins the subscription: builds a SubscribeToStream message for the
# configured stream, calls #register!, and sends the message through #write.
#
# @return [Object] whatever #write returns
# @raise [RuntimeError] if no handler block has been set
def start
  raise 'Subscription block not defined' unless @handler

  # Build the request before registering so a construction failure leaves
  # nothing registered.
  subscribe_request = SubscribeToStream.new(event_stream_id: @stream, resolve_link_tos: @resolve_link_tos)
  register!
  write('SubscribeToStream', subscribe_request)
end

#worker_loop ⇒ Object



44
45
46
47
48
49
# File 'lib/estore/commands/subscription.rb', line 44

# Performs one worker iteration: takes the next batch from the worker queue
# and passes each event to the handler, in order.
#
# Any StandardError (from the pop or from the handler) is rescued; its
# message and backtrace are printed with puts, and the remaining events of
# that batch are not delivered.
#
# @return [Object, nil] the processed batch, or nil after a rescued error
def worker_loop
  batch = @worker_queue.pop
  batch.each do |event|
    @handler.call(event)
  end
rescue StandardError => error
  puts error.message, error.backtrace
end