Class: EsReadModel::Subscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/es_readmodel/subscriber.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ Subscriber

Returns a new instance of Subscriber.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/es_readmodel/subscriber.rb', line 10

# Builds a subscriber and the status report exposed through #status.
#
# @param options [Hash] configuration, read with symbol keys:
#   * +:listener+    - stored as @listener; #subscribe invokes it with +call+
#     to report connection errors
#   * +:initial+     - stored as @initial_state; #subscribe copies it into
#     @state at the start of every connection attempt
#   * +:reducer+     - stored as @reducer (not used by the methods shown here)
#   * +:es_url+      - event store URL; recorded in the status hash and
#     passed to Connection.new
#   * +:es_username+ - passed to Connection.new
#   * +:es_password+ - passed to Connection.new
# @return [Subscriber] a new instance of Subscriber
def initialize(options)
  @listener = options[:listener]
  @initial_state = options[:initial]
  url = options[:es_url]
  @status = {
    available: false,
    startedAt: Time.now,
    eventsReceived: 0,
    eventStore: {
      url: url,
      # No stream has been opened at construction time, so do not claim to be
      # connected; #subscribe sets this to true once the stream is open.
      connected: false,
      disconnects: 0
    }
  }
  @connection = Connection.new(url, options[:es_username], options[:es_password])
  @reducer = options[:reducer]
end

Instance Attribute Details

#state ⇒ Object (readonly)

Returns the value of attribute state.



8
9
10
# File 'lib/es_readmodel/subscriber.rb', line 8

# Reader for @state.
#
# The constructor does not assign @state, so this returns nil until
# #subscribe sets it; #subscribe resets it to the configured initial state at
# the start of every connection attempt.
# NOTE(review): presumably event handling outside this view then updates
# @state as events arrive - confirm against subscribe_to_all_events.
#
# @return [Object, nil] the current value of @state
def state
  @state
end

#status ⇒ Object (readonly)

Returns the value of attribute status.



8
9
10
# File 'lib/es_readmodel/subscriber.rb', line 8

# Reader for @status, the status hash built by the constructor.
#
# The hash has the top-level keys :available, :startedAt, :eventsReceived and
# :eventStore; the nested :eventStore hash holds :url, :connected and
# :disconnects, and #subscribe adds :lastConnect / :lastDisconnect timestamps
# to it. The hash itself is returned, not a copy, so changes made by the
# caller are visible to the subscriber and vice versa.
#
# @return [Hash] the live status hash
def status
  @status
end

Instance Method Details

#subscribe ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/es_readmodel/subscriber.rb', line 28

# Runs the subscription loop: open the "$all" stream, consume events, and
# start over whenever an attempt ends.
#
# Each attempt marks the read model unavailable and disconnected, resets
# @state to the initial state, opens the stream, records the connect time and
# then hands over to subscribe_to_all_events. If that call returns normally
# the loop simply starts the next attempt.
#
# Ordinary errors (StandardError and subclasses) are reported to the listener
# as a 'connection.error' entry and counted as a disconnect before retrying.
# Interrupt, SignalException, SystemExit and other non-StandardError
# exceptions are deliberately NOT rescued, so Ctrl-C, SIGTERM and +exit+ can
# still stop the process instead of being logged and retried forever.
#
# NOTE(review): retries happen immediately with no delay between attempts;
# consider a backoff if the event store can stay unreachable for long.
#
# @return [void] does not return normally; exits only by a propagated
#   exception
def subscribe
  loop do
    begin
      @status[:available] = false
      @status[:eventStore][:connected] = false
      @state = @initial_state
      @stream = Stream.open("$all", @connection, @listener)
      @status[:eventStore][:connected] = true
      @status[:eventStore][:lastConnect] = Time.now
      subscribe_to_all_events
    rescue StandardError => ex
      @listener.call({
        level: 'error',
        tag:   'connection.error',
        msg:   "#{ex.class}: #{ex.message}"
      })
      @status[:eventStore][:disconnects] += 1
      @status[:eventStore][:lastDisconnect] = Time.now
    end
  end
end