Class: BBK::HTTP::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/bbk/http/publisher.rb

Defined Under Namespace

Classes: PublishError

Constant Summary collapse

PROTOCOLS =
%w[http https].freeze
CONTENT_TYPE_HEADER =
'Content-Type'.freeze
METHODS =
Faraday::Connection::METHODS

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(domains, default_connection_options: {}, logger: BBK::HTTP.logger) ⇒ Publisher



31
32
33
34
35
# File 'lib/bbk/http/publisher.rb', line 31

def initialize(domains, default_connection_options: {}, logger: BBK::HTTP.logger)
  @domains = domains
  @default_connection_options = default_connection_options
  @logger = ActiveSupport::TaggedLogging.new(logger).tagged(self.class.name)
end

Instance Attribute Details

#domainsObject (readonly)

Returns the value of attribute domains.



29
30
31
# File 'lib/bbk/http/publisher.rb', line 29

def domains
  @domains
end

#loggerObject (readonly)

Returns the value of attribute logger.



29
30
31
# File 'lib/bbk/http/publisher.rb', line 29

def logger
  @logger
end

Instance Method Details

#closeObject



41
# File 'lib/bbk/http/publisher.rb', line 41

def close; end

#protocolsObject



37
38
39
# File 'lib/bbk/http/publisher.rb', line 37

def protocols
  PROTOCOLS
end

#publish(result) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/bbk/http/publisher.rb', line 43

def publish(result)
  logger.debug "Try publish dispatcher result #{result.inspect}"
  route = result.route
  result_domain = route.domain

  raise "Unsupported protocol #{route.scheme}" unless PROTOCOLS.include?(route.scheme)
  raise "Unknown domain #{result_domain}" unless domains.has?(result_domain)

  domain = domains[result_domain]
  route_info = domain.call(route)
  logger.debug "Route #{route.inspect} transformed to #{route_info.inspect}"
  headers = { **result.message.headers, **route_info.headers }.transform_values(&:to_s)
  raw_publish(
    route_info.uri,
    method:  route_info.method,
    options: route_info.options,
    body:    result.message.payload,
    headers: headers
  )
end

#raw_publish(uri, method:, options: {}, body: nil, headers: {}) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/bbk/http/publisher.rb', line 64

def raw_publish(uri, method:, options: {}, body: nil, headers: {})
  raise "Unsupported method(#{method.inspect})" unless METHODS.include?(method.to_sym)

  content_type = headers.fetch(CONTENT_TYPE_HEADER, Faraday::Request::Json::MIME_TYPE)
  payload = if content_type == Faraday::Request::Json::MIME_TYPE
    Oj.generate(body)
  else
    body
  end
  headers[CONTENT_TYPE_HEADER] = content_type
  options = @default_connection_options.merge(options)
  logger.debug("Connection options: #{options}") if options.present?
  response = Faraday.new(uri, **options.slice(Faraday::ConnectionOptions.members)).send(
    method.to_sym, '', payload, headers
  )
  Concurrent::Promises.resolvable_future.tap do |f|
    data = response.to_hash
    if response.success?
      f.fulfill(data)
    else
      logger.error "Get error response #{data.except(:body)}"
      f.reject(PublishError.new(data))
    end
  end
rescue Faraday::ConnectionFailed => e
  logger.error "Faraday connection failed error: #{e.inspect}"
  Concurrent::Promises.resolvable_future.reject(PublishError.new({ cause: e }))
rescue Faraday::Error => e
  logger.error "Faraday error: #{e.inspect}"
  Concurrent::Promises.resolvable_future.reject(PublishError.new({ cause: e }))
end