Module: BBK::AMQP::Utils

Defined in:
lib/bbk/amqp/utils.rb

Class Method Summary collapse

Class Method Details

.commonname(cert_path) ⇒ String

Extract CN certificate attribute from certificate path

Parameters:

  • cert_path (String)

    path to certificate file

Returns:

  • (String)

    certificate CN attribute value



43
44
45
46
# File 'lib/bbk/amqp/utils.rb', line 43

def self.commonname(cert_path)
  cert = OpenSSL::X509::Certificate.new(File.read(cert_path))
  cert.subject.to_a.find {|name, _, _| name == 'CN' }[1]
end

.create_connection(options = {}) ⇒ Bunny::Session

Set default options and create non started connection to amqp

Parameters:

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :hosts (String)

    List of Amqp hosts (default MQ_HOST env variable or mq)

  • :hostname (String)

    Amqp host (default MQ_HOST env variable or mq)

  • :port (Integer)

    Amqp port (default MQ_PORT env variable or 5671 - default tls port)

  • :vhost (String)

    Connected amqp virtual host (default MQ_VHOST env variable or /)

  • :tls (Boolean)

    Use tls (default true)

  • :tls_cert (String)

    Path to certificate file (default config/keys/cert.pem)

  • :tls_key (String)

    Path to key file (default config/keys/key.pem)

  • :tls_ca_certificates (Array)

    List to ca certificates (default config/keys/cacert.pem)

  • :verify (String)

    Verification option server certificate *

  • :verify_peer (String)

    Verification option server certificate *

  • :verify_ssl (String)

    Verification option server certificate *

  • :auth_mechanism (String)

    Amqp authorization mechanism (default EXTERNAL)

  • :automatically_recover (Boolean)

    Allow automatic network failure recovery (default false)

  • :automatic_recovery (Boolean)

    Alias for automatically_recover (default false)

  • :recovery_attempts (Integer)

    Limits the number of connection recovery attempts performed by Bunny (default 0, nil - unlimited)

  • :recover_from_connection_close (Boolean) — default: default false

Returns:

  • (Bunny::Session)

    non started amqp connection



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/bbk/amqp/utils.rb', line 66

def self.create_connection(options = {})
  hosts = [options[:hosts] || options[:host] || options[:hostname]].flatten.select(&:present?).uniq
  hosts = hosts.map{|h| h.split(/[;|]/) }.flatten.select(&:present?).uniq

  options[:hosts] = if hosts.empty?
    [ENV.fetch('MQ_HOST', 'mq')].split(/[;|]/).flatten.select(&:present?).uniq
  else
    hosts
  end

  options[:port] ||= ENV['MQ_PORT'] || 5671
  options[:vhost] ||= ENV['MQ_VHOST'] || '/'
  user = options[:username] || options[:user] || ENV['MQ_USER']
  options[:username] = options[:user] = user

  # Передаем пустую строку чтобы bunny не использовал пароль по умолчанию guest
  pwd = options[:password] || options[:pass] || options[:pwd] || ENV['MQ_PASS'] || ''
  options[:password] = options[:pass] = options[:pwd] = pwd

  options[:tls] = options.fetch(:tls, true)
  options[:tls_cert] ||= 'config/keys/cert.pem'
  options[:tls_key] ||= 'config/keys/key.pem'
  options[:tls_ca_certificates] ||= ['config/keys/cacert.pem']

  options[:verify] =
    options.fetch(:verify, options.fetch(:verify_peer, options.fetch(:verify_ssl, nil)))
  options[:verify] = true if options[:verify]
  options[:verify_peer] = options[:verify]
  options[:verify_ssl] = options[:verify]

  options[:auth_mechanism] ||= if options[:tls]
    'EXTERNAL'
  else
    'PLAIN'
  end

  options[:automatically_recover] ||= false
  options[:automatic_recovery]    ||= false
  options[:recovery_attempts]     ||= 0
  options[:recover_attempts] = options[:recovery_attempts]
  options[:recover_from_connection_close] ||= false
  Bunny.new(options)
end

.pop(queue, timeout = 10) ⇒ Array

Try get message from amqp queue

Parameters:

  • queue (Bunny::Queue)
  • timeout (Integer) (defaults to: 10)

    in seconds for waiting message message in queue

Returns:

  • (Array)

    array with delivery_info, metadata and payload

Raises:

  • (Timeout::Error)

    if queue empty in timeout time duration



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/bbk/amqp/utils.rb', line 15

def self.pop(queue, timeout = 10)
  unblocker = Queue.new
  consumer = queue.subscribe(block: false, manual_ack: true) do |delivery_info, , payload|
    message = [
      delivery_info,
      .to_hash.with_indifferent_access,
      begin
        Oj.load(payload).with_indifferent_access
      rescue StandardError
        payload
      end
    ]
    unblocker << message
  end
  Thread.new do
    sleep timeout
    unblocker << :timeout
  end
  result = unblocker.pop
  consumer.cancel
  raise ::Timeout::Error if result == :timeout
  queue.channel.ack(result[0].delivery_tag)
  result
end