Class: Msgr::Connection

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/msgr/connection.rb

Constant Summary collapse

EXCHANGE_NAME =
'msgr'

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#log, #log_name

Constructor Details

#initialize(uri, config, dispatcher) ⇒ Connection

Returns a new instance of Connection.



13
14
15
16
17
# File 'lib/msgr/connection.rb', line 13

def initialize(uri, config, dispatcher)
  @uri        = uri
  @config     = config
  @dispatcher = dispatcher
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



11
12
13
# File 'lib/msgr/connection.rb', line 11

def config
  @config
end

#uriObject (readonly)

Returns the value of attribute uri.



11
12
13
# File 'lib/msgr/connection.rb', line 11

def uri
  @uri
end

Instance Method Details

#ack(delivery_tag) ⇒ Object



99
100
101
102
# File 'lib/msgr/connection.rb', line 99

def ack(delivery_tag)
  channel.ack delivery_tag
  log(:debug) { "Acked message: #{delivery_tag}" }
end

#bind(routes) ⇒ Object



91
92
93
94
95
96
97
# File 'lib/msgr/connection.rb', line 91

def bind(routes)
  if routes.empty?
    log(:warn) { "No routes to bound to. Bind will have no effect. (#{routes.inspect})" }
  else
    bind_all(routes)
  end
end

#bindingsObject



61
62
63
# File 'lib/msgr/connection.rb', line 61

def bindings
  @bindings ||= []
end

#channelObject



39
40
41
42
43
44
45
# File 'lib/msgr/connection.rb', line 39

def channel
  @channel ||= begin
    channel = connection.create_channel
    channel.prefetch 1
    channel
  end
end

#closeObject



114
115
116
117
118
# File 'lib/msgr/connection.rb', line 114

def close
  channel.close    if @channel && @channel.open?
  connection.close if @connection
  log(:debug) { 'Closed.' }
end

#connectObject



35
36
37
# File 'lib/msgr/connection.rb', line 35

def connect
  connection
end

#connectionObject



31
32
33
# File 'lib/msgr/connection.rb', line 31

def connection
  @connection ||= ::Bunny.new(config).tap { |b| b.start }
end

#deleteObject



54
55
56
57
58
59
# File 'lib/msgr/connection.rb', line 54

def delete
  return if bindings.empty?
  log(:debug) { "Delete bindings (#{bindings.size})..." }

  bindings.each { |binding| binding.delete }
end

#exchangeObject



73
74
75
76
77
78
79
80
# File 'lib/msgr/connection.rb', line 73

def exchange
  @exchange ||= channel.topic(prefix(EXCHANGE_NAME), durable: true).tap do |ex|
    log(:debug) do
      "Created exchange #{ex.name} (type: #{ex.type}, " \
          "durable: #{ex.durable?}, auto_delete: #{ex.auto_delete?})"
    end
  end
end

#nack(delivery_tag) ⇒ Object



104
105
106
107
# File 'lib/msgr/connection.rb', line 104

def nack(delivery_tag)
  channel.nack delivery_tag, false, true
  log(:debug) { "Nacked message: #{delivery_tag}" }
end

#prefix(name) ⇒ Object



65
66
67
68
69
70
71
# File 'lib/msgr/connection.rb', line 65

def prefix(name)
  if config[:prefix].present?
    "#{config[:prefix]}-#{name}"
  else
    name
  end
end

#publish(message, opts = {}) ⇒ Object



23
24
25
26
27
28
29
# File 'lib/msgr/connection.rb', line 23

def publish(message, opts = {})
  opts[:routing_key] = opts.delete(:to) if opts[:to]

  exchange.publish message.to_s, opts.merge(persistent: true)

  log(:debug) { "Published message to #{opts[:routing_key]}" }
end

#queue(name) ⇒ Object



82
83
84
85
86
87
88
89
# File 'lib/msgr/connection.rb', line 82

def queue(name)
  channel.queue(prefix(name), durable: true).tap do |queue|
    log(:debug) do
      "Create queue #{queue.name} (durable: #{queue.durable?}, " \
      "auto_delete: #{queue.auto_delete?})"
    end
  end
end

#reject(delivery_tag, requeue = true) ⇒ Object



109
110
111
112
# File 'lib/msgr/connection.rb', line 109

def reject(delivery_tag, requeue = true)
  channel.reject delivery_tag, requeue
  log(:debug) { "Rejected message: #{delivery_tag}" }
end

#releaseObject



47
48
49
50
51
52
# File 'lib/msgr/connection.rb', line 47

def release
  return if bindings.empty?
  log(:debug) { "Release bindings (#{bindings.size})..." }

  bindings.each { |binding| binding.release }
end

#running?Boolean

Returns:

  • (Boolean)


19
20
21
# File 'lib/msgr/connection.rb', line 19

def running?
  bindings.any?
end