Class: BBK::AMQP::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/bbk/amqp/publisher.rb

Overview

Publisher that sends AMQP messages.

Constant Summary collapse

HEADER_PROP_FIELDS =
%i[message_id reply_to correlation_id].freeze
PROTOCOLS =
%w[mq amqp amqps].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection, domains, logger: BBK::AMQP.logger) ⇒ Publisher

Returns a new instance of Publisher.



16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/bbk/amqp/publisher.rb', line 16

# Builds a publisher bound to a channel obtained from +connection+.
#
# @param connection [Object] AMQP connection; must respond to +channel+
# @param domains [Object] domain registry, stored as given
# @param logger [Object] base logger; wrapped in ActiveSupport::TaggedLogging
#   unless it already responds to +tagged+
def initialize(connection, domains, logger: BBK::AMQP.logger)
  @connection = connection
  @domains = domains
  @channel = connection.channel

  tagged_logger = if logger.respond_to?(:tagged)
                    logger
                  else
                    ActiveSupport::TaggedLogging.new(logger)
                  end
  # Every log line is tagged with the publisher class name and the channel id.
  log_tags = [self.class.to_s, "Ch##{@channel.id}"]
  @logger = BBK::Utils::ProxyLogger.new(tagged_logger, tags: log_tags)

  @configured_exchanges = Set.new
  @sended_messages = Concurrent::Map.new
  @ack_map = Concurrent::Map.new
  initialize_callbacks
end

Instance Attribute Details

#ack_map ⇒ Object (readonly)

Returns the value of attribute ack_map.



14
15
16
# File 'lib/bbk/amqp/publisher.rb', line 14

# Reader for +@ack_map+, which the constructor initializes to an empty
# Concurrent::Map.
#
# @return [Object] the current value of +@ack_map+
def ack_map
  @ack_map
end

#channel ⇒ Object (readonly)

Returns the value of attribute channel.



14
15
16
# File 'lib/bbk/amqp/publisher.rb', line 14

# Reader for +@channel+. The constructor sets it from +connection.channel+,
# and #close resets it to nil.
#
# @return [Object, nil] the channel, or nil once #close has run
def channel
  @channel
end

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



14
15
16
# File 'lib/bbk/amqp/publisher.rb', line 14

# Reader for +@connection+, the connection object passed to the constructor.
#
# @return [Object] the current value of +@connection+
def connection
  @connection
end

#domains ⇒ Object (readonly)

Returns the value of attribute domains.



14
15
16
# File 'lib/bbk/amqp/publisher.rb', line 14

# Reader for +@domains+, the domain registry passed to the constructor.
# #publish queries it with +has?+ and +[]+ to resolve a route's domain.
#
# @return [Object] the current value of +@domains+
def domains
  @domains
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



14
15
16
# File 'lib/bbk/amqp/publisher.rb', line 14

# Reader for +@logger+. The constructor sets it to a BBK::Utils::ProxyLogger
# tagged with the publisher class name and the channel id.
#
# @return [Object] the current value of +@logger+
def logger
  @logger
end

#sended_messages ⇒ Object (readonly)

Returns the value of attribute sended_messages.



14
15
16
# File 'lib/bbk/amqp/publisher.rb', line 14

# Reader for +@sended_messages+, which the constructor initializes to an
# empty Concurrent::Map.
#
# @return [Object] the current value of +@sended_messages+
def sended_messages
  @sended_messages
end

Instance Method Details

#close ⇒ Object

Closes the publisher by closing its AMQP channel, if one is still open.



37
38
39
40
41
42
43
44
# File 'lib/bbk/amqp/publisher.rb', line 37

# Closes the publisher's AMQP channel, if it still holds one.
#
# The channel reference is cleared before +close+ is called on it, so a
# second call finds nothing to close and returns nil.
#
# @return [Object, nil] the channel that was closed, or nil when there was none
def close
  open_channel = @channel
  return nil unless open_channel

  @channel = nil
  open_channel.close
  open_channel
end

#protocols ⇒ Array<String>

Returns the list of supported protocols.

Returns:

  • (Array<String>)


32
33
34
# File 'lib/bbk/amqp/publisher.rb', line 32

# Lists the URI schemes this publisher accepts.
#
# @return [Array<String>] the frozen PROTOCOLS constant (+mq+, +amqp+, +amqps+)
def protocols
  PROTOCOLS
end

#publish(result) ⇒ Object

Publish dispatcher result

Parameters:

  • result (BBK::App::Dispatcher::Result)

    the result to send

Raises:

  • (ArgumentError)


48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/bbk/amqp/publisher.rb', line 48

# Publishes a dispatcher result to the exchange resolved from its route.
#
# The route's domain is looked up in +domains+; the resolved domain object is
# called with the route and must return an object exposing +routing_key+ and
# +exchange+, which are then handed to #publish_message.
#
# @param result [BBK::App::Dispatcher::Result] result to send; must respond
#   to +route+ and +message+
# @return [Object] whatever #publish_message returns
# @raise [RuntimeError] if the route scheme is not in PROTOCOLS, or
#   +domains.has?+ reports the domain as unknown
# @raise [ArgumentError] if +domains+ reports the domain as known but
#   returns nil for it
def publish(result)
  logger.debug "Try publish dispatcher result #{result.inspect}"
  route = result.route
  result_domain = route.domain
  raise "Unsupported protocol #{route.scheme}" unless PROTOCOLS.include?(route.scheme)
  raise "Unknown domain #{result_domain}" unless domains.has?(result_domain)

  domain = domains[result_domain]
  # The registry claimed to know the domain but produced nothing for it.
  raise ArgumentError, "Unknown route domain #{result_domain}" if domain.nil?

  route_info = domain.call(route)
  message = result.message
  publish_message(route_info.routing_key, message, exchange: route_info.exchange)
end

#publish_message(routing_key, message, exchange:, options: {}) ⇒ Object

Publish message

Parameters:

  • routing_key (String)

    message routing key

  • message (Object)

    (object with headers and payload method)

  • exchange (String)

    exchange for sending message

  • options (Hash) (defaults to: {})

    message properties



68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/bbk/amqp/publisher.rb', line 68

# Publishes a message object to +exchange+ under +routing_key+.
#
# Builds the AMQP properties from fixed defaults (+persistent+ and
# +mandatory+ both true), the message headers, any header named in
# HEADER_PROP_FIELDS with a non-nil value, and finally +options+, which
# override the rest. +user_id+ is set to +client_name+ when the headers carry
# no +:user_id+ value.
#
# @param routing_key [String] message routing key
# @param message [Object] object responding to +headers+ and +payload+
# @param exchange [String] exchange to send the message to
# @param options [Hash] extra message properties merged over the defaults
# @return [Object] whatever +send_message+ returns
def publish_message(routing_key, message, exchange:, options: {})
  headers = message.headers
  logger.debug "Try publish message #{headers.inspect}"

  # Headers that double as first-class AMQP properties; nil values are dropped.
  promoted = headers.select { |name| HEADER_PROP_FIELDS.include?(name.to_sym) }.compact
  defaults = {
    persistent:  true,
    mandatory:   true,
    routing_key: routing_key,
    headers:     headers,
    **promoted
  }
  properties = defaults.merge(options).symbolize_keys
  properties[:user_id] = client_name if headers[:user_id].blank?

  send_message(exchange, routing_key, message.payload, properties)
end

#raw_publish(routing_key, exchange:, properties: {}, headers: {}, payload: {}) ⇒ Object

Publish raw payload

Parameters:

  • routing_key (String)

    routing key for sending data

  • exchange (String)

    exchange name

  • properties (Hash) (defaults to: {})

    amqp message properties

  • headers (Hash) (defaults to: {}) — message headers


87
88
89
90
91
92
93
94
95
# File 'lib/bbk/amqp/publisher.rb', line 87

# Publishes a raw payload with explicitly supplied properties and headers.
#
# +properties+ is deep-copied, so the caller's hash is not mutated. +headers+
# are merged over any +properties[:headers]+. Every header named in
# HEADER_PROP_FIELDS with a non-nil value is also copied to a top-level
# property. Header names are compared after +to_sym+, matching
# #publish_message, so string-keyed headers are promoted as well.
#
# @param routing_key [String] routing key for sending data
# @param exchange [String] exchange name
# @param properties [Hash] AMQP message properties
# @param headers [Hash] message headers
# @param payload [Object] message body, passed to +send_message+ unchanged
# @return [Object] whatever +send_message+ returns
def raw_publish(routing_key, exchange:, properties: {}, headers: {}, payload: {})
  logger.debug "Publish raw message #{headers.inspect}"
  properties = properties.deep_dup
  properties[:headers] = properties.fetch(:headers, {}).merge(headers)

  # Keys that cannot be converted to a symbol can never name a property field.
  promoted = headers.select do |name|
    name.respond_to?(:to_sym) && HEADER_PROP_FIELDS.include?(name.to_sym)
  end
  properties = properties.merge(promoted.compact).symbolize_keys

  send_message(exchange, routing_key, payload, properties)
end