Class: MarchHare::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/march_hare/session.rb

Overview

Connection to a RabbitMQ node.

Used to open and close connections and open (create) new channels.

Defined Under Namespace

Classes: BlockBlockedUnblockedListener, SSLContextException

Constant Summary collapse

DEFAULT_NETWORK_RECOVERY_INTERVAL =

Default reconnection interval for TCP connection failures

5.0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection_factory, opts = {}) ⇒ Session

Returns a new instance of Session.



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/march_hare/session.rb', line 139

# Stores the connection factory, builds the initial connection and sets up
# the state used for channel tracking and connection recovery.
#
# @param connection_factory [Object] factory the session keeps in @cf; the
#   Java client's own automatic and topology recovery are switched off on it
# @param opts [Hash] options; keys read here are :uri, :executor_factory,
#   :executor_shutdown_timeout, :automatically_recover, :automatic_recovery
#   and :network_recovery_interval (opts is also handed to helper methods)
def initialize(connection_factory, opts = {})
  @cf = connection_factory

  # The Java client (4.x and later) enables automatic recovery by default,
  # while March Hare ships its own recovery implementation, so both
  # client-side recovery features are turned off. MK.
  @cf.automatic_recovery_enabled = false
  @cf.topology_recovery_enabled  = false

  @uri      = opts[:uri]
  @uses_uri = !@uri.nil?

  # An executor cannot be restarted once it has been shut down,
  # which is why a factory is kept instead of an executor. MK.
  @executor_factory = opts[:executor_factory] || build_executor_factory_from(opts)
  # This option is expected to be given in seconds.
  @executor_shutdown_timeout = opts.fetch(:executor_shutdown_timeout, 30.0)

  @addresses  = self.class.adresses_from(opts)
  @connection = build_new_connection
  @channels   = JavaConcurrent::ConcurrentHashMap.new

  # Recovery from network failures is on by default; it is only decided by
  # the options when at least one of the two spellings is given.
  recover_opt  = opts[:automatically_recover]
  recovery_opt = opts[:automatic_recovery]
  @automatically_recover = recover_opt.nil? && recovery_opt.nil? ? true : (recover_opt || recovery_opt)

  @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
  @shutdown_hooks            = []

  add_automatic_recovery_hook if @automatically_recover
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(selector, *args) ⇒ Object



372
373
374
# File 'lib/march_hare/session.rb', line 372

def method_missing(selector, *args)
  @connection.__send__(selector, *args)
end

Instance Attribute Details

#channels ⇒ Array<MarchHare::Channel> (readonly)

Returns Channels opened on this connection.

Returns:



135
136
137
# File 'lib/march_hare/session.rb', line 135

# Reader for the channels registered on this session.
#
# @return [Object] the collection held in @channels; #register_channel
#   stores each channel in it under its channel number
def channels
  @channels
end

Class Method Details

.connect(options = {}) ⇒ Object

Connects to a RabbitMQ node.

Parameters:

  • options (Hash) (defaults to: {})

    Connection options

Options Hash (options):

  • :executor_shutdown_timeout (Numeric) — default: 30.0

    when recovering from a network failure how long should we wait for the current threadpool to finish handling its messages

  • :host (String) — default: "127.0.0.1"

    Hostname or IP address to connect to

  • :hosts (Array<String>) — default: ["127.0.0.1"]

    Array of hostnames or ips to connect to. The connection returned is the first in the array that succeeds.

  • :addresses (Array<String>) — default: ["127.0.0.1:5672", "localhost:5673"]

    Array of addresses to connect to. The connection returned is the first in the array that succeeds.

  • :port (Integer) — default: 5672

    Port RabbitMQ listens on

  • :username (String) — default: "guest"

    Username

  • :password (String) — default: "guest"

    Password

  • :vhost (String) — default: "/"

    Virtual host to use

  • :requested_heartbeat (Integer) — default: 580

    Heartbeat timeout used. 0 means no heartbeat.

  • :tls (Boolean, String) — default: false

    Set to true to use TLS/SSL connection or TLS version name as a string, e.g. “TLSv1.1”. This will switch port to 5671 by default.

  • :tls_certificate_path (String)

    Path to a PKCS12 certificate.

  • :thread_factory (java.util.concurrent.ThreadFactory)

    Thread factory RabbitMQ Java client will use (useful in restricted PaaS platforms such as GAE)

See Also:



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/march_hare/session.rb', line 57

# Connects to a RabbitMQ node.
#
# Configures a new ConnectionFactory from the given options (endpoint,
# credentials, timeouts, heartbeat, thread factory, exception handler and
# TLS) and returns a session built from that factory and the same options.
#
# @param options [Hash] connection options. Keys read directly here are
#   :uri, :hosts, :addresses, :port, :ssl / :tls and :trust_manager; all
#   other settings are read through the *_from / include_*? helper methods.
# @return [MarchHare::Session] the result of new(cf, options)
# @raise [SSLContextException] when setting up the SSL context from the
#   certificate fails with a Java throwable
def self.connect(options = {})
  cf = ConnectionFactory.new

  if options[:uri]
    cf.uri          = options[:uri]
  elsif options[:hosts] || options[:addresses]
    # Host and port are deliberately not set on the factory in this branch.
    # NOTE(review): presumably the address list handed to the session is used
    # instead; confirm against build_new_connection.
    cf.virtual_host = vhost_from(options)    if include_vhost?(options)
    cf.username     = username_from(options) if include_username?(options)
    cf.password     = password_from(options) if include_password?(options)
  else
    cf.host         = hostname_from(options) if include_host?(options)
    cf.port         = options[:port].to_i    if options[:port]
    cf.virtual_host = vhost_from(options)    if include_vhost?(options)
    cf.username     = username_from(options) if include_username?(options)
    cf.password     = password_from(options) if include_password?(options)
  end

  cf.connection_timeout  = timeout_from(options)  if include_timeout?(options)

  cf.requested_heartbeat = heartbeat_from(options)
  # Assigned after the line above, so this value wins when both timeout
  # helpers report a value.
  cf.connection_timeout  = connection_timeout_from(options) if include_connection_timeout?(options)

  cf.thread_factory      = thread_factory_from(options)    if include_thread_factory?(options)
  cf.exception_handler   = exception_handler_from(options) if include_exception_handler?(options)

  tls = (options[:ssl] || options[:tls])
  case tls
  when true
    cf.use_ssl_protocol
  when String
    # TODO: logging
    $stdout.puts "Using TLS/SSL version #{tls}"
    # Note: `options[:trust_manager] = com.rabbitmq.client.NullTrustManager.new` can be set to disable TLS verification.
    if (cert_path = tls_certificate_path_from(options)) && (password = tls_certificate_password_from(options))
      ctx = SSLContext.get_instance(tls)
      pwd = password.to_java.to_char_array
      begin
        is = File.new(cert_path).to_inputstream
        ks = KeyStore.get_instance('PKCS12')
        ks.load(is, pwd)

        kmf = KeyManagerFactory.get_instance("SunX509")
        kmf.init(ks, pwd)

        if options[:trust_manager]
          ctx.init(kmf.get_key_managers, [options[:trust_manager]], nil)
        else
          # use the key store as the trust store
          tmf = TrustManagerFactory.get_instance(TrustManagerFactory.getDefaultAlgorithm())
          tmf.init(ks)
          ctx.init(kmf.get_key_managers, tmf.getTrustManagers(), nil)
        end

        cf.use_ssl_protocol(ctx)
      rescue Java::JavaLang::Throwable => e
        # Build a fresh string instead of appending to e.message: a throwable
        # without a message would make `<<` fail with NoMethodError on nil and
        # hide the real error, and appending would otherwise modify the
        # string returned by the exception.
        message = "#{e.message}\n#{Array(e.backtrace).join("\n")}"

        raise SSLContextException.new(message)
      ensure
        is.close if is
      end
    elsif options[:trust_manager]
      cf.use_ssl_protocol(tls, options[:trust_manager])
    else
      cf.use_ssl_protocol(tls)
    end
  end

  new(cf, options)
end

Instance Method Details

#add_automatic_recovery_hook ⇒ Object



257
258
259
260
261
262
263
264
265
# File 'lib/march_hare/session.rb', line 257

# Registers a shutdown callback that starts connection recovery whenever
# #should_initiate_connection_recovery? accepts the shutdown signal, and
# remembers the listener returned by #on_shutdown in @automatic_recovery_hook.
#
# @return [Object] the listener returned by #on_shutdown
def add_automatic_recovery_hook
  @automatic_recovery_hook = on_shutdown do |_, signal|
    automatically_recover if should_initiate_connection_recovery?(signal)
  end
end

#automatically_recover ⇒ Object

Begins automatic connection recovery (typically only used internally to recover from network failures)



279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
# File 'lib/march_hare/session.rb', line 279

# Begins automatic connection recovery: waits for the network recovery
# interval, builds a replacement connection, re-attaches the shutdown hooks
# to it, asks every registered channel to recover against it and finally
# makes it the session's current connection.
#
# @return [Object] the replacement connection
def automatically_recover
  interval_ms = @network_recovery_interval * 1000
  # An immediate reconnection attempt makes little sense, so pause first. MK.
  java.lang.Thread.sleep(interval_ms)

  replacement = converting_rjc_exceptions_to_ruby do
    reconnecting_on_network_failures(interval_ms) { build_new_connection }
  end
  recover_shutdown_hooks(replacement)

  # Channels are recovered in ascending id order, so that a case such as
  #
  # ch1 = conn.create_channel
  # ch2 = conn.create_channel
  #
  # x   = ch1.topic("logs", :durable => false)
  # q   = ch2.queue("", :exclusive => true)
  #
  # q.bind(x)
  #
  # recovers correctly: exchanges and queues come back in the order
  # the user expects, and before bindings.
  ordered_channels = @channels.sort_by { |id, _| id }
  ordered_channels.each do |_, channel|
    begin
      channel.automatically_recover(self, replacement)
    rescue Exception, java.io.IOException => err
      # A failing channel is reported and the remaining channels still recover.
      # TODO: logging
      $stderr.puts err
    end
  end

  @connection = replacement
end

#clear_blocked_connection_callbacks ⇒ Object

Clears all callbacks defined with #on_blocked and #on_unblocked.



251
252
253
# File 'lib/march_hare/session.rb', line 251

# Clears the blocked/unblocked callbacks by calling clear_blocked_listeners
# on the underlying connection.
#
# @return [Object] whatever the connection's clear_blocked_listeners returns
def clear_blocked_connection_callbacks
  @connection.clear_blocked_listeners
end

#close ⇒ Object

Closes connection gracefully.

This includes shutting down consumer work pool gracefully, waiting up to 5 seconds for all consumer deliveries to be processed.



208
209
210
211
212
213
214
215
# File 'lib/march_hare/session.rb', line 208

# Closes the session: every registered channel that is still open is closed
# first, then the executor is shut down if needed (see
# maybe_shut_down_executor), and finally the underlying connection is closed.
#
# @return [Object] whatever the connection's close returns
def close
  # Collect the open channels up front, then close them one by one.
  still_open = @channels.select { |_, channel| channel.open? }
  still_open.each { |_, channel| channel.close }

  maybe_shut_down_executor
  @connection.close
end

#closed? ⇒ Boolean

Returns true if this connection is closed.

Returns:

  • (Boolean)

    true if this connection is closed



224
225
226
# File 'lib/march_hare/session.rb', line 224

# Tells whether the underlying connection is no longer open.
#
# @return [Boolean] the negation of the connection's open? value
def closed?
  !@connection.open?
end

#create_channel(n = nil) ⇒ MarchHare::Channel

Opens a new channel.

Parameters:

  • n (Integer) (defaults to: nil)

    Channel number. Pass nil to let MarchHare allocate an available number in a safe way.

Returns:

See Also:



182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/march_hare/session.rb', line 182

# Opens a new channel on the underlying connection, wraps it in a Channel
# and registers it with this session.
#
# @param n [Integer, nil] channel number to request; when nil (or false) the
#   connection's create_channel is called without a number
# @return [Channel] the newly created and registered channel
# @raise [MarchHare::ChannelError] when the connection returns nil instead
#   of a channel
def create_channel(n = nil)
  java_channel = n ? @connection.create_channel(n) : @connection.create_channel

  if java_channel.nil?
    error_message = <<-MSG
      Unable to create a channel. This is likely due to having a channel_max setting
      on the rabbitmq broker (see https://www.rabbitmq.com/configure.html).
      There are currently #{@channels.size} channels on this connection.
    MSG
    raise ::MarchHare::ChannelError.new(error_message, false)
  end

  Channel.new(self, java_channel).tap { |channel| register_channel(channel) }
end

#disable_automatic_recovery ⇒ Object



273
274
275
# File 'lib/march_hare/session.rb', line 273

# Turns off automatic recovery by detaching the recovery hook installed by
# #add_automatic_recovery_hook. Does nothing when no hook was installed.
#
# The hook is removed from the current connection and also from
# @shutdown_hooks; otherwise #recover_shutdown_hooks would attach it again
# to the next connection and silently re-enable recovery.
#
# @return [Object, nil] the removed hook, or nil when there was nothing to do
def disable_automatic_recovery
  return unless @automatic_recovery_hook

  @connection.remove_shutdown_listener(@automatic_recovery_hook)
  @shutdown_hooks.delete(@automatic_recovery_hook)
end

#flush ⇒ Object

Flushes the socket used by this connection.



321
322
323
# File 'lib/march_hare/session.rb', line 321

# Flushes the connection by delegating to the underlying connection's flush.
#
# @return [Object] whatever the connection's flush returns
def flush
  @connection.flush
end

#heartbeat=(n) ⇒ Object



326
327
328
# File 'lib/march_hare/session.rb', line 326

# Sets the heartbeat value on the underlying connection.
#
# @param n [Object] value assigned to the connection's heartbeat attribute
#   (presumably a number of seconds; confirm against the Java client)
def heartbeat=(n)
  @connection.heartbeat = n
end

#hostname ⇒ Object Also known as: host



352
353
354
# File 'lib/march_hare/session.rb', line 352

# Host configured on the connection factory this session was created with.
#
# @return [Object] the factory's host value
def hostname
  @cf.host
end

#on_blocked(&block) ⇒ Object

Defines a connection.blocked handler



241
242
243
# File 'lib/march_hare/session.rb', line 241

# Defines a connection.blocked handler by wrapping the block in a
# BlockBlockedUnblockedListener and adding it as a blocked listener.
#
# NOTE(review): add_blocked_listener is not defined in the visible source;
# presumably the call is forwarded to the connection through
# method_missing. Confirm.
#
# @param block [Proc] callback handed to BlockBlockedUnblockedListener.for_blocked
def on_blocked(&block)
  self.add_blocked_listener(BlockBlockedUnblockedListener.for_blocked(block))
end

#on_shutdown(&block) ⇒ Object

Defines a shutdown event callback. Shutdown events are broadcasted when a connection is closed, either explicitly or forcefully, or due to a network/peer failure.



231
232
233
234
235
236
237
238
# File 'lib/march_hare/session.rb', line 231

# Defines a shutdown event callback. The block is wrapped in a
# ShutdownListener, remembered in @shutdown_hooks (so that
# #recover_shutdown_hooks can attach it to a later connection) and added to
# the current connection.
#
# @param block [Proc] callback handed to the ShutdownListener
# @return [ShutdownListener] the listener that was registered
def on_shutdown(&block)
  ShutdownListener.new(self, &block).tap do |listener|
    @shutdown_hooks << listener
    @connection.add_shutdown_listener(listener)
  end
end

#on_unblocked(&block) ⇒ Object

Defines a connection.unblocked handler



246
247
248
# File 'lib/march_hare/session.rb', line 246

# Defines a connection.unblocked handler by wrapping the block in a
# BlockBlockedUnblockedListener and adding it as a blocked listener.
#
# NOTE(review): add_blocked_listener is not defined in the visible source;
# presumably the call is forwarded to the connection through
# method_missing. Confirm.
#
# @param block [Proc] callback handed to BlockBlockedUnblockedListener.for_unblocked
def on_unblocked(&block)
  self.add_blocked_listener(BlockBlockedUnblockedListener.for_unblocked(block))
end

#open? ⇒ Boolean Also known as: connected?

Returns true if connection is open, false otherwise.

Returns:

  • (Boolean)

    true if connection is open, false otherwise



218
219
220
# File 'lib/march_hare/session.rb', line 218

# Tells whether the underlying connection is open.
#
# @return [Boolean] the connection's open? value
def open?
  @connection.open?
end

#port ⇒ Object



357
358
359
# File 'lib/march_hare/session.rb', line 357

# Port configured on the connection factory this session was created with.
#
# @return [Object] the factory's port value
def port
  @cf.port
end

#recover_shutdown_hooks(connection) ⇒ Object



314
315
316
317
318
# File 'lib/march_hare/session.rb', line 314

# Attaches every shutdown listener remembered in @shutdown_hooks to the
# given connection, in the order they were registered.
#
# @param connection [Object] connection that receives add_shutdown_listener
#   for each remembered listener
# @return [Array] the @shutdown_hooks collection
def recover_shutdown_hooks(connection)
  @shutdown_hooks.each { |listener| connection.add_shutdown_listener(listener) }
end

#register_channel(ch) ⇒ Object



387
388
389
# File 'lib/march_hare/session.rb', line 387

# Records a channel in @channels under its channel number, replacing any
# entry already stored for that number.
#
# @param ch [Object] channel to register; must respond to channel_number
# @return [Object] the channel that was passed in
def register_channel(ch)
  @channels[ch.channel_number] = ch
end

#should_initiate_connection_recovery?(signal) ⇒ Boolean

Returns:

  • (Boolean)


268
269
270
# File 'lib/march_hare/session.rb', line 268

# Decides whether a shutdown signal should trigger connection recovery.
# Recovery is wanted when the shutdown was not initiated by the application,
# or when the signal is exactly a MissedHeartbeatException.
#
# @param signal [Object] shutdown signal; must respond to initiated_by_application
# @return [Boolean]
def should_initiate_connection_recovery?(signal)
  return true unless signal.initiated_by_application

  signal.instance_of?(MissedHeartbeatException)
end

#start ⇒ Object

No-op, exists for better API compatibility with Bunny.



331
332
333
334
335
336
337
338
339
340
341
# File 'lib/march_hare/session.rb', line 331

# No-op kept for API compatibility with Bunny.
#
# @return [self] this session, unchanged
def start
  # no-op
  #
  # This method mimics Bunny::Session#start in Bunny 0.9.
  # Without it, #method_missing will proxy the call to com.rabbitmq.client.AMQConnection,
  # which happens to have a #start method which is not idempotent.
  #
  # So we stub out #start in case someone migrating from Bunny forgets to remove
  # the call to #start. MK.
  self
end

#tls? ⇒ Boolean Also known as: ssl?

Returns:

  • (Boolean)


361
362
363
364
365
366
367
368
369
# File 'lib/march_hare/session.rb', line 361

# Tells whether this session is considered to use TLS.
#
# When the session was created from a URI, the URI scheme decides
# ("amqps"). Otherwise the answer is derived from the port alone: it is true
# only when the port equals the factory's default AMQP-over-SSL port.
#
# @return [Boolean]
def tls?
  return port == ConnectionFactory.DEFAULT_AMQP_OVER_SSL_PORT unless @uses_uri

  parsed_uri = java.net.URI.new(@uri.to_java_string)
  parsed_uri.scheme == "amqps"
end

#to_s ⇒ String

Returns:

  • (String)


377
378
379
# File 'lib/march_hare/session.rb', line 377

# Human-readable description of the session built from the class name, the
# object id and the username, host, port and virtual host configured on the
# connection factory.
#
# @return [String]
def to_s
  factory  = @cf
  endpoint = "#{factory.username}@#{factory.host}:#{factory.port}"

  "#<#{self.class.name}:#{object_id} #{endpoint}, vhost=#{factory.virtual_host}>"
end

#unregister_channel(ch) ⇒ Object



392
393
394
# File 'lib/march_hare/session.rb', line 392

# Removes the entry stored in @channels under the channel's number.
#
# @param ch [Object] channel to forget; must respond to channel_number
# @return [Object] whatever the collection's delete returns
def unregister_channel(ch)
  @channels.delete(ch.channel_number)
end

#username ⇒ Object Also known as: user



343
344
345
# File 'lib/march_hare/session.rb', line 343

# Username configured on the connection factory this session was created with.
#
# @return [Object] the factory's username value
def username
  @cf.username
end

#vhost ⇒ Object



348
349
350
# File 'lib/march_hare/session.rb', line 348

# Virtual host configured on the connection factory this session was created with.
#
# @return [Object] the factory's virtual_host value
def vhost
  @cf.virtual_host
end