Class: EventStore::HTTPClient::Subscriptions::Subscribe
- Inherits:
-
Object
- Object
- EventStore::HTTPClient::Subscriptions::Subscribe
- Defined in:
- lib/event_store_http_client/subscriptions/subscribe.rb
Instance Attribute Summary collapse
-
#handler ⇒ Object
Returns the value of attribute handler.
-
#request_string ⇒ Object
Returns the value of attribute request_string.
-
#starting_point ⇒ Object
Returns the value of attribute starting_point.
-
#stream ⇒ Object
Returns the value of attribute stream.
Class Method Summary collapse
Instance Method Summary collapse
- #! ⇒ Object
- #body_handler(body) ⇒ Object
- #handle_failure(status_code) ⇒ Object
- #handle_success(raw_body) ⇒ Object
-
#initialize(stream, starting_point) ⇒ Subscribe
constructor
A new instance of Subscribe.
- #make_request ⇒ Object
Constructor Details
#initialize(stream, starting_point) ⇒ Subscribe
Returns a new instance of Subscribe.
39 40 41 42 |
# File 'lib/event_store_http_client/subscriptions/subscribe.rb', line 39 def initialize(stream, starting_point) @stream = stream @starting_point = starting_point end |
Instance Attribute Details
#handler ⇒ Object
Returns the value of attribute handler.
18 19 20 |
# File 'lib/event_store_http_client/subscriptions/subscribe.rb', line 18 def handler @handler end |
#request_string ⇒ Object
Returns the value of attribute request_string.
17 18 19 |
# File 'lib/event_store_http_client/subscriptions/subscribe.rb', line 17 def request_string @request_string end |
#starting_point ⇒ Object
Returns the value of attribute starting_point.
16 17 18 |
# File 'lib/event_store_http_client/subscriptions/subscribe.rb', line 16 def starting_point @starting_point end |
#stream ⇒ Object
Returns the value of attribute stream.
15 16 17 |
# File 'lib/event_store_http_client/subscriptions/subscribe.rb', line 15 def stream @stream end |
Class Method Details
.!(params) ⇒ Object
23 24 25 26 |
# File 'lib/event_store_http_client/subscriptions/subscribe.rb', line 23 def self.!(params) instance = build(params) instance.! end |
.build(params) ⇒ Object
28 29 30 31 32 33 34 35 36 37 |
# File 'lib/event_store_http_client/subscriptions/subscribe.rb', line 28 def self.build(params) starting_point = params[:starting_point] stream = params[:stream] handler = params[:handler] new(stream, starting_point).tap do |instance| handler.configure instance EventStore::HTTPClient::Client::Builder.configure instance Logger.configure instance end end |
Instance Method Details
#! ⇒ Object
44 45 46 47 48 |
# File 'lib/event_store_http_client/subscriptions/subscribe.rb', line 44 def ! logger.info "Starting from #{starting_point}" @request_string = "/streams/#{stream}/#{starting_point}/forward/20" make_request end |
#body_handler(body) ⇒ Object
72 73 74 75 76 77 78 |
# File 'lib/event_store_http_client/subscriptions/subscribe.rb', line 72 def body_handler(body) if body.length > 0 handle_success(body) else handle_failure(:unknown) end end |
#handle_failure(status_code) ⇒ Object
99 100 101 102 103 104 |
# File 'lib/event_store_http_client/subscriptions/subscribe.rb', line 99 def handle_failure(status_code) logger.error "There was an error (#{status_code}) with the subscription request. Retrying" Vertx.set_timer(rand(1000)+10) do make_request end end |
#handle_success(raw_body) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/event_store_http_client/subscriptions/subscribe.rb', line 81 def handle_success(raw_body) body = JSON.parse(raw_body.to_s) links = body['links'] body['entries'].reverse.map{|e| logger.trace "Executing handler for #{e} with #{handler.inspect}" ::Retry.!(->(attempt){ handler.!(e, attempt) #persist_successfully_handled_event(e['id']) }) } if previous_link = links.find{|link| link['relation'] == 'previous'} @request_string = previous_link['uri'] end make_request end |
#make_request ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/event_store_http_client/subscriptions/subscribe.rb', line 50 def make_request = "#{request_string}?embed=body" logger.debug request = client.get() do |resp| resp.body_handler = body_handler end request.put_header('Accept', 'application/vnd.eventstore.atom+json') request.put_header('ES-LongPoll', 15) request.exception_handler { |e| logger.error "Exception in request: #{e}" Vertx.set_timer(1_000) do make_request end } request.end end |