Class: Tochtli::RabbitConnection
- Inherits:
-
Object
- Object
- Tochtli::RabbitConnection
show all
- Defined in:
- lib/tochtli/rabbit_connection.rb
Defined Under Namespace
Classes: ChannelWrap, Config, ConnectionFailed
Constant Summary
collapse
- DEFAULT_CONNECTION_NAME =
'default'
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(config = nil, channel_pool = nil) ⇒ RabbitConnection
Returns a new instance of RabbitConnection.
16
17
18
19
20
21
22
|
# File 'lib/tochtli/rabbit_connection.rb', line 16
def initialize(config = nil, channel_pool=nil)
@config = config.is_a?(RabbitConnection::Config) ? config : RabbitConnection::Config.load(nil, config)
@exchange_name = @config.delete(:exchange_name)
@work_pool_size = @config.delete(:work_pool_size)
@logger = @config.delete(:logger) || Tochtli.logger
@channel_pool = channel_pool ? channel_pool : Hash.new
end
|
Instance Attribute Details
#connection ⇒ Object
Returns the value of attribute connection.
6
7
8
|
# File 'lib/tochtli/rabbit_connection.rb', line 6
def connection
@connection
end
|
#exchange_name ⇒ Object
Returns the value of attribute exchange_name.
7
8
9
|
# File 'lib/tochtli/rabbit_connection.rb', line 7
def exchange_name
@exchange_name
end
|
#logger ⇒ Object
Returns the value of attribute logger.
7
8
9
|
# File 'lib/tochtli/rabbit_connection.rb', line 7
def logger
@logger
end
|
Class Method Details
.close(name = nil) ⇒ Object
43
44
45
46
47
48
|
# File 'lib/tochtli/rabbit_connection.rb', line 43
def self.close(name=nil)
name ||= defined?(Rails) ? Rails.env : nil
raise ArgumentError, "RabbitMQ configuration name not specified" unless name
connection = self.connections.delete(name.to_sym)
connection.disconnect if connection && connection.open?
end
|
.open(name = nil, config = nil) ⇒ Object
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
# File 'lib/tochtli/rabbit_connection.rb', line 24
def self.open(name=nil, config=nil)
name ||= defined?(Rails) ? Rails.env : DEFAULT_CONNECTION_NAME
raise ArgumentError, "RabbitMQ configuration name not specified" if !name && !ENV.has_key?('RABBITMQ_URL')
connection = self.connections[name.to_sym]
if !connection || !connection.open?
config = config.is_a?(RabbitConnection::Config) ? config : RabbitConnection::Config.load(name, config)
connection = new(config)
connection.connect
self.connections[name.to_sym] = connection
end
if block_given?
yield connection
close name
else
connection
end
end
|
Instance Method Details
#ack(delivery_tag) ⇒ Object
117
118
119
|
# File 'lib/tochtli/rabbit_connection.rb', line 117
def ack(delivery_tag)
channel.ack(delivery_tag, false)
end
|
#channel(thread = Thread.current) ⇒ Object
101
102
103
|
# File 'lib/tochtli/rabbit_connection.rb', line 101
def channel(thread=Thread.current)
channel_wrap(thread).channel
end
|
#connect(opts = {}) ⇒ Object
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
# File 'lib/tochtli/rabbit_connection.rb', line 50
def connect(opts={})
return if open?
defaults = {}
unless opts[:logger]
defaults[:logger] = @logger.dup
defaults[:logger].level = Tochtli.debug_bunny ? Logger::DEBUG : Logger::WARN
end
setup_bunny_connection(defaults.merge(opts))
if block_given?
yield
disconnect if open?
end
end
|
#create_channel(consumer_pool_size = 1) ⇒ Object
141
142
143
144
145
|
# File 'lib/tochtli/rabbit_connection.rb', line 141
def create_channel(consumer_pool_size = 1)
@connection.create_channel(nil, consumer_pool_size).tap do |channel|
channel.confirm_select end
end
|
#create_exchange(channel) ⇒ Object
147
148
149
|
# File 'lib/tochtli/rabbit_connection.rb', line 147
def create_exchange(channel)
channel.topic(@exchange_name, durable: true)
end
|
#create_reply_queue ⇒ Object
89
90
91
|
# File 'lib/tochtli/rabbit_connection.rb', line 89
def create_reply_queue
Tochtli::ReplyQueue.new(self, @logger)
end
|
#disconnect ⇒ Object
67
68
69
70
71
72
73
74
75
|
# File 'lib/tochtli/rabbit_connection.rb', line 67
def disconnect
@connection.close if @connection
rescue Bunny::ClientTimeout
false
ensure
@channel_pool.clear
@connection = nil
@reply_queue = nil
end
|
#exchange(thread = Thread.current) ⇒ Object
97
98
99
|
# File 'lib/tochtli/rabbit_connection.rb', line 97
def exchange(thread=Thread.current)
channel_wrap(thread).exchange
end
|
#open? ⇒ Boolean
77
78
79
|
# File 'lib/tochtli/rabbit_connection.rb', line 77
def open?
@connection && @connection.open?
end
|
#publish(routing_key, message, options = {}) ⇒ Object
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
# File 'lib/tochtli/rabbit_connection.rb', line 121
def publish(routing_key, message, options={})
begin
payload = message.to_json
rescue Exception
logger.error "Unable to serialize message: #{message.inspect}"
logger.error $!
raise "Unable to serialize message to JSON: #{$!}"
end
exchange.publish(payload, {
routing_key: routing_key,
persistent: true,
mandatory: true,
timestamp: Time.now.to_i,
message_id: message.id,
type: message.class.name.underscore,
content_type: "application/json"
}.merge(options))
end
|
#queue(name, routing_keys = [], options = {}) ⇒ Object
105
106
107
108
109
110
111
|
# File 'lib/tochtli/rabbit_connection.rb', line 105
def queue(name, routing_keys=[], options={})
queue = channel.queue(name, {durable: true}.merge(options))
routing_keys.each do |routing_key|
queue.bind(exchange, routing_key: routing_key)
end
queue
end
|
#queue_exists?(name) ⇒ Boolean
113
114
115
|
# File 'lib/tochtli/rabbit_connection.rb', line 113
def queue_exists?(name)
@connection.queue_exists?(name)
end
|
#reply_queue ⇒ Object
93
94
95
|
# File 'lib/tochtli/rabbit_connection.rb', line 93
def reply_queue
@reply_queue ||= create_reply_queue
end
|
#setup_bunny_connection(opts = {}) ⇒ Object
81
82
83
84
85
86
87
|
# File 'lib/tochtli/rabbit_connection.rb', line 81
def setup_bunny_connection(opts={})
@connection = Bunny.new(@config, opts)
@connection.start
rescue Bunny::TCPConnectionFailed => ex
connection_url = "amqp://#{@connection.user}@#{@connection.host}:#{@connection.port}/#{@connection.vhost}"
raise ConnectionFailed.new("Unable to connect to: '#{connection_url}' (#{ex.message})")
end
|