Class: Msgr::Connection

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/msgr/connection.rb

Overview

rubocop:disable Metrics/ClassLength

Constant Summary collapse

EXCHANGE_NAME =
'msgr'

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#log, #log_name

Constructor Details

#initialize(uri, config, dispatcher) ⇒ Connection

Returns a new instance of Connection.



14
15
16
17
18
# File 'lib/msgr/connection.rb', line 14

# Stores the three collaborators in instance variables; nothing else is done
# here.
#
# @param uri [Object] kept in @uri
# @param config [Object] kept in @config
# @param dispatcher [Object] kept in @dispatcher
def initialize(uri, config, dispatcher)
  @dispatcher = dispatcher
  @config = config
  @uri = uri
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



12
13
14
# File 'lib/msgr/connection.rb', line 12

# Reader for the configuration object held in @config.
#
# @return [Object] the current value of @config
def config
  @config
end

#uri ⇒ Object (readonly)

Returns the value of attribute uri.



12
13
14
# File 'lib/msgr/connection.rb', line 12

# Reader for the value held in @uri.
#
# @return [Object] the current value of @uri
def uri
  @uri
end

Instance Method Details

#ack(delivery_tag) ⇒ Object



112
113
114
115
# File 'lib/msgr/connection.rb', line 112

# Forwards the delivery tag to the channel's +ack+ and then writes a
# debug-level log entry naming the tag.
#
# @param delivery_tag [Object] tag handed unchanged to the channel
# @return [Object] whatever the +log+ call returns
def ack(delivery_tag)
  channel.ack(delivery_tag)

  log(:debug) do
    "Acked message: #{delivery_tag}"
  end
end

#bind(routes) ⇒ Object



101
102
103
104
105
106
107
108
109
110
# File 'lib/msgr/connection.rb', line 101

# Hands the routes to +bind_all+, unless the collection is empty, in which
# case only a warning is logged and nothing is bound.
#
# @param routes [#empty?] collection of routes to bind
# @return [Object] result of +bind_all+, or of the +log+ call when +routes+
#   is empty
def bind(routes)
  return bind_all(routes) unless routes.empty?

  log(:warn) do
    "No routes to bound to. Bind will have no effect:\n" \
    "  #{routes.inspect}"
  end
end

#bindings ⇒ Object



69
70
71
# File 'lib/msgr/connection.rb', line 69

# Lazily initialised list held in @bindings; the same array instance is
# returned on every call once it has been created.
#
# @return [Array] the memoised bindings array
def bindings
  @bindings = [] unless @bindings
  @bindings
end

#channel ⇒ Object



40
41
42
43
44
45
46
# File 'lib/msgr/connection.rb', line 40

# Returns the memoised channel, creating it from the connection on first use.
# A newly created channel gets +prefetch 1+ applied before it is stored in
# @channel.
#
# @return [Object] the channel created by +connection.create_channel+
def channel
  return @channel if @channel

  @channel = connection.create_channel.tap { |created| created.prefetch 1 }
end

#close ⇒ Object



127
128
129
130
131
# File 'lib/msgr/connection.rb', line 127

# Closes the channel (only when one has been created and it reports itself
# open) and then the connection (only when one has been created), and finally
# logs a debug message.
#
# @return [Object] whatever the +log+ call returns
def close
  channel_open = @channel && @channel.open?

  channel.close if channel_open
  connection.close if @connection

  log(:debug) do
    'Closed.'
  end
end

#connect ⇒ Object



36
37
38
# File 'lib/msgr/connection.rb', line 36

# Delegates to #connection and returns its result.
#
# @return [Object] whatever #connection returns
def connect
  connection
end

#connection ⇒ Object



32
33
34
# File 'lib/msgr/connection.rb', line 32

# Returns the memoised Bunny session. On first use a session is built from
# +config+ and started; it is stored in @connection only after +start+ has
# returned.
#
# @return [Object] the object returned by +::Bunny.new(config)+
def connection
  return @connection if @connection

  session = ::Bunny.new(config)
  session.start
  @connection = session
end

#delete ⇒ Object



55
56
57
58
59
60
# File 'lib/msgr/connection.rb', line 55

# Calls +delete+ on every tracked binding, after logging how many there are.
# Does nothing (and logs nothing) when no bindings are tracked.
#
# @return [Array, nil] the bindings array, or nil when it is empty
def delete
  current = bindings
  return if current.empty?

  log(:debug) { "Delete bindings (#{current.size})..." }

  current.each { |bound| bound.delete }
end

#exchange ⇒ Object



81
82
83
84
85
86
87
88
89
90
# File 'lib/msgr/connection.rb', line 81

# Returns the memoised exchange. On first use it asks the channel for a
# durable topic exchange named after EXCHANGE_NAME (run through #prefix),
# logs its properties at debug level, and then stores it in @exchange.
#
# @return [Object] the object returned by +channel.topic+
def exchange
  return @exchange if @exchange

  topic = channel.topic(prefix(EXCHANGE_NAME), durable: true)

  log(:debug) do
    "Created exchange #{topic.name} (type: #{topic.type}, " \
    "durable: #{topic.durable?}, auto_delete: #{topic.auto_delete?})"
  end

  @exchange = topic
end

#nack(delivery_tag) ⇒ Object



117
118
119
120
# File 'lib/msgr/connection.rb', line 117

# Forwards the delivery tag to the channel's +nack+ and then writes a
# debug-level log entry naming the tag.
#
# @param delivery_tag [Object] tag handed unchanged to the channel
# @return [Object] whatever the +log+ call returns
def nack(delivery_tag)
  # The two trailing flags are passed positionally as (false, true);
  # presumably Bunny's `multiple` and `requeue` -- confirm against
  # Bunny::Channel#nack.
  channel.nack(delivery_tag, false, true)

  log(:debug) do
    "Nacked message: #{delivery_tag}"
  end
end

#prefix(name) ⇒ Object



73
74
75
76
77
78
79
# File 'lib/msgr/connection.rb', line 73

# Prepends the configured prefix and a dash to +name+. When
# +config[:prefix]+ is not +present?+ the name is returned untouched.
#
# @param name [Object] name to decorate
# @return [Object] "<prefix>-<name>" as a String, or +name+ itself
def prefix(name)
  return name unless config[:prefix].present?

  "#{config[:prefix]}-#{name}"
end

#publish(message, opts = {}) ⇒ Object



24
25
26
27
28
29
30
# File 'lib/msgr/connection.rb', line 24

# Publishes +message+ (converted with +to_s+) on the exchange and logs the
# routing key at debug level.
#
# The +:to+ option is accepted as an alias for +:routing_key+. The message is
# always published with +persistent: true+, overriding any caller-supplied
# value.
#
# @param message [#to_s] payload to publish
# @param opts [Hash] publish options forwarded to the exchange; the hash
#   passed in is left unmodified
# @return [Object] whatever the +log+ call returns
def publish(message, opts = {})
  # Work on a copy so the caller's hash is not rewritten (and so frozen
  # option hashes are accepted).
  options = opts.dup
  options[:routing_key] = options.delete(:to) if options[:to]

  exchange.publish(message.to_s, options.merge(persistent: true))

  log(:debug) { "Published message to #{options[:routing_key]}" }
end

#purge(**kwargs) ⇒ Object



62
63
64
65
66
67
# File 'lib/msgr/connection.rb', line 62

# Calls +purge+ on every tracked binding, forwarding the keyword arguments,
# after logging how many bindings there are. Does nothing (and logs nothing)
# when no bindings are tracked.
#
# @param kwargs [Hash] keyword arguments passed through to each binding
# @return [Array, nil] the bindings array, or nil when it is empty
def purge(**kwargs)
  current = bindings
  return if current.empty?

  log(:debug) { "Purge bindings (#{current.size})..." }

  current.each do |bound|
    bound.purge(**kwargs)
  end
end

#queue(name) ⇒ Object



92
93
94
95
96
97
98
99
# File 'lib/msgr/connection.rb', line 92

# Asks the channel for a durable queue whose name is +name+ run through
# #prefix, logs its properties at debug level, and returns it. Nothing is
# memoised here: every call goes to the channel again.
#
# @param name [Object] unprefixed queue name
# @return [Object] the object returned by +channel.queue+
def queue(name)
  declared = channel.queue(prefix(name), durable: true)

  log(:debug) do
    "Create queue #{declared.name} (durable: #{declared.durable?}, " \
    "auto_delete: #{declared.auto_delete?})"
  end

  declared
end

#reject(delivery_tag, requeue = true) ⇒ Object



122
123
124
125
# File 'lib/msgr/connection.rb', line 122

# Forwards the delivery tag and the requeue flag to the channel's +reject+
# and then writes a debug-level log entry naming the tag.
#
# @param delivery_tag [Object] tag handed unchanged to the channel
# @param requeue [Boolean] passed through to the channel; defaults to true
# @return [Object] whatever the +log+ call returns
def reject(delivery_tag, requeue = true)
  channel.reject(delivery_tag, requeue)

  log(:debug) do
    "Rejected message: #{delivery_tag}"
  end
end

#release ⇒ Object



48
49
50
51
52
53
# File 'lib/msgr/connection.rb', line 48

# Calls +release+ on every tracked binding, after logging how many there are.
# Does nothing (and logs nothing) when no bindings are tracked.
#
# @return [Array, nil] the bindings array, or nil when it is empty
def release
  current = bindings
  return if current.empty?

  log(:debug) { "Release bindings (#{current.size})..." }

  current.each { |bound| bound.release }
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


20
21
22
# File 'lib/msgr/connection.rb', line 20

# Reports whether any bindings are currently tracked.
#
# @return [Boolean] result of calling +any?+ on the bindings collection
def running?
  bindings.any?
end