Module: Vx::Common::AMQP::Consumer::Publish

Included in:
ClassMethods
Defined in:
lib/vx/common/amqp/consumer/publish.rb

Instance Method Summary collapse

Instance Method Details

#publish(message, options = nil) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/vx/common/amqp/consumer/publish.rb', line 8

def publish(message, options = nil)
  session.open

  options ||= {}
  options[:routing_key]  = routing_key if routing_key && !options.key?(:routing_key)
  options[:headers]      = headers     if headers && !options.key?(:headers)
  options[:content_type] ||= content_type || config.content_type
  options[:message_id]   ||= SecureRandom.uuid

  x = declare_exchange

  instrumentation = {
    payload:     message,
    exchange:    x.name,
    consumer:    consumer_name,
    consumer_id: consumer_id,
    properties:  options,
  }

  with_pub_middlewares instrumentation do
    instrument("process_publishing.consumer.amqp", instrumentation) do
      m = serialize_message message, options[:content_type]
      x.publish m, options
    end
  end

  self
end

#serialize_message(message, content_type) ⇒ Object



37
38
39
# File 'lib/vx/common/amqp/consumer/publish.rb', line 37

def serialize_message(message, content_type)
  Common::AMQP::Formatter.pack(content_type, message)
end

#with_pub_middlewares(env, &block) ⇒ Object



41
42
43
# File 'lib/vx/common/amqp/consumer/publish.rb', line 41

def with_pub_middlewares(env, &block)
  Common::AMQP.config.builders[:pub].to_app(block).call(env)
end