Class: EsReadModel::Subscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/es_readmodel/subscriber.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(app, options) ⇒ Subscriber

Returns a new instance of Subscriber.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/es_readmodel/subscriber.rb', line 12

def initialize(app, options)
  @app = app
  @listener = options[:listener]
  url = "http://#{options[:es_host]}:#{options[:es_port]}"
  @status = {
    available: false,
    startedAt: Time.now,
    eventsReceived: 0,
    eventStore: {
      url: url,
      connected: true,
      disconnects: 0
    }
  }
  @connection = Connection.new(url, options[:es_username], options[:es_password])
  @reducer = options[:reducer]
  Thread.new { subscribe }
end

Instance Attribute Details

#statusObject (readonly)

Returns the value of attribute status.



10
11
12
# File 'lib/es_readmodel/subscriber.rb', line 10

def status
  @status
end

Instance Method Details

#call(env) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/es_readmodel/subscriber.rb', line 31

def call(env)
  @request = Rack::Request.new(env)
  if env['PATH_INFO'] == '/status'
    status, headers, body = json_response(200, @status)
  else
    env['readmodel.state'] = @state
    env['readmodel.available'] = @status[:available]
    env['readmodel.status'] = 'OK'
    status, headers, body = @app.call(env)
  end
  @listener.call({
    level:  'info',
    tag:    'http.request',
    msg:    "#{env['REQUEST_METHOD']} #{@request.fullpath}",
    status: status
  })
  [status, headers, body]
end