Class: MarchHare::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/march_hare/session.rb

Overview

Connection to a RabbitMQ node.

Used to open and close connections and open (create) new channels.

Defined Under Namespace

Classes: BlockBlockedUnblockedListener, RecoveryListener, SSLContextException

Constant Summary collapse

DEFAULT_NETWORK_RECOVERY_INTERVAL =

Default reconnection interval for TCP connection failures

5.0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection_factory, opts = {}) ⇒ Session

Returns a new instance of Session.



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/march_hare/session.rb', line 146

def initialize(connection_factory, opts = {})
  @cf               = connection_factory

  log_file                   = opts[:log_file] || STDOUT
  log_level                  = opts[:log_level] || ENV["MARCH_HARE_LOG_LEVEL"] || Logger::WARN
  @logger                    = opts.fetch(:logger, init_default_logger(log_file, log_level))
  @cf.exception_handler      = opts.fetch(:exception_handler, init_default_exception_handler(@logger))

  # March Hare uses its own connection recovery implementation and
  # as of Java client 4.x automatic recovery is enabled by
  # default. MK.
  @cf.automatic_recovery_enabled = false
  @cf.topology_recovery_enabled  = false

  @uri              = opts[:uri]
  @uses_uri         = !(@uri.nil?)
  # executors cannot be restarted after shutdown,
  # so we really need a factory here. MK.
  @executor_factory = opts[:executor_factory] || build_executor_factory_from(opts)
  # we expect this option to be specified in seconds
  @executor_shutdown_timeout = opts.fetch(:executor_shutdown_timeout, 30.0)

  @addresses        = self.class.addresses_from(opts)
  @connection       = build_new_connection
  @channels         = JavaConcurrent::ConcurrentHashMap.new

  # should automatic recovery from network failures be used?
  @automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil?
                             true
                           else
                             opts[:automatically_recover] || opts[:automatic_recovery]
                           end
  @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
  @shutdown_hooks            = Array.new
  @blocked_connection_hooks  = Array.new
  @connection_recovery_hooks = Array.new

  if @automatically_recover
    self.add_automatic_recovery_hook
  end
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(selector, *args) ⇒ Object



429
430
431
# File 'lib/march_hare/session.rb', line 429

def method_missing(selector, *args)
  @connection.__send__(selector, *args)
end

Instance Attribute Details

#channelsArray<MarchHare::Channel> (readonly)

Returns Channels opened on this connection.

Returns:



139
140
141
# File 'lib/march_hare/session.rb', line 139

def channels
  @channels
end

#logger::Logger (readonly)

Returns Logger instance.

Returns:

  • (::Logger)

    Logger instance



142
143
144
# File 'lib/march_hare/session.rb', line 142

def logger
  @logger
end

Class Method Details

.connect(options = {}) ⇒ Object

Connects to a RabbitMQ node.

Parameters:

  • options (Hash) (defaults to: {})

    Connection options

Options Hash (options):

  • :executor_shutdown_timeout (Numeric) — default: 30.0

    when recovering from a network failure how long should we wait for the current threadpool to finish handling its messages

  • :host (String) — default: "127.0.0.1"

    Hostname or IP address to connect to

  • :hosts (Array<String>) — default: ["127.0.0.1"]

    Array of hostnames or ips to connect to. The connection returned is the first in the array that succeeds.

  • :addresses (Array<String>) — default: ["127.0.0.1:5672", "localhost:5673"]

    Array of addresses to connect to. The connection returned is the first in the array that succeeds.

  • :port (Integer) — default: 5672

    Port RabbitMQ listens on

  • :username (String) — default: "guest"

    Username

  • :password (String) — default: "guest"

    Password

  • :vhost (String) — default: "/"

    Virtual host to use

  • :requested_heartbeat (Integer) — default: 580

    Heartbeat timeout used. 0 means no heartbeat.

  • :tls (Boolean, String) — default: false

    Set to true to use TLS/SSL connection or TLS version name as a string, e.g. “TLSv1.1”. This will switch port to 5671 by default.

  • :tls_certificate_path (String)

    Path to a PKCS12 certificate.

  • :thread_factory (java.util.concurrent.ThreadFactory)

    Thread factory RabbitMQ Java client will use (useful in restricted PaaS platforms such as GAE)

  • :logger (Logger)

    The logger. If missing, one is created using :log_file and :log_level.

  • :log_file (IO, String)

    The file or path to use when creating a logger. Defaults to STDOUT.

  • :log_level (Integer)

    The log level to use when creating a logger. Defaults to LOGGER::WARN

See Also:



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/march_hare/session.rb', line 62

def self.connect(options = {})
  cf = ConnectionFactory.new

  if options[:uri]
    cf.uri          = options[:uri]          if options[:uri]
  elsif options[:hosts] || options[:addresses]
    cf.virtual_host = vhost_from(options)    if include_vhost?(options)
    cf.username     = username_from(options) if include_username?(options)
    cf.password     = password_from(options) if include_password?(options)
  else
    cf.host         = hostname_from(options) if include_host?(options)
    cf.port         = options[:port].to_i    if options[:port]
    cf.virtual_host = vhost_from(options)    if include_vhost?(options)
    cf.username     = username_from(options) if include_username?(options)
    cf.password     = password_from(options) if include_password?(options)
  end

  cf.connection_timeout  = timeout_from(options)  if include_timeout?(options)

  cf.requested_heartbeat = heartbeat_from(options)
  cf.connection_timeout  = connection_timeout_from(options) if include_connection_timeout?(options)

  cf.thread_factory      = thread_factory_from(options)    if include_thread_factory?(options)

  tls = (options[:ssl] || options[:tls])
  case tls
  when true then
    cf.use_ssl_protocol
  when String then
    opts[:logger].info("Using TLS/SSL version #{tls}") if opts[:logger]
    # Note: `options[:trust_manager] = com.rabbitmq.client.TrustEverythingTrustManager.new`
    # can be set to effectively disable TLS verification.
    if (cert_path = tls_certificate_path_from(options)) && (password = tls_certificate_password_from(options))
      ctx = SSLContext.get_instance(tls)
      pwd = password.to_java.to_char_array
      begin
        is = File.new(cert_path).to_inputstream
        ks = KeyStore.get_instance('PKCS12')
        ks.load(is, pwd)

        kmf = KeyManagerFactory.get_instance("SunX509")
        kmf.init(ks, pwd)

        if options[:trust_manager]
          ctx.init(kmf.get_key_managers, [options[:trust_manager]], nil)
        else
          # use the key store as the trust store
          tmf = TrustManagerFactory.get_instance(TrustManagerFactory.getDefaultAlgorithm());
          tmf.init(ks)
          ctx.init(kmf.get_key_managers, tmf.getTrustManagers(), nil)
        end

        cf.use_ssl_protocol(ctx)
      rescue Java::JavaLang::Throwable => e
        message = e.message
        message << "\n"
        message << e.backtrace.join("\n")

        raise SSLContextException.new(message)
      ensure
        is.close if is
      end
    elsif options[:trust_manager]
      cf.use_ssl_protocol(tls, options[:trust_manager])
    else
      cf.use_ssl_protocol(tls)
    end
  end

  new(cf, options)
end

Instance Method Details

#add_automatic_recovery_hookObject



297
298
299
300
301
302
303
304
305
# File 'lib/march_hare/session.rb', line 297

def add_automatic_recovery_hook
  fn = Proc.new do |_, signal|
    if should_initiate_connection_recovery?(signal)
      self.automatically_recover
    end
  end

  @automatic_recovery_hook = self.on_shutdown(&fn)
end

#add_blocked_listener(listener) ⇒ Object



270
271
272
# File 'lib/march_hare/session.rb', line 270

def add_blocked_listener(listener)
  @connection.add_blocked_listener(listener)
end

#automatically_recoverObject

Begins automatic connection recovery (typically only used internally to recover from network failures)



319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
# File 'lib/march_hare/session.rb', line 319

def automatically_recover
  @logger.debug("session: begin automatic connection recovery")

  fire_recovery_start_hooks

  ms = @network_recovery_interval * 1000
  # recovering immediately makes little sense. Wait a bit first. MK.
  java.lang.Thread.sleep(ms)

  new_connection = converting_rjc_exceptions_to_ruby do
    reconnecting_on_network_failures(ms) { build_new_connection }
  end
  self.recover_shutdown_hooks(new_connection)
  self.recover_connection_block_hooks(new_connection)

  # sorting channels by id means that the cases like the following:
  #
  # ch1 = conn.create_channel
  # ch2 = conn.create_channel
  #
  # x   = ch1.topic("logs", :durable => false)
  # q   = ch2.queue("", :exclusive => true)
  #
  # q.bind(x)
  #
  # will recover correctly because exchanges and queues will be recovered
  # in the order the user expects and before bindings.
  @channels.sort_by {|id, _| id}.each do |id, ch|
    begin
      ch.automatically_recover(self, new_connection)
    rescue Exception, java.io.IOException => e
      @logger.error(e)
    end
  end

  @connection = new_connection

  fire_recovery_hooks

  @connection
end

#clear_blocked_connection_callbacksObject

Clears all callbacks defined with #on_blocked and #on_unblocked.



275
276
277
278
279
# File 'lib/march_hare/session.rb', line 275

def clear_blocked_connection_callbacks
  @blocked_connection_hooks.clear

  @connection.clear_blocked_listeners
end

#clear_connection_recovery_callbacksObject

Clears all callbacks defined with #on_recovery_started and #on_recovery



292
293
294
# File 'lib/march_hare/session.rb', line 292

def clear_connection_recovery_callbacks
  @connection_recovery_hooks.clear
end

#closeObject

Closes connection gracefully.

This includes shutting down consumer work pool gracefully, waiting up to 5 seconds for all consumer deliveries to be processed.



222
223
224
225
226
227
228
229
# File 'lib/march_hare/session.rb', line 222

def close
  @channels.select { |_, ch| ch.open? }.each do |_, ch|
    ch.close
  end

  maybe_shut_down_executor
  @connection.close
end

#closed?Boolean

Returns true if this channel is closed.

Returns:

  • (Boolean)

    true if this channel is closed



238
239
240
# File 'lib/march_hare/session.rb', line 238

def closed?
  !@connection.open?
end

#create_channel(n = nil) ⇒ MarchHare::Channel

Opens a new channel.

Parameters:

  • (nil): (Integer)

    Channel number. Pass nil to let MarchHare allocate an available number in a safe way.

Returns:

See Also:



196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/march_hare/session.rb', line 196

def create_channel(n = nil)
  jc = if n
         @connection.create_channel(n)
       else
         @connection.create_channel
       end
  if jc.nil?
    error_message = <<-MSG
      Unable to create a channel. This is likely due to having a channel_max setting
      on the rabbitmq broker (see https://www.rabbitmq.com/configure.html).
      There are currently #{@channels.size} channels on this connection.
    MSG
    raise ::MarchHare::ChannelError.new(error_message, false)
  end

  ch = Channel.new(self, jc)
  register_channel(ch)

  ch
end

#disable_automatic_recoveryObject



313
314
315
# File 'lib/march_hare/session.rb', line 313

def disable_automatic_recovery
  @connection.remove_shutdown_listener(@automatic_recovery_hook) if @automatic_recovery_hook
end

#flushObject

Flushes the socket used by this connection.



378
379
380
# File 'lib/march_hare/session.rb', line 378

def flush
  @connection.flush
end

#heartbeat=(n) ⇒ Object



383
384
385
# File 'lib/march_hare/session.rb', line 383

def heartbeat=(n)
  @connection.heartbeat = n
end

#hostnameObject Also known as: host



409
410
411
# File 'lib/march_hare/session.rb', line 409

def hostname
  @cf.host
end

#on_blocked(&block) ⇒ Object

Defines a connection.blocked handler



255
256
257
258
259
260
# File 'lib/march_hare/session.rb', line 255

def on_blocked(&block)
  listener = BlockBlockedUnblockedListener.for_blocked(block)
  @blocked_connection_hooks << listener

  self.add_blocked_listener(listener)
end

#on_recovery(&block) ⇒ Object



286
287
288
289
# File 'lib/march_hare/session.rb', line 286

def on_recovery(&block)
  listener = RecoveryListener.for_finish(block)
  @connection_recovery_hooks << listener
end

#on_recovery_start(&block) ⇒ Object



281
282
283
284
# File 'lib/march_hare/session.rb', line 281

def on_recovery_start(&block)
  listener = RecoveryListener.for_start(block)
  @connection_recovery_hooks << listener
end

#on_shutdown(&block) ⇒ Object

Defines a shutdown event callback. Shutdown events are broadcasted when a connection is closed, either explicitly or forcefully, or due to a network/peer failure.



245
246
247
248
249
250
251
252
# File 'lib/march_hare/session.rb', line 245

def on_shutdown(&block)
  sh = ShutdownListener.new(self, &block)
  @shutdown_hooks << sh

  @connection.add_shutdown_listener(sh)

  sh
end

#on_unblocked(&block) ⇒ Object

Defines a connection.unblocked handler



263
264
265
266
267
268
# File 'lib/march_hare/session.rb', line 263

def on_unblocked(&block)
  listener = BlockBlockedUnblockedListener.for_unblocked(block)
  @blocked_connection_hooks << listener

  self.add_blocked_listener(listener)
end

#open?Boolean Also known as: connected?

Returns true if connection is open, false otherwise.

Returns:

  • (Boolean)

    true if connection is open, false otherwise



232
233
234
# File 'lib/march_hare/session.rb', line 232

def open?
  @connection.open?
end

#portObject



414
415
416
# File 'lib/march_hare/session.rb', line 414

def port
  @cf.port
end

#recover_connection_block_hooks(connection) ⇒ Object



370
371
372
373
374
375
# File 'lib/march_hare/session.rb', line 370

def recover_connection_block_hooks(connection)
  @logger.debug("session: recover_connection_block_hooks")
  @blocked_connection_hooks.each do |listener|
    connection.add_blocked_listener(listener)
  end
end

#recover_shutdown_hooks(connection) ⇒ Object



362
363
364
365
366
367
# File 'lib/march_hare/session.rb', line 362

def recover_shutdown_hooks(connection)
  @logger.debug("session: recover_shutdown_hooks")
  @shutdown_hooks.each do |sh|
    connection.add_shutdown_listener(sh)
  end
end

#register_channel(ch) ⇒ Object



444
445
446
# File 'lib/march_hare/session.rb', line 444

def register_channel(ch)
  @channels[ch.channel_number] = ch
end

#should_initiate_connection_recovery?(signal) ⇒ Boolean

Returns:

  • (Boolean)


308
309
310
# File 'lib/march_hare/session.rb', line 308

def should_initiate_connection_recovery?(signal)
  !signal.initiated_by_application || signal.instance_of?(MissedHeartbeatException)
end

#startObject

No-op, exists for better API compatibility with Bunny.



388
389
390
391
392
393
394
395
396
397
398
# File 'lib/march_hare/session.rb', line 388

def start
  # no-op
  #
  # This method mimics Bunny::Session#start in Bunny 0.9.
  # Without it, #method_missing will proxy the call to com.rabbitmq.client.AMQConnection,
  # which happens to have a #start method which is not idempotent.
  #
  # So we stub out #start in case someone migrating from Bunny forgets to remove
  # the call to #start. MK.
  self
end

#tls?Boolean Also known as: ssl?

Returns:

  • (Boolean)


418
419
420
421
422
423
424
425
426
# File 'lib/march_hare/session.rb', line 418

def tls?
  if @uses_uri
    u = java.net.URI.new(@uri.to_java_string)

    u.scheme == "amqps"
  else
    self.port == ConnectionFactory.DEFAULT_AMQP_OVER_SSL_PORT
  end
end

#to_sString

Returns:

  • (String)


434
435
436
# File 'lib/march_hare/session.rb', line 434

def to_s
  "#<#{self.class.name}:#{object_id} #{@cf.username}@#{@cf.host}:#{@cf.port}, vhost=#{@cf.virtual_host}>"
end

#unregister_channel(ch) ⇒ Object



449
450
451
# File 'lib/march_hare/session.rb', line 449

def unregister_channel(ch)
  @channels.delete(ch.channel_number)
end

#usernameObject Also known as: user



400
401
402
# File 'lib/march_hare/session.rb', line 400

def username
  @cf.username
end

#vhostObject



405
406
407
# File 'lib/march_hare/session.rb', line 405

def vhost
  @cf.virtual_host
end