Class: MarchHare::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/march_hare/session.rb

Overview

Connection to a RabbitMQ node.

Used to open and close connections and open (create) new channels.

Defined Under Namespace

Classes: BlockBlockedUnblockedListener, RecoveryListener, SSLContextException

Constant Summary collapse

DEFAULT_NETWORK_RECOVERY_INTERVAL =

Default reconnection interval for TCP connection failures

5.0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection_factory, opts = {}) ⇒ Session

Returns a new instance of Session.



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/march_hare/session.rb', line 147

def initialize(connection_factory, opts = {})
  @cf               = connection_factory

  log_file                   = opts[:log_file] || STDOUT
  log_level                  = opts[:log_level] || ENV["MARCH_HARE_LOG_LEVEL"] || Logger::WARN
  @logger                    = opts.fetch(:logger, init_default_logger(log_file, log_level))
  @cf.exception_handler      = opts.fetch(:exception_handler, init_default_exception_handler(@logger))

  # March Hare uses its own connection recovery implementation and
  # as of Java client 4.x automatic recovery is enabled by
  # default. MK.
  @cf.automatic_recovery_enabled = false
  @cf.topology_recovery_enabled  = false

  @uri              = opts[:uri]
  @uses_uri         = !(@uri.nil?)
  # executors cannot be restarted after shutdown,
  # so we really need a factory here. MK.
  @executor_factory = opts[:executor_factory] || build_executor_factory_from(opts)
  # we expect this option to be specified in seconds
  @executor_shutdown_timeout = opts.fetch(:executor_shutdown_timeout, 30.0)

  @addresses        = self.class.addresses_from(opts)
  @connection       = build_new_connection
  @channels         = JavaConcurrent::ConcurrentHashMap.new

  # should automatic recovery from network failures be used?
  @automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil?
                             true
                           else
                             opts[:automatically_recover] || opts[:automatic_recovery]
                           end
  @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
  @shutdown_hooks            = Array.new
  @blocked_connection_hooks  = Array.new
  @connection_recovery_hooks = Array.new
  @was_explicitly_closed     = false

  if @automatically_recover
    self.add_automatic_recovery_hook
  end
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(selector, *args) ⇒ Object



440
441
442
# File 'lib/march_hare/session.rb', line 440

def method_missing(selector, *args)
  @connection.__send__(selector, *args)
end

Instance Attribute Details

#channelsArray<MarchHare::Channel> (readonly)

Returns Channels opened on this connection.

Returns:



140
141
142
# File 'lib/march_hare/session.rb', line 140

def channels
  @channels
end

#logger::Logger (readonly)

Returns Logger instance.

Returns:

  • (::Logger)

    Logger instance



143
144
145
# File 'lib/march_hare/session.rb', line 143

def logger
  @logger
end

Class Method Details

.connect(options = {}) ⇒ Object

Connects to a RabbitMQ node.

Parameters:

  • options (Hash) (defaults to: {})

    Connection options

Options Hash (options):

  • :executor_shutdown_timeout (Numeric) — default: 30.0

    when recovering from a network failure how long should we wait for the current threadpool to finish handling its messages

  • :host (String) — default: "127.0.0.1"

    Hostname or IP address to connect to

  • :hosts (Array<String>) — default: ["127.0.0.1"]

    Array of hostnames or ips to connect to. The connection returned is the first in the array that succeeds.

  • :addresses (Array<String>) — default: ["127.0.0.1:5672", "localhost:5673"]

    Array of addresses to connect to. The connection returned is the first in the array that succeeds.

  • :port (Integer) — default: 5672

    Port RabbitMQ listens on

  • :username (String) — default: "guest"

    Username

  • :password (String) — default: "guest"

    Password

  • :vhost (String) — default: "/"

    Virtual host to use

  • :requested_heartbeat (Integer) — default: 580

    Heartbeat timeout used. 0 means no heartbeat.

  • :tls (Boolean, String) — default: false

    Set to true to use TLS/SSL connection or TLS version name as a string, e.g. “TLSv1.1”. This will switch port to 5671 by default.

  • :tls_certificate_path (String)

    Path to a PKCS12 certificate.

  • :thread_factory (java.util.concurrent.ThreadFactory)

    Thread factory RabbitMQ Java client will use (useful in restricted PaaS platforms such as GAE)

  • :logger (Logger)

    The logger. If missing, one is created using :log_file and :log_level.

  • :log_file (IO, String)

    The file or path to use when creating a logger. Defaults to STDOUT.

  • :log_level (Integer)

    The log level to use when creating a logger. Defaults to LOGGER::WARN

See Also:



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/march_hare/session.rb', line 62

def self.connect(options = {})
  cf = ConnectionFactory.new

  if options[:uri]
    cf.uri          = options[:uri]          if options[:uri]
  elsif options[:hosts] || options[:addresses]
    cf.virtual_host = vhost_from(options)    if include_vhost?(options)
    cf.username     = username_from(options) if include_username?(options)
    cf.password     = password_from(options) if include_password?(options)
  else
    cf.host         = hostname_from(options) if include_host?(options)
    cf.port         = options[:port].to_i    if options[:port]
    cf.virtual_host = vhost_from(options)    if include_vhost?(options)
    cf.username     = username_from(options) if include_username?(options)
    cf.password     = password_from(options) if include_password?(options)
  end

  cf.connection_timeout  = timeout_from(options)  if include_timeout?(options)

  cf.requested_heartbeat = heartbeat_from(options)
  cf.connection_timeout  = connection_timeout_from(options) if include_connection_timeout?(options)

  cf.thread_factory      = thread_factory_from(options)    if include_thread_factory?(options)

  tls = (options[:ssl] || options[:tls])
  case tls
  when true then
    cf.use_ssl_protocol
  when String then
    options[:logger].info("Using TLS/SSL version #{tls}") if options[:logger]
    # Note: `options[:trust_manager] = com.rabbitmq.client.TrustEverythingTrustManager.new`
    # can be set to effectively disable TLS verification.
    if (cert_path = tls_certificate_path_from(options)) && (password = tls_certificate_password_from(options))
      ctx = SSLContext.get_instance(tls)
      pwd = password.to_java.to_char_array
      begin
        is = File.new(cert_path).to_inputstream
        ks = KeyStore.get_instance('PKCS12')
        ks.load(is, pwd)

        kmf = KeyManagerFactory.get_instance("SunX509")
        kmf.init(ks, pwd)

        if options[:trust_manager]
          ctx.init(kmf.get_key_managers, Array(options[:trust_manager]), nil)
        else
          # use the key store as the trust store
          tmf = TrustManagerFactory.get_instance(TrustManagerFactory.getDefaultAlgorithm());
          tmf.init(ks)
          ctx.init(kmf.get_key_managers, tmf.getTrustManagers(), nil)
        end

        cf.set_sasl_config(options[:sasl_config]) if options[:sasl_config]
        cf.use_ssl_protocol(ctx)
      rescue Java::JavaLang::Throwable => e
        message = e.message
        message << "\n"
        message << e.backtrace.join("\n")

        raise SSLContextException.new(message)
      ensure
        is.close if is
      end
    elsif options[:trust_manager]
      cf.use_ssl_protocol(tls, options[:trust_manager])
    else
      cf.use_ssl_protocol(tls)
    end
  end

  new(cf, options)
end

Instance Method Details

#add_automatic_recovery_hookObject



307
308
309
310
311
312
313
314
315
# File 'lib/march_hare/session.rb', line 307

def add_automatic_recovery_hook
  fn = Proc.new do |_, signal|
    if should_initiate_connection_recovery?(signal)
      self.automatically_recover
    end
  end

  @automatic_recovery_hook = self.on_shutdown(&fn)
end

#add_blocked_listener(listener) ⇒ Object



280
281
282
# File 'lib/march_hare/session.rb', line 280

def add_blocked_listener(listener)
  @connection.add_blocked_listener(listener)
end

#automatically_recoverObject

Begins automatic connection recovery (typically only used internally to recover from network failures)



329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
# File 'lib/march_hare/session.rb', line 329

def automatically_recover
  raise ConnectionClosedException if @was_explicitly_closed
  @logger.debug("session: begin automatic connection recovery #{Thread.current.inspect}")

  fire_recovery_start_hooks

  ms = @network_recovery_interval * 1000
  # recovering immediately makes little sense. Wait a bit first. MK.
  java.lang.Thread.sleep(ms)

  new_connection = converting_rjc_exceptions_to_ruby do
    reconnecting_on_network_failures(ms) { build_new_connection }
  end
  self.recover_shutdown_hooks(new_connection)
  self.recover_connection_block_hooks(new_connection)

  # sorting channels by id means that the cases like the following:
  #
  # ch1 = conn.create_channel
  # ch2 = conn.create_channel
  #
  # x   = ch1.topic("logs", :durable => false)
  # q   = ch2.queue("", :exclusive => true)
  #
  # q.bind(x)
  #
  # will recover correctly because exchanges and queues will be recovered
  # in the order the user expects and before bindings.
  @channels.sort_by {|id, _| id}.each do |id, ch|
    begin
      ch.automatically_recover(self, new_connection)
    rescue Exception, java.io.IOException => e
      @logger.error(e)
    end
  end

  @connection = new_connection

  fire_recovery_hooks

  @connection
end

#clear_blocked_connection_callbacksObject

Clears all callbacks defined with #on_blocked and #on_unblocked.



285
286
287
288
289
# File 'lib/march_hare/session.rb', line 285

def clear_blocked_connection_callbacks
  @blocked_connection_hooks.clear

  @connection.clear_blocked_listeners
end

#clear_connection_recovery_callbacksObject

Clears all callbacks defined with #on_recovery_started and #on_recovery



302
303
304
# File 'lib/march_hare/session.rb', line 302

def clear_connection_recovery_callbacks
  @connection_recovery_hooks.clear
end

#closeObject

Closes connection gracefully.

This includes shutting down consumer work pool gracefully, waiting up to 5 seconds for all consumer deliveries to be processed.



224
225
226
227
228
229
230
231
232
233
234
# File 'lib/march_hare/session.rb', line 224

def close
  @channels.select { |_, ch| ch.open? }.each do |_, ch|
    ch.close
  end

  @was_explicitly_closed = true
  maybe_shut_down_executor
  @connection.close
rescue com.rabbitmq.client.AlreadyClosedException
  @logger.debug("close: connection already closed")
end

#closed?Boolean

Returns true if this channel is closed.

Returns:

  • (Boolean)

    true if this channel is closed



248
249
250
# File 'lib/march_hare/session.rb', line 248

def closed?
  !@connection.open?
end

#create_channel(n = nil) ⇒ MarchHare::Channel

Opens a new channel.

Parameters:

  • (nil): (Integer)

    Channel number. Pass nil to let MarchHare allocate an available number in a safe way.

Returns:

See Also:



198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/march_hare/session.rb', line 198

def create_channel(n = nil)
  jc = if n
         @connection.create_channel(n)
       else
         @connection.create_channel
       end
  if jc.nil?
    error_message = <<-MSG
      Unable to create a channel. This is likely due to having a channel_max setting
      on the rabbitmq broker (see https://www.rabbitmq.com/configure.html).
      There are currently #{@channels.size} channels on this connection.
    MSG
    raise ::MarchHare::ChannelError.new(error_message, false)
  end

  ch = Channel.new(self, jc)
  register_channel(ch)

  ch
end

#disable_automatic_recoveryObject



323
324
325
# File 'lib/march_hare/session.rb', line 323

def disable_automatic_recovery
  @connection.remove_shutdown_listener(@automatic_recovery_hook) if @automatic_recovery_hook
end

#flushObject

Flushes the socket used by this connection.



389
390
391
# File 'lib/march_hare/session.rb', line 389

def flush
  @connection.flush
end

#heartbeat=(n) ⇒ Object



394
395
396
# File 'lib/march_hare/session.rb', line 394

def heartbeat=(n)
  @connection.heartbeat = n
end

#hostnameObject Also known as: host



420
421
422
# File 'lib/march_hare/session.rb', line 420

def hostname
  @cf.host
end

#on_blocked(&block) ⇒ Object

Defines a connection.blocked handler



265
266
267
268
269
270
# File 'lib/march_hare/session.rb', line 265

def on_blocked(&block)
  listener = BlockBlockedUnblockedListener.for_blocked(block)
  @blocked_connection_hooks << listener

  self.add_blocked_listener(listener)
end

#on_recovery(&block) ⇒ Object



296
297
298
299
# File 'lib/march_hare/session.rb', line 296

def on_recovery(&block)
  listener = RecoveryListener.for_finish(block)
  @connection_recovery_hooks << listener
end

#on_recovery_start(&block) ⇒ Object



291
292
293
294
# File 'lib/march_hare/session.rb', line 291

def on_recovery_start(&block)
  listener = RecoveryListener.for_start(block)
  @connection_recovery_hooks << listener
end

#on_shutdown(&block) ⇒ Object

Defines a shutdown event callback. Shutdown events are broadcasted when a connection is closed, either explicitly or forcefully, or due to a network/peer failure.



255
256
257
258
259
260
261
262
# File 'lib/march_hare/session.rb', line 255

def on_shutdown(&block)
  sh = ShutdownListener.new(self, &block)
  @shutdown_hooks << sh

  @connection.add_shutdown_listener(sh)

  sh
end

#on_unblocked(&block) ⇒ Object

Defines a connection.unblocked handler



273
274
275
276
277
278
# File 'lib/march_hare/session.rb', line 273

def on_unblocked(&block)
  listener = BlockBlockedUnblockedListener.for_unblocked(block)
  @blocked_connection_hooks << listener

  self.add_blocked_listener(listener)
end

#open?Boolean Also known as: connected?

Returns true if connection is open, false otherwise.

Returns:

  • (Boolean)

    true if connection is open, false otherwise



242
243
244
# File 'lib/march_hare/session.rb', line 242

def open?
  @connection.open?
end

#portObject



425
426
427
# File 'lib/march_hare/session.rb', line 425

def port
  @cf.port
end

#recover_connection_block_hooks(connection) ⇒ Object



381
382
383
384
385
386
# File 'lib/march_hare/session.rb', line 381

def recover_connection_block_hooks(connection)
  @logger.debug("session: recover_connection_block_hooks")
  @blocked_connection_hooks.each do |listener|
    connection.add_blocked_listener(listener)
  end
end

#recover_shutdown_hooks(connection) ⇒ Object



373
374
375
376
377
378
# File 'lib/march_hare/session.rb', line 373

def recover_shutdown_hooks(connection)
  @logger.debug("session: recover_shutdown_hooks")
  @shutdown_hooks.each do |sh|
    connection.add_shutdown_listener(sh)
  end
end

#register_channel(ch) ⇒ Object



455
456
457
# File 'lib/march_hare/session.rb', line 455

def register_channel(ch)
  @channels[ch.channel_number] = ch
end

#reopenObject



236
237
238
239
# File 'lib/march_hare/session.rb', line 236

def reopen
  @was_explicitly_closed = false
  automatically_recover
end

#should_initiate_connection_recovery?(signal) ⇒ Boolean

Returns:

  • (Boolean)


318
319
320
# File 'lib/march_hare/session.rb', line 318

def should_initiate_connection_recovery?(signal)
  !signal.initiated_by_application || signal.instance_of?(MissedHeartbeatException)
end

#startObject

No-op, exists for better API compatibility with Bunny.



399
400
401
402
403
404
405
406
407
408
409
# File 'lib/march_hare/session.rb', line 399

def start
  # no-op
  #
  # This method mimics Bunny::Session#start in Bunny 0.9.
  # Without it, #method_missing will proxy the call to com.rabbitmq.client.AMQConnection,
  # which happens to have a #start method which is not idempotent.
  #
  # So we stub out #start in case someone migrating from Bunny forgets to remove
  # the call to #start. MK.
  self
end

#tls?Boolean Also known as: ssl?

Returns:

  • (Boolean)


429
430
431
432
433
434
435
436
437
# File 'lib/march_hare/session.rb', line 429

def tls?
  if @uses_uri
    u = java.net.URI.new(@uri.to_java_string)

    u.scheme == "amqps"
  else
    self.port == ConnectionFactory.DEFAULT_AMQP_OVER_SSL_PORT
  end
end

#to_sString

Returns:

  • (String)


445
446
447
# File 'lib/march_hare/session.rb', line 445

def to_s
  "#<#{self.class.name}:#{object_id} #{@cf.username}@#{@cf.host}:#{@cf.port}, vhost=#{@cf.virtual_host}>"
end

#unregister_channel(ch) ⇒ Object



460
461
462
# File 'lib/march_hare/session.rb', line 460

def unregister_channel(ch)
  @channels.delete(ch.channel_number)
end

#usernameObject Also known as: user



411
412
413
# File 'lib/march_hare/session.rb', line 411

def username
  @cf.username
end

#vhostObject



416
417
418
# File 'lib/march_hare/session.rb', line 416

def vhost
  @cf.virtual_host
end