Class: BBK::AMQP::Publisher
- Inherits:
-
Object
- Object
- BBK::AMQP::Publisher
- Defined in:
- lib/bbk/amqp/publisher.rb
Overview
Publisher send amqp messages
Constant Summary collapse
- HEADER_PROP_FIELDS =
i[ reply_to correlation_id].freeze
- PROTOCOLS =
%w[mq amqp amqps].freeze
Instance Attribute Summary collapse
-
#ack_map ⇒ Object
readonly
Returns the value of attribute ack_map.
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#domains ⇒ Object
readonly
Returns the value of attribute domains.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#sended_messages ⇒ Object
readonly
Returns the value of attribute sended_messages.
Instance Method Summary collapse
-
#close ⇒ Object
Close publisher - try close amqp channel.
-
#initialize(connection, domains, logger: BBK::AMQP.logger) ⇒ Publisher
constructor
A new instance of Publisher.
-
#protocols ⇒ Array<Symbol>
Returned supported protocols list.
-
#publish(result) ⇒ Object
Publish dispatcher result.
-
#publish_message(routing_key, message, exchange:, options: {}) ⇒ Object
Publish message.
-
#raw_publish(routing_key, exchange:, properties: {}, headers: {}, payload: {}) ⇒ Object
Publish raw payload.
Constructor Details
#initialize(connection, domains, logger: BBK::AMQP.logger) ⇒ Publisher
Returns a new instance of Publisher.
16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/bbk/amqp/publisher.rb', line 16 def initialize(connection, domains, logger: BBK::AMQP.logger) @connection = connection @channel = connection.channel @domains = domains logger = logger.respond_to?(:tagged) ? logger : ActiveSupport::TaggedLogging.new(logger) @logger = BBK::Utils::ProxyLogger.new(logger, tags: [self.class.to_s, "Ch##{@channel.id}"]) @ack_map = Concurrent::Map.new = Concurrent::Map.new @configured_exchanges = Set.new initialize_callbacks end |
Instance Attribute Details
#ack_map ⇒ Object (readonly)
Returns the value of attribute ack_map.
14 15 16 |
# File 'lib/bbk/amqp/publisher.rb', line 14 def ack_map @ack_map end |
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
14 15 16 |
# File 'lib/bbk/amqp/publisher.rb', line 14 def channel @channel end |
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
14 15 16 |
# File 'lib/bbk/amqp/publisher.rb', line 14 def connection @connection end |
#domains ⇒ Object (readonly)
Returns the value of attribute domains.
14 15 16 |
# File 'lib/bbk/amqp/publisher.rb', line 14 def domains @domains end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
14 15 16 |
# File 'lib/bbk/amqp/publisher.rb', line 14 def logger @logger end |
#sended_messages ⇒ Object (readonly)
Returns the value of attribute sended_messages.
14 15 16 |
# File 'lib/bbk/amqp/publisher.rb', line 14 def end |
Instance Method Details
#close ⇒ Object
Close publisher - try close amqp channel
37 38 39 40 41 42 43 44 |
# File 'lib/bbk/amqp/publisher.rb', line 37 def close @channel.tap do |c| return nil unless c @channel = nil c.close end end |
#protocols ⇒ Array<Symbol>
Returned supported protocols list
32 33 34 |
# File 'lib/bbk/amqp/publisher.rb', line 32 def protocols PROTOCOLS end |
#publish(result) ⇒ Object
Publish dispatcher result
48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/bbk/amqp/publisher.rb', line 48 def publish(result) logger.debug "Try publish dispatcher result #{result.inspect}" route = result.route result_domain = route.domain raise "Unsupported protocol #{route.scheme}" unless PROTOCOLS.include?(route.scheme) raise "Unknown domain #{result_domain}" unless domains.has?(result_domain) domain = domains[result_domain] raise ArgumentError.new("Unknown route domain #{resutl_domain}") if domain.nil? route_info = domain.call(route) = result. (route_info.routing_key, , exchange: route_info.exchange) end |
#publish_message(routing_key, message, exchange:, options: {}) ⇒ Object
Publish message
68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/bbk/amqp/publisher.rb', line 68 def (routing_key, , exchange:, options: {}) logger.debug "Try publish message #{message.headers.inspect}" properties = { persistent: true, mandatory: true, routing_key: routing_key, headers: .headers, # user_id: client_name, **.headers.select {|k| HEADER_PROP_FIELDS.include?(k.to_sym) }.compact }.merge().symbolize_keys properties[:user_id] = client_name if .headers[:user_id].blank? (exchange, routing_key, .payload, properties) end |
#raw_publish(routing_key, exchange:, properties: {}, headers: {}, payload: {}) ⇒ Object
Publish raw payload
87 88 89 90 91 92 93 94 95 |
# File 'lib/bbk/amqp/publisher.rb', line 87 def raw_publish(routing_key, exchange:, properties: {}, headers: {}, payload: {}) logger.debug "Publish raw message #{headers.inspect}" properties = properties.deep_dup properties[:headers] = properties.fetch(:headers, {}).merge headers properties = properties.merge(headers.select do |k| HEADER_PROP_FIELDS.include? k end.compact).symbolize_keys (exchange, routing_key, payload, properties) end |