Class: EventStore::HTTPClient::Subscriptions::Subscribe

Inherits:
Object
  • Object
show all
Defined in:
lib/event_store_http_client/subscriptions/subscribe.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(stream, starting_point) ⇒ Subscribe

Returns a new instance of Subscribe.



39
40
41
42
# File 'lib/event_store_http_client/subscriptions/subscribe.rb', line 39

def initialize(stream, starting_point)
  @stream = stream
  @starting_point = starting_point
end

Instance Attribute Details

#handlerObject

Returns the value of attribute handler.



18
19
20
# File 'lib/event_store_http_client/subscriptions/subscribe.rb', line 18

def handler
  @handler
end

#request_stringObject

Returns the value of attribute request_string.



17
18
19
# File 'lib/event_store_http_client/subscriptions/subscribe.rb', line 17

def request_string
  @request_string
end

#starting_pointObject

Returns the value of attribute starting_point.



16
17
18
# File 'lib/event_store_http_client/subscriptions/subscribe.rb', line 16

def starting_point
  @starting_point
end

#streamObject

Returns the value of attribute stream.



15
16
17
# File 'lib/event_store_http_client/subscriptions/subscribe.rb', line 15

def stream
  @stream
end

Class Method Details

.!(params) ⇒ Object



23
24
25
26
# File 'lib/event_store_http_client/subscriptions/subscribe.rb', line 23

def self.!(params)
  instance = build(params)
  instance.!
end

.build(params) ⇒ Object



28
29
30
31
32
33
34
35
36
37
# File 'lib/event_store_http_client/subscriptions/subscribe.rb', line 28

def self.build(params)
  starting_point = params[:starting_point]
  stream = params[:stream]
  handler = params[:handler]
  new(stream, starting_point).tap do |instance|
    handler.configure instance
    EventStore::HTTPClient::Client::Builder.configure instance
    Logger.configure instance
  end
end

Instance Method Details

#!Object



44
45
46
47
48
# File 'lib/event_store_http_client/subscriptions/subscribe.rb', line 44

def !
  logger.info "Starting from #{starting_point}"
  @request_string = "/streams/#{stream}/#{starting_point}/forward/20"
  make_request
end

#body_handler(body) ⇒ Object



72
73
74
75
76
77
78
# File 'lib/event_store_http_client/subscriptions/subscribe.rb', line 72

def body_handler(body)
  if body.length > 0
    handle_success(body)
  else
    handle_failure(:unknown)
  end
end

#handle_failure(status_code) ⇒ Object



99
100
101
102
103
104
# File 'lib/event_store_http_client/subscriptions/subscribe.rb', line 99

def handle_failure(status_code)
  logger.error "There was an error (#{status_code}) with the subscription request.  Retrying"
  Vertx.set_timer(rand(1000)+10) do
    make_request
  end
end

#handle_success(raw_body) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/event_store_http_client/subscriptions/subscribe.rb', line 81

def handle_success(raw_body)
  body = JSON.parse(raw_body.to_s)
  links = body['links']

  body['entries'].reverse.map{|e|
    logger.trace "Executing handler for #{e} with #{handler.inspect}"
    ::Retry.!(->(attempt){
      handler.!(e, attempt)
      #persist_successfully_handled_event(e['id'])
    })
  }

  if previous_link = links.find{|link| link['relation'] == 'previous'}
    @request_string = previous_link['uri']
  end
  make_request
end

#make_requestObject



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/event_store_http_client/subscriptions/subscribe.rb', line 50

def make_request
  body_embed_link = "#{request_string}?embed=body"

  logger.debug body_embed_link

  request = client.get(body_embed_link) do |resp|
    resp.body_handler = body_handler
  end

  request.put_header('Accept', 'application/vnd.eventstore.atom+json')
  request.put_header('ES-LongPoll', 15)

  request.exception_handler { |e|
    logger.error "Exception in request: #{e}"
    Vertx.set_timer(1_000) do
      make_request
    end
  }

  request.end
end