Class: Msgr::Connection

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/msgr/connection.rb

Constant Summary collapse

EXCHANGE_NAME =
'msgr'

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#log, #log_name

Constructor Details

#initialize(uri, config, dispatcher) ⇒ Connection

Returns a new instance of Connection.



13
14
15
16
17
# File 'lib/msgr/connection.rb', line 13

def initialize(uri, config, dispatcher)
  @uri        = uri
  @config     = config
  @dispatcher = dispatcher
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



11
12
13
# File 'lib/msgr/connection.rb', line 11

def config
  @config
end

#uriObject (readonly)

Returns the value of attribute uri.



11
12
13
# File 'lib/msgr/connection.rb', line 11

def uri
  @uri
end

Instance Method Details

#ack(delivery_tag) ⇒ Object



86
87
88
89
# File 'lib/msgr/connection.rb', line 86

def ack(delivery_tag)
  channel.ack delivery_tag
  log(:debug) { "Acked message: #{delivery_tag}" }
end

#bind(routes) ⇒ Object



78
79
80
81
82
83
84
# File 'lib/msgr/connection.rb', line 78

def bind(routes)
  if routes.empty?
    log(:warn) { "No routes to bound to. Bind will have no effect. (#{routes.inspect})" }
  else
    bind_all(routes)
  end
end

#channelObject



40
41
42
# File 'lib/msgr/connection.rb', line 40

def channel
  @channel ||= connection.create_channel
end

#closeObject



96
97
98
99
100
# File 'lib/msgr/connection.rb', line 96

def close
  channel.close    if @channel && @channel.open?
  connection.close if @connection
  log(:debug) { 'Closed.' }
end

#connectionObject



36
37
38
# File 'lib/msgr/connection.rb', line 36

def connection
  @connection ||= ::Bunny.new(config).tap { |b| b.start }
end

#exchangeObject



60
61
62
63
64
65
66
67
# File 'lib/msgr/connection.rb', line 60

def exchange
  @exchange ||= channel.topic(prefix(EXCHANGE_NAME), durable: true).tap do |ex|
    log(:debug) do
      "Created exchange #{ex.name} (type: #{ex.type}, " \
          "durable: #{ex.durable?}, auto_delete: #{ex.auto_delete?})"
    end
  end
end

#prefix(name) ⇒ Object



52
53
54
55
56
57
58
# File 'lib/msgr/connection.rb', line 52

def prefix(name)
  if config[:prefix].present?
    "#{config[:prefix]}-#{name}"
  else
    name
  end
end

#publish(payload, opts = {}) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/msgr/connection.rb', line 23

def publish(payload, opts = {})
  opts[:routing_key] = opts.delete(:to) if opts[:to]

  begin
    payload = MultiJson.dump(payload)
    exchange.publish payload, opts.merge(persistent: true, content_type: 'application/json')
  rescue => error
    exchange.publish payload.to_s, opts.merge(persistent: true, content_type: 'application/text')
  end

  log(:debug) { "Published message to #{opts[:routing_key]}" }
end

#queue(name) ⇒ Object



69
70
71
72
73
74
75
76
# File 'lib/msgr/connection.rb', line 69

def queue(name)
  channel.queue(prefix(name), durable: true).tap do |queue|
    log(:debug) do
      "Create queue #{queue.name} (durable: #{queue.durable?}, " \
      "auto_delete: #{queue.auto_delete?})"
    end
  end
end

#reject(delivery_tag, requeue = true) ⇒ Object



91
92
93
94
# File 'lib/msgr/connection.rb', line 91

def reject(delivery_tag, requeue = true)
  channel.reject delivery_tag, requeue
  log(:debug) { "Rejected message: #{delivery_tag}" }
end

#releaseObject



44
45
46
# File 'lib/msgr/connection.rb', line 44

def release
  subscriptions.each { |subscription| subscription.cancel }
end

#running?Boolean

Returns:

  • (Boolean)


19
20
21
# File 'lib/msgr/connection.rb', line 19

def running?
  subscriptions.any?
end

#subscriptionsObject



48
49
50
# File 'lib/msgr/connection.rb', line 48

def subscriptions
  @subscription ||= []
end