Class: Philotic::Connection
- Inherits:
-
Object
- Object
- Philotic::Connection
show all
- Extended by:
- Forwardable
- Defined in:
- lib/philotic/connection.rb
Defined Under Namespace
Classes: TCPConnectionFailed
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
Returns a new instance of Connection.
29
30
31
|
# File 'lib/philotic/connection.rb', line 29
# Creates a connection wrapper whose connection-attempt counter starts at zero.
# No connection is opened here.
def initialize
@connection_attempts = 0
end
|
Instance Attribute Details
#connection ⇒ Object
Returns the value of attribute connection.
24
25
26
|
# File 'lib/philotic/connection.rb', line 24
# @return [Object, nil] the value held in @connection. #attempt_connection
#   assigns it the result of Bunny.new; before that it is nil.
def connection
@connection
end
|
#connection_attempts ⇒ Object
Returns the value of attribute connection_attempts.
24
25
26
|
# File 'lib/philotic/connection.rb', line 24
# @return [Integer] how many connection attempts have been made in the current
#   connect cycle. Starts at 0, is incremented by #attempt_connection, and is
#   reset to 0 after a successful start or once the retries are exhausted.
def connection_attempts
@connection_attempts
end
|
#logger ⇒ Object
Returns the value of attribute logger.
25
26
27
|
# File 'lib/philotic/connection.rb', line 25
# @return [Object, nil] the value held in @logger. Other methods call
#   info/warn/error on it with a block that yields the message.
#   NOTE(review): nothing visible here assigns @logger - presumably it is set
#   through a writer or by the owning object; confirm.
def logger
@logger
end
|
#publisher ⇒ Object
33
34
35
|
# File 'lib/philotic/connection.rb', line 33
# Lazily builds the publisher for this connection.
#
# @return [Philotic::Publisher] the memoized publisher, constructed with this
#   connection as its only argument on first use
def publisher
  return @publisher if @publisher

  @publisher = Philotic::Publisher.new(self)
end
|
#subscriber ⇒ Object
37
38
39
|
# File 'lib/philotic/connection.rb', line 37
# Lazily builds the subscriber for this connection.
#
# @return [Philotic::Subscriber] the memoized subscriber, constructed with this
#   connection as its only argument on first use
def subscriber
  return @subscriber if @subscriber

  @subscriber = Philotic::Subscriber.new(self)
end
|
Instance Method Details
#attempt_connection ⇒ Object
74
75
76
77
78
79
80
81
|
# File 'lib/philotic/connection.rb', line 74
# Makes one attempt to open a Bunny session and start it.
#
# The attempt counter is bumped first; from the second attempt onwards a
# warning naming the attempt number is logged. When Bunny's start returns
# without raising, the counter is reset to zero. If start raises, the counter
# keeps its incremented value.
#
# @return [Integer] 0, the reset attempt counter
def attempt_connection
  @connection_attempts += 1

  if connection_attempts > 1
    logger.warn do
      "Connecting to RabbitMQ: #{config.sanitized_rabbit_url}. Attempt #{connection_attempts} of #{config.connection_attempts}"
    end
  end

  @connection = Bunny.new(config.rabbit_url, connection_settings)
  @connection.start
  @connection_attempts = 0
end
|
#bind_queue(queue, config) ⇒ Object
139
140
141
142
143
144
145
146
147
148
|
# File 'lib/philotic/connection.rb', line 139
# Binds +queue+ to its exchange once for each entry in config[:bindings].
#
# The exchange comes from #exchange_from_config. Every binding is passed to
# queue.bind as the :arguments option and logged at info level.
#
# @param queue [#bind, #name] the queue to add bindings to
# @param config [Hash] queue settings; only :bindings is read here, the hash
#   is also handed to #exchange_from_config
# @return [Object] whatever the final logger.info call returns
def bind_queue(queue, config)
  queue_exchange = exchange_from_config(config)

  # A queue configured without :bindings gets no bindings instead of failing
  # with NoMethodError on nil.
  bindings = config[:bindings] || []

  bindings.each do |arguments|
    queue.bind(queue_exchange, { arguments: arguments })
    logger.info { "Added binding to queue. queue: #{queue.name} binding: #{arguments}" }
  end

  logger.info { "Finished adding bindings to queue. queue: #{queue.name}" }
end
|
#channel ⇒ Object
102
103
104
|
# File 'lib/philotic/connection.rb', line 102
# Lazily opens a channel on the current connection.
#
# @return [Object] the memoized result of connection.create_channel; #close
#   clears it so the next call creates a fresh one
def channel
  return @channel if @channel

  @channel = connection.create_channel
end
|
#close ⇒ Object
91
92
93
94
95
96
|
# File 'lib/philotic/connection.rb', line 91
# Closes the underlying connection when it is open and forgets the memoized
# channel and exchange, so #channel and #exchange rebuild them on next use.
# A warning is logged on every call, whether or not a connection was open.
#
# @return [nil]
def close
  logger.warn { "closing connection to RabbitMQ: #{config.sanitized_rabbit_url}" }

  connection.close if connected?

  @channel = @exchange = nil
end
|
#config ⇒ Object
41
42
43
|
# File 'lib/philotic/connection.rb', line 41
# Lazily builds the configuration object for this connection.
#
# @return [Philotic::Config] the memoized config, constructed with this
#   connection as its only argument on first use
def config
  return @config if @config

  @config = Philotic::Config.new(self)
end
|
#connect! ⇒ Object
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
# File 'lib/philotic/connection.rb', line 45
# Ensures there is a live connection to RabbitMQ.
#
# Does nothing when already connected. Otherwise it runs #start_connection!
# and, on success, installs the exchange return handler.
#
# @return [Boolean] true when connected (including when a connection already
#   existed), false when #start_connection! returned without leaving a live
#   connection
# @raise [StandardError] anything #start_connection! raises is propagated
def connect!
  # Report success for the idempotent case; a bare `return` yielded nil here,
  # which made `if connect!` misreport an existing connection as a failure.
  return true if connected?

  start_connection!

  unless connected?
    logger.error { "Failed to connect to RabbitMQ: #{config.sanitized_rabbit_url}" }
    return false
  end

  logger.info { "Connected to RabbitMQ: #{config.sanitized_rabbit_url}" }
  set_exchange_return_handler!
  true
end
|
#connected? ⇒ Boolean
98
99
100
|
# File 'lib/philotic/connection.rb', line 98
# Tells whether a connection object exists and reports itself as connected.
#
# @return [Boolean, nil] the connection's own connected? answer, or the falsy
#   connection value itself (nil) when no connection has been created yet
def connected?
  current = connection
  current && current.connected?
end
|
#connection_settings ⇒ Object
83
84
85
86
87
88
89
|
# File 'lib/philotic/connection.rb', line 83
# Collects the options hash passed to Bunny.new by #attempt_connection.
#
# @return [Hash{Symbol => Object}] :automatically_recover,
#   :network_recovery_interval and :continuation_timeout, each read from the
#   config attribute of the same name
def connection_settings
  %i[automatically_recover network_recovery_interval continuation_timeout].each_with_object({}) do |setting, settings|
    settings[setting] = config.public_send(setting)
  end
end
|
#exchange ⇒ Object
106
107
108
|
# File 'lib/philotic/connection.rb', line 106
# Lazily obtains the configured exchange from the channel.
#
# The channel method named by config.exchange_type is called with
# config.exchange_name and durable: true; the result is memoized until #close
# clears it.
#
# @return [Object] whatever the channel's exchange method returns
# @raise [NoMethodError] when config.exchange_type does not name a public
#   method of the channel
def exchange
  # public_send, not send: the method name comes from configuration, so it
  # must never be able to reach private methods such as Kernel#system.
  @exchange ||= channel.public_send(config.exchange_type, config.exchange_name, durable: true)
end
|
#exchange_from_config(config) ⇒ Object
150
151
152
|
# File 'lib/philotic/connection.rb', line 150
# Picks the exchange a queue should be bound to.
#
# When the queue settings name an :exchange, the channel method named by the
# connection-level config.exchange_type is called with that name and
# durable: true. Otherwise the connection's default #exchange is used.
#
# @param config [Hash] queue settings; only :exchange is read. It shadows the
#   #config method, hence the explicit self.config below.
# @return [Object] the exchange to bind to
# @raise [NoMethodError] when an :exchange is given and the configured
#   exchange type does not name a public method of the channel
def exchange_from_config(config)
  return exchange unless config[:exchange]

  # public_send, not send: the method name comes from configuration, so it
  # must never be able to reach private methods such as Kernel#system.
  channel.public_send(self.config.exchange_type, config[:exchange], durable: true)
end
|
#initialize_named_queue!(queue_name, config) ⇒ Object
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
# File 'lib/philotic/connection.rb', line 116
# Creates (or recreates) a named queue and applies its bindings.
#
# Steps, in order:
# 1. refuse to run unless the connection-level config enables
#    initialize_named_queues;
# 2. connect;
# 3. if the queue already exists, either delete it (when
#    delete_existing_queues is set) or log a warning and stop;
# 4. symbolize the queue settings, create the queue and bind it.
#
# @param queue_name [String] name of the queue to initialize
# @param config [#deep_symbolize_keys] queue settings; it shadows the #config
#   method, hence the explicit self.config below
# @return [Object] the result of #bind_queue, or of logger.warn when an
#   existing queue is left untouched
# @raise [RuntimeError] when initialize_named_queues is not enabled
def initialize_named_queue!(queue_name, config)
  unless self.config.initialize_named_queues
    raise RuntimeError, 'Philotic.config.initialize_named_queues must be true to run Philotic.initialize_named_queue!'
  end

  connect!

  if connection.queue_exists?(queue_name)
    unless self.config.delete_existing_queues
      return logger.warn { "Queue #{queue_name} not created; it already exists. self.config.delete_existing_queues must be true to override." }
    end

    # passive: true looks the existing queue up so it can be deleted.
    channel.queue(queue_name, passive: true).delete
    logger.info { "deleted old queue. queue: #{queue_name}" }
  end

  symbolized = config.deep_symbolize_keys
  bind_queue(queue_from_config(queue_name, symbolized), symbolized)
end
|
#queue_from_config(queue_name, config) ⇒ Object
154
155
156
157
158
159
160
161
|
# File 'lib/philotic/connection.rb', line 154
# Declares a queue on the channel using the default named-queue options
# overlaid with any per-queue :options.
#
# @param queue_name [String] name of the queue to declare
# @param config [Hash] queue settings; only :options is read, and a missing
#   or nil value means "no overrides"
# @return [Object] whatever channel.queue returns
def queue_from_config(queue_name, config)
  overrides = config[:options] || {}
  # merge builds a new hash, so the shared defaults constant is never mutated.
  queue_options = Philotic::DEFAULT_NAMED_QUEUE_OPTIONS.merge(overrides)

  declared = channel.queue(queue_name, queue_options)
  logger.info { "Created queue. queue:#{queue_name}" }
  declared
end
|
#set_exchange_return_handler! ⇒ Object
110
111
112
113
114
|
# File 'lib/philotic/connection.rb', line 110
# Registers an on_return callback on the exchange that forwards each returned
# message to the configured handler.
#
# config.message_return_handler is looked up every time the callback runs,
# not once at registration, so a handler replaced later is still honoured.
#
# @return [Object] whatever exchange.on_return returns
def set_exchange_return_handler!
  exchange.on_return do |return_info, properties, body|
    handler = config.message_return_handler
    handler.call(return_info, properties, body)
  end
end
|
#start_connection! ⇒ Object
60
61
62
63
64
65
66
67
68
69
70
71
72
|
# File 'lib/philotic/connection.rb', line 60
# Opens the connection, retrying on TCP failures up to the configured limit.
#
# Each Bunny::TCPConnectionFailed triggers an immediate retry while the
# attempt counter is below config.connection_attempts. Once the limit is
# reached the counter is reset to zero and this class's own
# TCPConnectionFailed is raised with the attempt count and the sanitized URL.
#
# @return [Object] the result of #attempt_connection on success
# @raise [TCPConnectionFailed] when every allowed attempt failed
def start_connection!
  attempt_connection
rescue ::Bunny::TCPConnectionFailed
  retry if connection_attempts < config.connection_attempts

  # Capture the count before resetting it so the message reports the real total.
  exhausted_after = connection_attempts
  @connection_attempts = 0
  raise TCPConnectionFailed.new("Failed to connect to RabbitMQ server after #{exhausted_after} attempts", config.sanitized_rabbit_url)
end
|