Class: Msgr::Connection

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/msgr/connection.rb

Constant Summary collapse

EXCHANGE_NAME =
'msgr'

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#log, #log_name

Constructor Details

#initialize(uri, config, dispatcher) ⇒ Connection

Returns a new instance of Connection.



13
14
15
16
17
# File 'lib/msgr/connection.rb', line 13

# Stores the given collaborators in instance variables. Nothing else is
# done here; no connection or channel is created by the constructor.
#
# @param uri [Object] kept in @uri and returned by #uri
# @param config [Object] kept in @config and returned by #config
# @param dispatcher [Object] kept in @dispatcher
def initialize(uri, config, dispatcher)
  @uri        = uri
  @config     = config
  @dispatcher = dispatcher
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



11
12
13
# File 'lib/msgr/connection.rb', line 11

# Reader for the configuration object.
#
# @return [Object] the current value of @config
def config
  @config
end

#uri ⇒ Object (readonly)

Returns the value of attribute uri.



11
12
13
# File 'lib/msgr/connection.rb', line 11

# Reader for the URI.
#
# @return [Object] the current value of @uri
def uri
  @uri
end

Instance Method Details

#ack(delivery_tag) ⇒ Object



90
91
92
93
# File 'lib/msgr/connection.rb', line 90

# Acknowledges a delivery on the channel and then writes a debug log entry.
#
# @param delivery_tag [Object] tag passed unchanged to channel.ack
# @return [Object] whatever #log returns
def ack(delivery_tag)
  channel.ack(delivery_tag)
  log(:debug) { "Acked message: #{delivery_tag}" }
end

#bind(routes) ⇒ Object



82
83
84
85
86
87
88
# File 'lib/msgr/connection.rb', line 82

# Binds the given routes via #bind_all. When +routes+ is empty nothing is
# bound and a warning is logged instead.
#
# @param routes [#empty?] collection of routes handed on to #bind_all
# @return [Object] result of #bind_all, or of #log when +routes+ is empty
def bind(routes)
  return bind_all(routes) unless routes.empty?

  log(:warn) { "No routes to bound to. Bind will have no effect. (#{routes.inspect})" }
end

#channel ⇒ Object



44
45
46
# File 'lib/msgr/connection.rb', line 44

# Returns the channel, creating it with connection.create_channel on first
# use and caching it in @channel for later calls.
#
# @return [Object] the cached channel
def channel
  return @channel if @channel

  @channel = connection.create_channel
end

#close ⇒ Object



100
101
102
103
104
# File 'lib/msgr/connection.rb', line 100

# Shuts down whatever has been opened so far and logs 'Closed.' at debug
# level. The channel is closed only if one was created and it reports
# open?; the connection is closed only if one was created. Neither is
# created just to be closed.
#
# @return [Object] whatever #log returns
def close
  channel_open = @channel && @channel.open?
  channel.close if channel_open

  connection.close if @connection

  log(:debug) { 'Closed.' }
end

#connect ⇒ Object



40
41
42
# File 'lib/msgr/connection.rb', line 40

# Calls #connection and returns its result.
#
# @return [Object] the value returned by #connection
def connect
  connection
end

#connection ⇒ Object



36
37
38
# File 'lib/msgr/connection.rb', line 36

# Returns the Bunny session, building it from #config and calling #start on
# it the first time it is requested. The session is cached in @connection.
#
# @return [Bunny] the cached session object
def connection
  @connection ||= begin
    session = ::Bunny.new(config)
    session.start
    session
  end
end

#exchange ⇒ Object



64
65
66
67
68
69
70
71
# File 'lib/msgr/connection.rb', line 64

# Returns the exchange, declaring it through channel.topic (durable: true)
# under the prefixed EXCHANGE_NAME on first use. The declaration is logged
# at debug level and the exchange is cached in @exchange.
#
# @return [Object] the cached exchange
def exchange
  return @exchange if @exchange

  topic = channel.topic(prefix(EXCHANGE_NAME), durable: true)
  log(:debug) do
    "Created exchange #{topic.name} (type: #{topic.type}, " \
      "durable: #{topic.durable?}, auto_delete: #{topic.auto_delete?})"
  end
  @exchange = topic
end

#prefix(name) ⇒ Object



56
57
58
59
60
61
62
# File 'lib/msgr/connection.rb', line 56

# Prepends the configured prefix and a dash to +name+. When config[:prefix]
# is not present?, +name+ is returned unchanged.
#
# @param name [Object] base name
# @return [Object] "<prefix>-<name>" as a String, or +name+ itself
def prefix(name)
  namespace = config[:prefix]
  return name unless namespace.present?

  "#{namespace}-#{name}"
end

#publish(payload, opts = {}) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/msgr/connection.rb', line 23

# Publishes +payload+ on the exchange as a persistent message and logs the
# routing key at debug level.
#
# The payload is serialized with MultiJson.dump and sent with content type
# 'application/json'. If serialization raises, payload.to_s is sent with
# content type 'application/text' instead. Only serialization errors
# trigger that fallback: an error raised by exchange.publish propagates to
# the caller rather than causing a second publish attempt.
#
# @param payload [Object] message body
# @param opts [Hash] options forwarded to exchange.publish; a truthy +:to+
#   entry is renamed to +:routing_key+. The caller's hash is not modified.
# @return [Object] whatever #log returns
def publish(payload, opts = {})
  # Work on a copy so the caller's hash (possibly frozen or reused) is untouched.
  options = opts.dup
  options[:routing_key] = options.delete(:to) if options[:to]

  begin
    body         = MultiJson.dump(payload)
    content_type = 'application/json'
  rescue StandardError
    body         = payload.to_s
    content_type = 'application/text'
  end

  exchange.publish body, options.merge(persistent: true, content_type: content_type)

  log(:debug) { "Published message to #{options[:routing_key]}" }
end

#queue(name) ⇒ Object



73
74
75
76
77
78
79
80
# File 'lib/msgr/connection.rb', line 73

# Declares a queue through channel.queue (durable: true) under the prefixed
# +name+ and logs the declaration at debug level. The result is not cached;
# every call goes to the channel again.
#
# @param name [Object] base queue name, passed through #prefix
# @return [Object] the queue object returned by channel.queue
def queue(name)
  declared = channel.queue(prefix(name), durable: true)
  log(:debug) do
    "Create queue #{declared.name} (durable: #{declared.durable?}, " \
      "auto_delete: #{declared.auto_delete?})"
  end
  declared
end

#reject(delivery_tag, requeue = true) ⇒ Object



95
96
97
98
# File 'lib/msgr/connection.rb', line 95

# Rejects a delivery on the channel and then writes a debug log entry.
#
# @param delivery_tag [Object] tag passed unchanged to channel.reject
# @param requeue [Boolean] second argument passed to channel.reject
#   (defaults to true)
# @return [Object] whatever #log returns
def reject(delivery_tag, requeue = true)
  channel.reject(delivery_tag, requeue)
  log(:debug) { "Rejected message: #{delivery_tag}" }
end

#release ⇒ Object



48
49
50
# File 'lib/msgr/connection.rb', line 48

# Calls #cancel on every entry of #subscriptions, in order.
#
# @return [Object] the collection returned by #subscriptions
def release
  subscriptions.each(&:cancel)
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


19
20
21
# File 'lib/msgr/connection.rb', line 19

# Reports whether #subscriptions contains at least one truthy entry.
#
# @return [Boolean] result of subscriptions.any?
def running?
  subscriptions.any?
end

#subscriptions ⇒ Object



52
53
54
# File 'lib/msgr/connection.rb', line 52

# Returns the list of subscriptions, starting with an empty Array on first
# use. The list is cached in @subscription (singular), so callers that
# append to the returned Array change the cached list.
#
# @return [Array] the cached subscription list
def subscriptions
  return @subscription if @subscription

  @subscription = []
end