Class: BBK::HTTP::Publisher
- Inherits:
-
Object
- Object
- BBK::HTTP::Publisher
- Defined in:
- lib/bbk/http/publisher.rb
Defined Under Namespace
Classes: PublishError
Constant Summary collapse
- PROTOCOLS =
%w[http https].freeze
- CONTENT_TYPE_HEADER =
'Content-Type'.freeze
- METHODS =
Faraday::Connection::METHODS
Instance Attribute Summary collapse
-
#domains ⇒ Object
readonly
Returns the value of attribute domains.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(domains, default_connection_options: {}, logger: BBK::HTTP.logger) ⇒ Publisher
constructor
A new instance of Publisher.
- #protocols ⇒ Object
- #publish(result) ⇒ Object
- #raw_publish(uri, method:, options: {}, body: nil, headers: {}) ⇒ Object
Constructor Details
#initialize(domains, default_connection_options: {}, logger: BBK::HTTP.logger) ⇒ Publisher
31 32 33 34 35 |
# File 'lib/bbk/http/publisher.rb', line 31 def initialize(domains, default_connection_options: {}, logger: BBK::HTTP.logger) @domains = domains = @logger = ActiveSupport::TaggedLogging.new(logger).tagged(self.class.name) end |
Instance Attribute Details
#domains ⇒ Object (readonly)
Returns the value of attribute domains.
29 30 31 |
# File 'lib/bbk/http/publisher.rb', line 29 def domains @domains end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
29 30 31 |
# File 'lib/bbk/http/publisher.rb', line 29 def logger @logger end |
Instance Method Details
#close ⇒ Object
41 |
# File 'lib/bbk/http/publisher.rb', line 41 def close; end |
#protocols ⇒ Object
37 38 39 |
# File 'lib/bbk/http/publisher.rb', line 37 def protocols PROTOCOLS end |
#publish(result) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/bbk/http/publisher.rb', line 43 def publish(result) logger.debug "Try publish dispatcher result #{result.inspect}" route = result.route result_domain = route.domain raise "Unsupported protocol #{route.scheme}" unless PROTOCOLS.include?(route.scheme) raise "Unknown domain #{result_domain}" unless domains.has?(result_domain) domain = domains[result_domain] route_info = domain.call(route) logger.debug "Route #{route.inspect} transformed to #{route_info.inspect}" headers = { **result..headers, **route_info.headers }.transform_values(&:to_s) raw_publish( route_info.uri, method: route_info.method, options: route_info., body: result..payload, headers: headers ) end |
#raw_publish(uri, method:, options: {}, body: nil, headers: {}) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/bbk/http/publisher.rb', line 64 def raw_publish(uri, method:, options: {}, body: nil, headers: {}) raise "Unsupported method(#{method.inspect})" unless METHODS.include?(method.to_sym) content_type = headers.fetch(CONTENT_TYPE_HEADER, Faraday::Request::Json::MIME_TYPE) payload = if content_type == Faraday::Request::Json::MIME_TYPE Oj.generate(body) else body end headers[CONTENT_TYPE_HEADER] = content_type = .merge() logger.debug("Connection options: #{options}") if .present? response = Faraday.new(uri, **.slice(Faraday::ConnectionOptions.members)).send( method.to_sym, '', payload, headers ) Concurrent::Promises.resolvable_future.tap do |f| data = response.to_hash if response.success? f.fulfill(data) else logger.error "Get error response #{data.except(:body)}" f.reject(PublishError.new(data)) end end rescue Faraday::ConnectionFailed => e logger.error "Faraday connection failed error: #{e.inspect}" Concurrent::Promises.resolvable_future.reject(PublishError.new({ cause: e })) rescue Faraday::Error => e logger.error "Faraday error: #{e.inspect}" Concurrent::Promises.resolvable_future.reject(PublishError.new({ cause: e })) end |