Class: Msgr::Connection

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/msgr/connection.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#log, #log_name

Constructor Details

#initialize(uri, config, dispatcher) ⇒ Connection

Returns a new instance of Connection.



11
12
13
14
15
16
# File 'lib/msgr/connection.rb', line 11

def initialize(uri, config, dispatcher)
  @uri        = uri
  @config     = config
  @dispatcher = dispatcher
  @channels   = []
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



9
10
11
# File 'lib/msgr/connection.rb', line 9

def config
  @config
end

#uriObject (readonly)

Returns the value of attribute uri.



9
10
11
# File 'lib/msgr/connection.rb', line 9

def uri
  @uri
end

Instance Method Details

#bind(routes) ⇒ Object



87
88
89
90
91
92
93
94
95
96
# File 'lib/msgr/connection.rb', line 87

def bind(routes)
  if routes.empty?
    log(:warn) do
      "No routes to bound to. Bind will have no effect:\n  " \
        "#{routes.inspect}"
    end
  else
    bind_all(routes)
  end
end

#bindingsObject



83
84
85
# File 'lib/msgr/connection.rb', line 83

def bindings
  @bindings ||= []
end

#channel(prefetch: 1) ⇒ Object



38
39
40
41
42
43
# File 'lib/msgr/connection.rb', line 38

def channel(prefetch: 1)
  channel = Msgr::Channel.new(config, connection)
  channel.prefetch(prefetch)
  @channels << channel
  channel
end

#closeObject



98
99
100
101
102
# File 'lib/msgr/connection.rb', line 98

def close
  @channels.each(&:close)
  connection.close if @connection
  log(:debug) { 'Closed.' }
end

#connectObject



34
35
36
# File 'lib/msgr/connection.rb', line 34

def connect
  connection
end

#connectionObject



30
31
32
# File 'lib/msgr/connection.rb', line 30

def connection
  @connection ||= ::Bunny.new(config).tap(&:start)
end

#deleteObject



57
58
59
60
61
62
63
# File 'lib/msgr/connection.rb', line 57

def delete
  return if bindings.empty?

  log(:debug) { "Delete bindings (#{bindings.size})..." }

  bindings.each(&:delete)
end

#exchangeObject



45
46
47
# File 'lib/msgr/connection.rb', line 45

def exchange
  @exchange ||= channel.exchange
end

#publish(message, opts = {}) ⇒ Object



22
23
24
25
26
27
28
# File 'lib/msgr/connection.rb', line 22

def publish(message, opts = {})
  opts[:routing_key] = opts.delete(:to) if opts[:to]

  exchange.publish message.to_s, opts.merge(persistent: true)

  log(:debug) { "Published message to #{opts[:routing_key]}" }
end

#purge(**kwargs) ⇒ Object



65
66
67
68
69
70
71
# File 'lib/msgr/connection.rb', line 65

def purge(**kwargs)
  return if bindings.empty?

  log(:debug) { "Purge bindings (#{bindings.size})..." }

  bindings.each {|b| b.purge(**kwargs) }
end

#purge_queue(name) ⇒ Object



73
74
75
76
77
78
79
80
81
# File 'lib/msgr/connection.rb', line 73

def purge_queue(name)
  # Creating the queue in passive mode ensures that queues that do not exist
  # won't be created just to purge them.
  # That requires creating a new channel every time, as exceptions (on
  # missing queues) invalidate the channel.
  channel.queue(name, passive: true).purge
rescue Bunny::NotFound
  nil
end

#releaseObject



49
50
51
52
53
54
55
# File 'lib/msgr/connection.rb', line 49

def release
  return if bindings.empty?

  log(:debug) { "Release bindings (#{bindings.size})..." }

  bindings.each(&:release)
end

#running?Boolean

Returns:

  • (Boolean)


18
19
20
# File 'lib/msgr/connection.rb', line 18

def running?
  bindings.any?
end