Class: BBK::HTTP::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/bbk/http/publisher.rb

Defined Under Namespace

Classes: PublishError

Constant Summary collapse

PROTOCOLS =
%w[http https].freeze
CONTENT_TYPE_HEADER =
'Content-Type'.freeze
METHODS =
Faraday::Connection::METHODS

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(domains, default_connection_options: {}, logger: BBK::HTTP.logger) ⇒ Publisher



30
31
32
33
34
# File 'lib/bbk/http/publisher.rb', line 30

# Builds a publisher around a domains registry.
#
# @param domains [Object] registry stored as-is; #publish queries it with +has?+ and +[]+
# @param default_connection_options [Hash] base connection options; #raw_publish merges
#   its per-call options over these
# @param logger [Object] base logger; it is wrapped in ActiveSupport::TaggedLogging and
#   tagged with this class's name
def initialize(domains, default_connection_options: {}, logger: BBK::HTTP.logger)
  tagged_logger = ActiveSupport::TaggedLogging.new(logger).tagged(self.class.name)

  @logger = tagged_logger
  @default_connection_options = default_connection_options
  @domains = domains
end

Instance Attribute Details

#domains ⇒ Object (readonly)

Returns the value of attribute domains.



28
29
30
# File 'lib/bbk/http/publisher.rb', line 28

# Reader for the +@domains+ instance variable, which the constructor sets
# from its +domains+ argument.
#
# @return [Object] the stored domains registry
def domains
  @domains
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



28
29
30
# File 'lib/bbk/http/publisher.rb', line 28

# Reader for the +@logger+ instance variable. The constructor stores there a
# tagged logger (ActiveSupport::TaggedLogging, tagged with the class name).
#
# @return [Object] the stored logger
def logger
  @logger
end

Instance Method Details

#close ⇒ Object



40
# File 'lib/bbk/http/publisher.rb', line 40

# Empty method body: does nothing and returns nil.
# NOTE(review): presumably exists so this publisher satisfies an interface
# shared with publishers that hold resources — confirm against callers.
def close; end

#protocols ⇒ Object



36
37
38
# File 'lib/bbk/http/publisher.rb', line 36

# Returns the PROTOCOLS constant: the route schemes (+http+, +https+) that
# #publish accepts.
#
# @return [Array<String>] the frozen list of supported schemes
def protocols
  PROTOCOLS
end

#publish(result) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/bbk/http/publisher.rb', line 42

# Publishes a dispatcher result over HTTP.
#
# The result's route is resolved through the matching entry of +domains+, and the
# message is then handed to #raw_publish. Values from the resolved route's headers
# win over message headers with the same key; all header values are converted to
# strings.
#
# @param result [Object] must respond to +route+ (with +scheme+ and +domain+) and
#   +message+ (with +headers+ and +payload+)
# @return [Object] whatever #raw_publish returns
# @raise [RuntimeError] when the route scheme is not in PROTOCOLS or the route
#   domain is not registered in +domains+
def publish(result)
  logger.debug "Try publish dispatcher result #{result.inspect}"
  route = result.route
  domain_name = route.domain

  unless PROTOCOLS.include?(route.scheme)
    raise "Unsupported protocol #{route.scheme}"
  end
  unless domains.has?(domain_name)
    raise "Unknown domain #{domain_name}"
  end

  # The domain entry is a callable that turns the abstract route into request details.
  target = domains[domain_name].call(route)
  logger.debug "Route #{route.inspect} transformed to #{target.inspect}"

  merged_headers = { **result.message.headers, **target.headers }
  string_headers = merged_headers.transform_values(&:to_s)

  raw_publish(target.uri,
              method: target.method,
              options: target.options,
              body: result.message.payload,
              headers: string_headers)
end

#raw_publish(uri, method:, options: {}, body: nil, headers: {}) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/bbk/http/publisher.rb', line 63

# Sends +body+ to +uri+ with the given HTTP verb and wraps the outcome in a future.
#
# The request is performed synchronously; the returned resolvable future has
# already been fulfilled or rejected by the time this method returns.
#
# @param uri [Object] target handed to +Faraday.new+
# @param method [Symbol, String] HTTP verb; its symbol form must be listed in METHODS
# @param options [Hash] per-call connection options, merged over the defaults given
#   to the constructor; only keys that are Faraday::ConnectionOptions members are
#   forwarded to Faraday
# @param body [Object, nil] payload; serialized with +Oj.generate+ when the content
#   type equals the JSON MIME type, sent untouched otherwise
# @param headers [Hash] request headers; the JSON MIME type is used when the
#   Content-Type key is absent. The hash itself is never modified.
# @return [Concurrent::Promises::ResolvableFuture] fulfilled with +response.to_hash+
#   on a successful response, rejected with a PublishError otherwise
# @raise [RuntimeError] when +method+ is not in METHODS
def raw_publish(uri, method:, options: {}, body: nil, headers: {})
  verb = method.to_sym
  raise "Unsupported method(#{method.inspect})" unless METHODS.include?(verb)

  content_type = headers.fetch(CONTENT_TYPE_HEADER, Faraday::Request::Json::MIME_TYPE)
  payload = content_type == Faraday::Request::Json::MIME_TYPE ? Oj.generate(body) : body
  # Build a fresh hash rather than writing into the caller's +headers+ (which
  # may be shared or frozen).
  request_headers = headers.merge(CONTENT_TYPE_HEADER => content_type)

  options = @default_connection_options.merge(options)
  logger.debug("Connection options: #{options}") if options.present?
  # Hash#slice takes the wanted keys as separate arguments. Passing the member
  # array unsplatted looks up the array itself as a key and always yields {},
  # silently dropping every connection option.
  connection_options = options.slice(*Faraday::ConnectionOptions.members)

  # NOTE(review): Faraday's body-less verbs (get/head/delete/trace) treat the
  # second positional argument as query params, not a body — confirm that
  # passing the payload there is intended.
  response = Faraday.new(uri, **connection_options).public_send(verb, '', payload, request_headers)

  Concurrent::Promises.resolvable_future.tap do |future|
    data = response.to_hash
    if response.success?
      future.fulfill(data)
    else
      # The body is left out of the log line; the full data still goes into the error.
      logger.error "Get error response #{data.except(:body)}"
      future.reject(PublishError.new(data))
    end
  end
end