Class: Msgr::Connection

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/msgr/connection.rb

Constant Summary collapse

EXCHANGE_NAME =
'msgr'

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#log, #log_name

Constructor Details

#initialize(uri, config, dispatcher) ⇒ Connection

Returns a new instance of Connection.



13
14
15
16
17
# File 'lib/msgr/connection.rb', line 13

# Stores the given collaborators on the instance. Nothing else happens here;
# the body only assigns instance variables.
#
# @param uri [Object] kept in @uri and exposed through #uri
# @param config [Object] kept in @config and exposed through #config
# @param dispatcher [Object] kept in @dispatcher
def initialize(uri, config, dispatcher)
  @uri        = uri
  @config     = config
  @dispatcher = dispatcher
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



11
12
13
# File 'lib/msgr/connection.rb', line 11

# Read-only accessor.
#
# @return [Object] the config value assigned in the constructor
def config
  @config
end

#uri ⇒ Object (readonly)

Returns the value of attribute uri.



11
12
13
# File 'lib/msgr/connection.rb', line 11

# Read-only accessor.
#
# @return [Object] the uri value assigned in the constructor
def uri
  @uri
end

Instance Method Details

#ack(delivery_tag) ⇒ Object



101
102
103
104
# File 'lib/msgr/connection.rb', line 101

# Acknowledges a delivery on the channel, then logs the tag at debug level.
#
# @param delivery_tag [Object] passed unchanged to channel.ack
# @return [Object] whatever the debug log call returns
def ack(delivery_tag)
  channel.ack(delivery_tag)
  log(:debug) { "Acked message: #{delivery_tag}" }
end

#bind(routes) ⇒ Object



93
94
95
96
97
98
99
# File 'lib/msgr/connection.rb', line 93

# Binds the given routes via bind_all. An empty collection is not bound;
# a warning is logged instead.
#
# @param routes [#empty?] routes to bind
# @return [Object] result of bind_all, or of the warn log call when empty
def bind(routes)
  return bind_all(routes) unless routes.empty?

  log(:warn) { "No routes to bound to. Bind will have no effect. (#{routes.inspect})" }
end

#bindings ⇒ Object



63
64
65
# File 'lib/msgr/connection.rb', line 63

# Memoized collection of bindings; created as an empty array on first access.
#
# @return [Array] the @bindings collection
def bindings
  @bindings ||= []
end

#channel ⇒ Object



44
45
46
# File 'lib/msgr/connection.rb', line 44

# Returns the channel, creating it from the connection on first use and
# reusing it afterwards.
#
# @return [Object] the memoized result of connection.create_channel
def channel
  return @channel if @channel

  @channel = connection.create_channel
end

#close ⇒ Object



111
112
113
114
115
# File 'lib/msgr/connection.rb', line 111

# Closes the channel and the connection if they were ever created, then
# logs at debug level. Neither is created just to be closed: both checks
# look at the instance variables rather than the lazy accessors.
#
# @return [Object] whatever the debug log call returns
def close
  # A channel that exists but is already closed is left alone.
  if @channel && @channel.open?
    channel.close
  end

  if @connection
    connection.close
  end

  log(:debug) { 'Closed.' }
end

#connect ⇒ Object



40
41
42
# File 'lib/msgr/connection.rb', line 40

# Delegates to #connection, which creates and starts the Bunny session the
# first time it is called.
#
# @return [Object] the value returned by #connection
def connect
  connection
end

#connection ⇒ Object



36
37
38
# File 'lib/msgr/connection.rb', line 36

# Returns the Bunny session, building it from #config and starting it on
# first use. The session is only memoized once start has returned.
#
# @return [Object] the started object returned by ::Bunny.new(config)
def connection
  return @connection if @connection

  session = ::Bunny.new(config)
  session.start
  @connection = session
end

#delete ⇒ Object



55
56
57
58
59
60
61
# File 'lib/msgr/connection.rb', line 55

# Deletes every binding and then the exchange. Does nothing (and leaves the
# exchange untouched) when there are no bindings.
#
# @return [Object, nil] result of exchange.delete, or nil without bindings
def delete
  unless bindings.empty?
    log(:debug) { "Delete bindings (#{bindings.size})..." }

    bindings.each(&:delete)
    exchange.delete
  end
end

#exchange ⇒ Object



75
76
77
78
79
80
81
82
# File 'lib/msgr/connection.rb', line 75

# Returns the durable topic exchange, declaring it on the channel under the
# prefixed EXCHANGE_NAME on first use and logging its properties at debug
# level. The exchange is only memoized after the log call has returned.
#
# @return [Object] the memoized result of channel.topic
def exchange
  return @exchange if @exchange

  declared = channel.topic(prefix(EXCHANGE_NAME), durable: true)
  log(:debug) do
    "Created exchange #{declared.name} (type: #{declared.type}, " \
      "durable: #{declared.durable?}, auto_delete: #{declared.auto_delete?})"
  end
  @exchange = declared
end

#prefix(name) ⇒ Object



67
68
69
70
71
72
73
# File 'lib/msgr/connection.rb', line 67

# Namespaces a name with the configured prefix.
#
# @param name [String] base name
# @return [String] "<prefix>-<name>" when config[:prefix] is present?,
#   otherwise name itself
def prefix(name)
  return name unless config[:prefix].present?

  "#{config[:prefix]}-#{name}"
end

#publish(payload, opts = {}) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/msgr/connection.rb', line 23

# Publishes a payload to the exchange as a persistent message.
#
# The payload is serialized with MultiJson and sent as 'application/json'.
# If serialization raises, a warning is logged and payload.to_s is sent as
# 'application/text' instead. Errors raised by the publish call itself are
# not rescued: retrying them through the text fallback would re-send an
# already JSON-encoded body under the wrong content type.
#
# @param payload [Object] message body
# @param opts [Hash] options handed to exchange.publish; a truthy :to entry
#   is renamed to :routing_key. The caller's hash is left unmodified.
# @return [Object] whatever the debug log call returns
def publish(payload, opts = {})
  # Work on a copy so the :to -> :routing_key rename does not leak to callers.
  options = opts.dup
  options[:routing_key] = options.delete(:to) if options[:to]

  begin
    body         = MultiJson.dump(payload)
    content_type = 'application/json'
  rescue => error
    log(:warn) { "Could not serialize payload as JSON, publishing as text: #{error.message}" }
    body         = payload.to_s
    content_type = 'application/text'
  end

  exchange.publish body, options.merge(persistent: true, content_type: content_type)

  log(:debug) { "Published message to #{options[:routing_key]}" }
end

#queue(name) ⇒ Object



84
85
86
87
88
89
90
91
# File 'lib/msgr/connection.rb', line 84

# Declares a durable queue on the channel under the prefixed name and logs
# its properties at debug level.
#
# @param name [String] unprefixed queue name
# @return [Object] the object returned by channel.queue
def queue(name)
  declared = channel.queue(prefix(name), durable: true)
  log(:debug) do
    "Create queue #{declared.name} (durable: #{declared.durable?}, " \
      "auto_delete: #{declared.auto_delete?})"
  end
  declared
end

#reject(delivery_tag, requeue = true) ⇒ Object



106
107
108
109
# File 'lib/msgr/connection.rb', line 106

# Rejects a delivery on the channel, then logs the tag at debug level.
#
# @param delivery_tag [Object] passed unchanged to channel.reject
# @param requeue [Boolean] passed unchanged to channel.reject; defaults to true
# @return [Object] whatever the debug log call returns
def reject(delivery_tag, requeue = true)
  channel.reject(delivery_tag, requeue)
  log(:debug) { "Rejected message: #{delivery_tag}" }
end

#release ⇒ Object



48
49
50
51
52
53
# File 'lib/msgr/connection.rb', line 48

# Calls release on every binding. Does nothing when there are no bindings.
#
# @return [Array, nil] the bindings collection, or nil when it is empty
def release
  unless bindings.empty?
    log(:debug) { "Release bindings (#{bindings.size})..." }

    bindings.each(&:release)
  end
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


19
20
21
# File 'lib/msgr/connection.rb', line 19

# Treats the connection as running when #bindings holds at least one
# truthy element.
#
# @return [Boolean] result of bindings.any?
def running?
  bindings.any?
end