Class: Tochtli::RabbitConnection

Inherits:
Object
  • Object
show all
Defined in:
lib/tochtli/rabbit_connection.rb

Defined Under Namespace

Classes: ChannelWrap, Config, ConnectionFailed

Constant Summary collapse

DEFAULT_CONNECTION_NAME =
'default'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config = nil, channel_pool = nil) ⇒ RabbitConnection

Returns a new instance of RabbitConnection.



16
17
18
19
20
21
22
# File 'lib/tochtli/rabbit_connection.rb', line 16

# Builds a connection wrapper from a configuration and an optional channel pool.
#
# @param config [RabbitConnection::Config, Object, nil] used as-is when it is
#   already a Config; anything else is handed to Config.load(nil, config)
# @param channel_pool [Object, nil] pool object cleared on #disconnect; a new
#   Hash is used when none is given
def initialize(config = nil, channel_pool = nil)
  @config =
    if config.is_a?(RabbitConnection::Config)
      config
    else
      RabbitConnection::Config.load(nil, config)
    end

  # NOTE(review): `delete` is called on the config itself, so (assuming
  # Hash-like semantics) these entries are also stripped from a caller-supplied
  # Config object — confirm that is intended.
  @exchange_name  = @config.delete(:exchange_name)
  @work_pool_size = @config.delete(:work_pool_size)
  @logger         = @config.delete(:logger) || Tochtli.logger
  @channel_pool   = channel_pool || {}
end

Instance Attribute Details

#connection ⇒ Object

Returns the value of attribute connection.



6
7
8
# File 'lib/tochtli/rabbit_connection.rb', line 6

# Reader for @connection, which #setup_bunny_connection assigns and
# #disconnect resets to nil.
#
# @return [Object, nil] the current value of @connection
def connection
  @connection
end

#exchange_name ⇒ Object (readonly)

Returns the value of attribute exchange_name.



7
8
9
# File 'lib/tochtli/rabbit_connection.rb', line 7

# Reader for @exchange_name, which #initialize takes from the
# :exchange_name configuration entry.
#
# @return [Object, nil] the current value of @exchange_name
def exchange_name
  @exchange_name
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



7
8
9
# File 'lib/tochtli/rabbit_connection.rb', line 7

# Reader for @logger, which #initialize takes from the :logger configuration
# entry, falling back to Tochtli.logger.
#
# @return [Object] the current value of @logger
def logger
  @logger
end

Class Method Details

.close(name = nil) ⇒ Object

Raises:

  • (ArgumentError)


43
44
45
46
47
48
# File 'lib/tochtli/rabbit_connection.rb', line 43

# Closes the named connection and removes it from the registry.
#
# The name defaults the same way as in .open: Rails.env when Rails is loaded,
# DEFAULT_CONNECTION_NAME otherwise, so a connection opened without a name can
# also be closed without one.
#
# @param name [String, Symbol, nil] connection name
# @return [Object, nil] the result of #disconnect, or nil when there was no
#   open connection under that name
# @raise [ArgumentError] if no name can be determined
def self.close(name = nil)
  name ||= defined?(Rails) ? Rails.env : DEFAULT_CONNECTION_NAME
  raise ArgumentError, "RabbitMQ configuration name not specified" unless name

  # The registry entry is removed even when the connection is already closed.
  connection = self.connections.delete(name.to_sym)
  connection.disconnect if connection && connection.open?
end

.open(name = nil, config = nil) ⇒ Object

Raises:

  • (ArgumentError)


24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/tochtli/rabbit_connection.rb', line 24

# Returns the registered connection for +name+, creating and connecting a new
# one when none is registered or the registered one is no longer open.
#
# @param name [String, Symbol, nil] registry key; defaults to Rails.env when
#   Rails is loaded, otherwise DEFAULT_CONNECTION_NAME
# @param config [RabbitConnection::Config, Object, nil] used as-is when it is
#   already a Config, otherwise passed to Config.load(name, config); only
#   consulted when a new connection has to be created
# @yield [connection] optional; the connection is closed through .close once
#   the block finishes, including when the block raises
# @return [Object] the connection, or the block's value when a block is given
# @raise [ArgumentError] if no name is available and RABBITMQ_URL is not set
def self.open(name = nil, config = nil)
  name ||= defined?(Rails) ? Rails.env : DEFAULT_CONNECTION_NAME
  raise ArgumentError, "RabbitMQ configuration name not specified" if !name && !ENV.has_key?('RABBITMQ_URL')

  key        = name.to_sym
  connection = self.connections[key]
  unless connection && connection.open?
    config     = RabbitConnection::Config.load(name, config) unless config.is_a?(RabbitConnection::Config)
    connection = new(config)
    connection.connect
    self.connections[key] = connection
  end

  return connection unless block_given?

  begin
    yield connection
  ensure
    # Runs on normal completion and on exceptions alike, so a failing block
    # cannot leave the connection open and registered.
    close name
  end
end

Instance Method Details

#ack(delivery_tag) ⇒ Object



117
118
119
# File 'lib/tochtli/rabbit_connection.rb', line 117

# Acknowledges one delivery on the current channel.
#
# @param delivery_tag [Object] tag of the delivery to acknowledge
# @return [Object] whatever the channel's #ack returns
def ack(delivery_tag)
  multiple = false # second argument passed to the channel's #ack
  channel.ack(delivery_tag, multiple)
end

#channel(thread = Thread.current) ⇒ Object



101
102
103
# File 'lib/tochtli/rabbit_connection.rb', line 101

# Returns the channel held by the channel wrap for the given thread.
#
# @param thread [Thread] thread whose wrap is looked up; defaults to the caller's
# @return [Object] the wrap's channel
def channel(thread = Thread.current)
  wrap = channel_wrap(thread)
  wrap.channel
end

#connect(opts = {}) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/tochtli/rabbit_connection.rb', line 50

# Opens the underlying connection unless it is already open.
#
# When the connection is already open the method returns immediately and a
# given block is not run.
#
# @param opts [Hash] options merged over the defaults and passed to
#   #setup_bunny_connection; supply :logger to skip the default logger copy
# @yield optional; the connection is disconnected once the block finishes,
#   including when the block raises
# @return [Object, nil] the block's value when a block is given, otherwise nil
def connect(opts = {})
  return if open?

  defaults = {}
  unless opts[:logger]
    # Work on a copy so that adjusting the level leaves @logger untouched.
    connection_logger       = @logger.dup
    connection_logger.level = Tochtli.debug_bunny ? Logger::DEBUG : Logger::WARN
    defaults[:logger]       = connection_logger
  end

  setup_bunny_connection(defaults.merge(opts))
  return unless block_given?

  begin
    yield
  ensure
    # Also reached when the block raises, so the connection is not leaked.
    disconnect if open?
  end
end

#create_channel(consumer_pool_size = 1) ⇒ Object



141
142
143
144
145
# File 'lib/tochtli/rabbit_connection.rb', line 141

# Creates a new channel on the underlying connection and switches it to
# publisher confirmations.
#
# @param consumer_pool_size [Integer] forwarded to the connection's #create_channel
# @return [Object] the newly created channel
def create_channel(consumer_pool_size = 1)
  new_channel = @connection.create_channel(nil, consumer_pool_size)
  new_channel.confirm_select # use publisher confirmations
  new_channel
end

#create_exchange(channel) ⇒ Object



147
148
149
# File 'lib/tochtli/rabbit_connection.rb', line 147

# Declares the durable topic exchange named @exchange_name on the given channel.
#
# @param channel [Object] channel on which the exchange is declared
# @return [Object] whatever the channel's #topic returns
def create_exchange(channel)
  exchange_options = { durable: true }
  channel.topic(@exchange_name, **exchange_options)
end

#create_reply_queue ⇒ Object



89
90
91
# File 'lib/tochtli/rabbit_connection.rb', line 89

# Builds a new Tochtli::ReplyQueue bound to this connection and its logger.
#
# @return [Tochtli::ReplyQueue] a fresh instance on every call
def create_reply_queue
  reply_queue_class = Tochtli::ReplyQueue
  reply_queue_class.new(self, @logger)
end

#disconnect ⇒ Object



67
68
69
70
71
72
73
74
75
# File 'lib/tochtli/rabbit_connection.rb', line 67

# Closes the underlying connection, if any, and resets connection state.
#
# The channel pool is cleared and @connection / @reply_queue are reset
# whether or not closing succeeds.
#
# @return [Object, nil, false] the result of closing the connection, nil when
#   there was no connection, false when closing raised Bunny::ClientTimeout
def disconnect
  begin
    if @connection
      @connection.close
    end
  rescue Bunny::ClientTimeout
    false
  ensure
    @channel_pool.clear
    @connection  = nil
    @reply_queue = nil
  end
end

#exchange(thread = Thread.current) ⇒ Object



97
98
99
# File 'lib/tochtli/rabbit_connection.rb', line 97

# Returns the exchange held by the channel wrap for the given thread.
#
# @param thread [Thread] thread whose wrap is looked up; defaults to the caller's
# @return [Object] the wrap's exchange
def exchange(thread = Thread.current)
  wrap = channel_wrap(thread)
  wrap.exchange
end

#open? ⇒ Boolean

Returns:

  • (Boolean)


77
78
79
# File 'lib/tochtli/rabbit_connection.rb', line 77

# Tells whether an underlying connection exists and reports itself open.
#
# @return [Boolean, nil] the connection's own #open? answer, or the falsy
#   @connection value when no connection has been set up
def open?
  return @connection unless @connection

  @connection.open?
end

#publish(routing_key, message, options = {}) ⇒ Object



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/tochtli/rabbit_connection.rb', line 121

# Serializes a message to JSON and publishes it on the current exchange.
#
# @param routing_key [String] routing key for the message
# @param message [#to_json, #id] message to send; its class name (underscored)
#   is used as the :type property and its id as :message_id
# @param options [Hash] publish options merged over the defaults built below
# @return [Object] whatever the exchange's #publish returns
# @raise [RuntimeError] if the message cannot be serialized
def publish(routing_key, message, options = {})
  begin
    payload = message.to_json
  # Only genuine serialization failures are wrapped; Interrupt, SystemExit and
  # the like pass through untouched. SystemStackError is not a StandardError,
  # so it is listed explicitly to keep reporting runaway recursion in #to_json
  # as a serialization failure.
  rescue StandardError, SystemStackError => error
    logger.error "Unable to serialize message: #{message.inspect}"
    logger.error error
    raise "Unable to serialize message to JSON: #{error}"
  end

  exchange.publish(payload, {
      routing_key:  routing_key,
      persistent:   true,
      mandatory:    true,
      timestamp:    Time.now.to_i,
      message_id:   message.id,
      type:         message.class.name.underscore,
      content_type: "application/json"
  }.merge(options))
end

#queue(name, routing_keys = [], options = {}) ⇒ Object



105
106
107
108
109
110
111
# File 'lib/tochtli/rabbit_connection.rb', line 105

# Declares a queue on the current channel and binds it to the exchange once
# per routing key.
#
# @param name [String] queue name
# @param routing_keys [Array] routing keys to bind; none by default
# @param options [Hash] queue options merged over `durable: true`
# @return [Object] the declared queue
def queue(name, routing_keys = [], options = {})
  queue_options = { durable: true }.merge(options)
  channel.queue(name, queue_options).tap do |declared|
    routing_keys.each { |key| declared.bind(exchange, routing_key: key) }
  end
end

#queue_exists?(name) ⇒ Boolean

Returns:

  • (Boolean)


113
114
115
# File 'lib/tochtli/rabbit_connection.rb', line 113

# Asks the underlying connection whether a queue with the given name exists.
#
# @param name [String] queue name
# @return [Boolean] the connection's answer
def queue_exists?(name)
  session = @connection
  session.queue_exists?(name)
end

#reply_queue ⇒ Object



93
94
95
# File 'lib/tochtli/rabbit_connection.rb', line 93

# Returns the memoized reply queue, creating it through #create_reply_queue on
# first use (and again after #disconnect has reset it).
#
# @return [Object] the reply queue
def reply_queue
  return @reply_queue if @reply_queue

  @reply_queue = create_reply_queue
end

#setup_bunny_connection(opts = {}) ⇒ Object



81
82
83
84
85
86
87
# File 'lib/tochtli/rabbit_connection.rb', line 81

# Creates the Bunny connection from @config and starts it.
#
# @param opts [Hash] extra options passed to Bunny.new alongside @config
# @return [Object] whatever the connection's #start returns
# @raise [ConnectionFailed] when Bunny reports a TCP connection failure; the
#   message names the target as user@host:port/vhost
def setup_bunny_connection(opts = {})
  @connection = Bunny.new(@config, opts)
  @connection.start
rescue Bunny::TCPConnectionFailed => failure
  session = @connection
  target  = "amqp://#{session.user}@#{session.host}:#{session.port}/#{session.vhost}"
  raise ConnectionFailed, "Unable to connect to: '#{target}' (#{failure.message})"
end