Class: MarchHare::Session
- Inherits:
-
Object
- Object
- MarchHare::Session
show all
- Defined in:
- lib/march_hare/session.rb
Overview
Connection to a RabbitMQ node.
Used to open and close connections and open (create) new channels.
Defined Under Namespace
Classes: BlockBlockedUnblockedListener, RecoveryListener, SSLContextException
Constant Summary
collapse
- DEFAULT_NETWORK_RECOVERY_INTERVAL =
Default reconnection interval for TCP connection failures
5.0
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(connection_factory, opts = {}) ⇒ Session
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
|
# File 'lib/march_hare/session.rb', line 150
def initialize(connection_factory, opts = {})
@cf = connection_factory
log_file = opts[:log_file] || STDOUT
log_level = opts[:log_level] || ENV["MARCH_HARE_LOG_LEVEL"] || Logger::WARN
@logger = opts.fetch(:logger, init_default_logger(log_file, log_level))
@cf.exception_handler = opts.fetch(:exception_handler, init_default_exception_handler(@logger))
@cf.automatic_recovery_enabled = false
@cf.topology_recovery_enabled = false
@uri = opts[:uri]
@uses_uri = !(@uri.nil?)
@executor_factory = opts[:executor_factory] || build_executor_factory_from(opts)
@executor_shutdown_timeout = opts.fetch(:executor_shutdown_timeout, 30.0)
@addresses = self.class.addresses_from(opts)
@connection = build_new_connection
@channels = JavaConcurrent::ConcurrentHashMap.new
@automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil?
true
else
opts[:automatically_recover] || opts[:automatic_recovery]
end
@network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
@shutdown_hooks = Array.new
@blocked_connection_hooks = Array.new
@connection_recovery_hooks = Array.new
@was_explicitly_closed = false
if @automatically_recover
self.add_automatic_recovery_hook
end
end
|
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(selector, *args) ⇒ Object
443
444
445
|
# File 'lib/march_hare/session.rb', line 443
def method_missing(selector, *args)
@connection.__send__(selector, *args)
end
|
Instance Attribute Details
143
144
145
|
# File 'lib/march_hare/session.rb', line 143
def channels
@channels
end
|
#logger ⇒ ::Logger
146
147
148
|
# File 'lib/march_hare/session.rb', line 146
def logger
@logger
end
|
Class Method Details
.connect(options = {}) ⇒ Object
Connects to a RabbitMQ node.
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
# File 'lib/march_hare/session.rb', line 63
def self.connect(options = {})
cf = ConnectionFactory.new
if options[:uri]
cf.uri = options[:uri] if options[:uri]
elsif options[:hosts] || options[:addresses]
cf.virtual_host = vhost_from(options) if include_vhost?(options)
cf.username = username_from(options) if include_username?(options)
cf.password = password_from(options) if include_password?(options)
else
cf.host = hostname_from(options) if include_host?(options)
cf.port = options[:port].to_i if options[:port]
cf.virtual_host = vhost_from(options) if include_vhost?(options)
cf.username = username_from(options) if include_username?(options)
cf.password = password_from(options) if include_password?(options)
end
cf.connection_timeout = timeout_from(options) if include_timeout?(options)
cf.requested_heartbeat = heartbeat_from(options)
cf.connection_timeout = connection_timeout_from(options) if include_connection_timeout?(options)
cf.thread_factory = thread_factory_from(options) if include_thread_factory?(options)
cf.max_inbound_message_body_size = options[:max_inbound_message_body_size].to_i if options[:max_inbound_message_body_size]
tls = (options[:ssl] || options[:tls])
case tls
when true then
cf.use_ssl_protocol
when String then
options[:logger].info("Using TLS/SSL version #{tls}") if options[:logger]
if (cert_path = tls_certificate_path_from(options)) && (password = tls_certificate_password_from(options))
ctx = SSLContext.get_instance(tls)
pwd = password.to_java.to_char_array
begin
is = File.new(cert_path).to_inputstream
ks = KeyStore.get_instance('PKCS12')
ks.load(is, pwd)
kmf = KeyManagerFactory.get_instance("SunX509")
kmf.init(ks, pwd)
if options[:trust_manager]
ctx.init(kmf.get_key_managers, Array(options[:trust_manager]), nil)
else
tmf = TrustManagerFactory.get_instance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(ks)
ctx.init(kmf.get_key_managers, tmf.getTrustManagers(), nil)
end
cf.set_sasl_config(options[:sasl_config]) if options[:sasl_config]
cf.use_ssl_protocol(ctx)
rescue Java::JavaLang::Throwable => e
message = e.message
message << "\n"
message << e.backtrace.join("\n")
raise SSLContextException.new(message)
ensure
is.close if is
end
elsif options[:trust_manager]
cf.use_ssl_protocol(tls, options[:trust_manager])
else
cf.use_ssl_protocol(tls)
end
end
new(cf, options)
end
|
Instance Method Details
#add_automatic_recovery_hook ⇒ Object
310
311
312
313
314
315
316
317
318
|
# File 'lib/march_hare/session.rb', line 310
def add_automatic_recovery_hook
fn = Proc.new do |_, signal|
if should_initiate_connection_recovery?(signal)
self.automatically_recover
end
end
@automatic_recovery_hook = self.on_shutdown(&fn)
end
|
#add_blocked_listener(listener) ⇒ Object
283
284
285
|
# File 'lib/march_hare/session.rb', line 283
def add_blocked_listener(listener)
@connection.add_blocked_listener(listener)
end
|
#automatically_recover ⇒ Object
Begins automatic connection recovery (typically only used internally to recover from network failures)
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
|
# File 'lib/march_hare/session.rb', line 332
def automatically_recover
raise ConnectionClosedException if @was_explicitly_closed
@logger.debug("session: begin automatic connection recovery #{Thread.current.inspect}")
fire_recovery_start_hooks
ms = @network_recovery_interval * 1000
java.lang.Thread.sleep(ms)
new_connection = converting_rjc_exceptions_to_ruby do
reconnecting_on_network_failures(ms) { build_new_connection }
end
self.recover_shutdown_hooks(new_connection)
self.recover_connection_block_hooks(new_connection)
@channels.sort_by {|id, _| id}.each do |id, ch|
begin
ch.automatically_recover(self, new_connection)
rescue Exception, java.io.IOException => e
@logger.error(e)
end
end
@connection = new_connection
fire_recovery_hooks
@connection
end
|
#clear_blocked_connection_callbacks ⇒ Object
Clears all callbacks defined with #on_blocked and #on_unblocked.
288
289
290
291
292
|
# File 'lib/march_hare/session.rb', line 288
def clear_blocked_connection_callbacks
@blocked_connection_hooks.clear
@connection.clear_blocked_listeners
end
|
#clear_connection_recovery_callbacks ⇒ Object
Clears all callbacks defined with #on_recovery_started and #on_recovery
305
306
307
|
# File 'lib/march_hare/session.rb', line 305
def clear_connection_recovery_callbacks
@connection_recovery_hooks.clear
end
|
#close ⇒ Object
Closes connection gracefully.
This includes shutting down consumer work pool gracefully, waiting up to 5 seconds for all consumer deliveries to be processed.
227
228
229
230
231
232
233
234
235
236
237
|
# File 'lib/march_hare/session.rb', line 227
def close
@channels.select { |_, ch| ch.open? }.each do |_, ch|
ch.close
end
@was_explicitly_closed = true
maybe_shut_down_executor
@connection.close
rescue com.rabbitmq.client.AlreadyClosedException
@logger.debug("close: connection already closed")
end
|
#closed? ⇒ Boolean
251
252
253
|
# File 'lib/march_hare/session.rb', line 251
def closed?
!@connection.open?
end
|
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
|
# File 'lib/march_hare/session.rb', line 201
def create_channel(n = nil)
jc = if n
@connection.create_channel(n)
else
@connection.create_channel
end
if jc.nil?
error_message = " Unable to create a channel. This is likely due to having a channel_max setting\n on the rabbitmq broker (see https://www.rabbitmq.com/configure.html).\n There are currently \#{@channels.size} channels on this connection.\n MSG\n raise ::MarchHare::ChannelError.new(error_message, false)\n end\n\n ch = Channel.new(self, jc)\n register_channel(ch)\n\n ch\nend\n"
|
#disable_automatic_recovery ⇒ Object
326
327
328
|
# File 'lib/march_hare/session.rb', line 326
def disable_automatic_recovery
@connection.remove_shutdown_listener(@automatic_recovery_hook) if @automatic_recovery_hook
end
|
#flush ⇒ Object
Flushes the socket used by this connection.
392
393
394
|
# File 'lib/march_hare/session.rb', line 392
def flush
@connection.flush
end
|
#heartbeat=(n) ⇒ Object
397
398
399
|
# File 'lib/march_hare/session.rb', line 397
def heartbeat=(n)
@connection.heartbeat = n
end
|
#hostname ⇒ Object
Also known as:
host
423
424
425
|
# File 'lib/march_hare/session.rb', line 423
def hostname
@cf.host
end
|
#on_blocked(&block) ⇒ Object
Defines a connection.blocked handler
268
269
270
271
272
273
|
# File 'lib/march_hare/session.rb', line 268
def on_blocked(&block)
listener = BlockBlockedUnblockedListener.for_blocked(block)
@blocked_connection_hooks << listener
self.add_blocked_listener(listener)
end
|
#on_recovery(&block) ⇒ Object
299
300
301
302
|
# File 'lib/march_hare/session.rb', line 299
def on_recovery(&block)
listener = RecoveryListener.for_finish(block)
@connection_recovery_hooks << listener
end
|
#on_recovery_start(&block) ⇒ Object
294
295
296
297
|
# File 'lib/march_hare/session.rb', line 294
def on_recovery_start(&block)
listener = RecoveryListener.for_start(block)
@connection_recovery_hooks << listener
end
|
#on_shutdown(&block) ⇒ Object
Defines a shutdown event callback. Shutdown events are broadcasted when a connection is closed, either explicitly or forcefully, or due to a network/peer failure.
258
259
260
261
262
263
264
265
|
# File 'lib/march_hare/session.rb', line 258
def on_shutdown(&block)
sh = ShutdownListener.new(self, &block)
@shutdown_hooks << sh
@connection.add_shutdown_listener(sh)
sh
end
|
#on_unblocked(&block) ⇒ Object
Defines a connection.unblocked handler
276
277
278
279
280
281
|
# File 'lib/march_hare/session.rb', line 276
def on_unblocked(&block)
listener = BlockBlockedUnblockedListener.for_unblocked(block)
@blocked_connection_hooks << listener
self.add_blocked_listener(listener)
end
|
#open? ⇒ Boolean
Also known as:
connected?
245
246
247
|
# File 'lib/march_hare/session.rb', line 245
def open?
@connection.open?
end
|
#port ⇒ Object
428
429
430
|
# File 'lib/march_hare/session.rb', line 428
def port
@cf.port
end
|
#recover_connection_block_hooks(connection) ⇒ Object
384
385
386
387
388
389
|
# File 'lib/march_hare/session.rb', line 384
def recover_connection_block_hooks(connection)
@logger.debug("session: recover_connection_block_hooks")
@blocked_connection_hooks.each do |listener|
connection.add_blocked_listener(listener)
end
end
|
#recover_shutdown_hooks(connection) ⇒ Object
376
377
378
379
380
381
|
# File 'lib/march_hare/session.rb', line 376
def recover_shutdown_hooks(connection)
@logger.debug("session: recover_shutdown_hooks")
@shutdown_hooks.each do |sh|
connection.add_shutdown_listener(sh)
end
end
|
#register_channel(ch) ⇒ Object
458
459
460
|
# File 'lib/march_hare/session.rb', line 458
def register_channel(ch)
@channels[ch.channel_number] = ch
end
|
#reopen ⇒ Object
239
240
241
242
|
# File 'lib/march_hare/session.rb', line 239
def reopen
@was_explicitly_closed = false
automatically_recover
end
|
#should_initiate_connection_recovery?(signal) ⇒ Boolean
321
322
323
|
# File 'lib/march_hare/session.rb', line 321
def should_initiate_connection_recovery?(signal)
!signal.initiated_by_application || signal.instance_of?(MissedHeartbeatException)
end
|
#start ⇒ Object
No-op, exists for better API compatibility with Bunny.
402
403
404
405
406
407
408
409
410
411
412
|
# File 'lib/march_hare/session.rb', line 402
def start
self
end
|
#tls? ⇒ Boolean
Also known as:
ssl?
432
433
434
435
436
437
438
439
440
|
# File 'lib/march_hare/session.rb', line 432
def tls?
if @uses_uri
u = java.net.URI.new(@uri.to_java_string)
u.scheme == "amqps"
else
self.port == ConnectionFactory.DEFAULT_AMQP_OVER_SSL_PORT
end
end
|
#to_s ⇒ String
448
449
450
|
# File 'lib/march_hare/session.rb', line 448
def to_s
"#<#{self.class.name}:#{object_id} #{@cf.username}@#{@cf.host}:#{@cf.port}, vhost=#{@cf.virtual_host}>"
end
|
#unregister_channel(ch) ⇒ Object
463
464
465
|
# File 'lib/march_hare/session.rb', line 463
def unregister_channel(ch)
@channels.delete(ch.channel_number)
end
|
#username ⇒ Object
Also known as:
user
414
415
416
|
# File 'lib/march_hare/session.rb', line 414
def username
@cf.username
end
|
#vhost ⇒ Object
419
420
421
|
# File 'lib/march_hare/session.rb', line 419
def vhost
@cf.virtual_host
end
|