Class: MarchHare::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/march_hare/session.rb

Overview

Connection to a RabbitMQ node.

Used to open and close connections and open (create) new channels.

Defined Under Namespace

Classes: BlockBlockedUnblockedListener, SSLContextException

Constant Summary collapse

DEFAULT_NETWORK_RECOVERY_INTERVAL =

Default reconnection interval for TCP connection failures

5.0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection_factory, opts = {}) ⇒ Session

Returns a new instance of Session.



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/march_hare/session.rb', line 123

def initialize(connection_factory, opts = {})
  @cf               = connection_factory

  # March Hare uses its own connection recovery implementation and
  # as of Java client 4.x automatic recovery is enabled by
  # default. MK.
  @cf.automatic_recovery_enabled = false
  @cf.topology_recovery_enabled  = false

  @uri              = opts[:uri]
  @uses_uri         = !(@uri.nil?)
  # executors cannot be restarted after shutdown,
  # so we really need a factory here. MK.
  @executor_factory = opts[:executor_factory] || build_executor_factory_from(opts)
  # we expect this option to be specified in seconds
  @executor_shutdown_timeout = opts.fetch(:executor_shutdown_timeout, 30.0)

  @hosts            = self.class.hosts_from(opts)
  @default_host_selection_strategy = lambda { |hosts| hosts.sample }
  @host_selection_strategy         = opts[:host_selection_strategy] || @default_host_selection_strategy

  @connection       = if @uses_uri
                        self.new_uri_connection_impl(@uri)
                      else
                        self.new_connection_impl(@hosts, @host_selection_strategy)
                      end
  @channels         = JavaConcurrent::ConcurrentHashMap.new

  # should automatic recovery from network failures be used?
  @automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil?
                             true
                           else
                             opts[:automatically_recover] || opts[:automatic_recovery]
                           end
  @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
  @shutdown_hooks            = Array.new

  if @automatically_recover
    self.add_automatic_recovery_hook
  end
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(selector, *args) ⇒ Object



369
370
371
# File 'lib/march_hare/session.rb', line 369

def method_missing(selector, *args)
  @connection.__send__(selector, *args)
end

Instance Attribute Details

#channelsArray<MarchHare::Channel> (readonly)

Returns Channels opened on this connection.

Returns:



119
120
121
# File 'lib/march_hare/session.rb', line 119

def channels
  @channels
end

Class Method Details

.connect(options = {}) ⇒ Object

Connects to a RabbitMQ node.

Parameters:

  • options (Hash) (defaults to: {})

    Connection options

Options Hash (options):

  • :executor_shutdown_timeout (Numeric) — default: 30.0

    when recovering from a network failure how long should we wait for the current threadpool to finish handling its messages

  • :host (String) — default: "127.0.0.1"

    Hostname or IP address to connect to

  • :port (Integer) — default: 5672

    Port RabbitMQ listens on

  • :username (String) — default: "guest"

    Username

  • :password (String) — default: "guest"

    Password

  • :vhost (String) — default: "/"

    Virtual host to use

  • :requested_heartbeat (Integer) — default: 580

    Heartbeat timeout used. 0 means no heartbeat.

  • :tls (Boolean, String) — default: false

    Set to true to use TLS/SSL connection or TLS version name as a string, e.g. “TLSv1.1”. This will switch port to 5671 by default.

  • :tls_certificate_path (String)

    Path to a PKCS12 certificate.

  • :thread_factory (java.util.concurrent.ThreadFactory)

    Thread factory RabbitMQ Java client will use (useful in restricted PaaS platforms such as GAE)

See Also:



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/march_hare/session.rb', line 53

def self.connect(options = {})
  cf = ConnectionFactory.new

  if options[:uri]
    cf.uri          = options[:uri]          if options[:uri]
  else
    cf.host         = hostname_from(options) if include_host?(options)
    cf.port         = options[:port].to_i    if options[:port]
    cf.virtual_host = vhost_from(options)    if include_vhost?(options)
    cf.username     = username_from(options) if include_username?(options)
    cf.password     = password_from(options) if include_password?(options)
  end

  cf.connection_timeout  = timeout_from(options)  if include_timeout?(options)

  cf.requested_heartbeat = heartbeat_from(options)
  cf.connection_timeout  = connection_timeout_from(options) if include_connection_timeout?(options)

  cf.thread_factory      = thread_factory_from(options)    if include_thread_factory?(options)
  cf.exception_handler   = exception_handler_from(options) if include_exception_handler?(options)

  tls = (options[:ssl] || options[:tls])
  case tls
  when true then
    cf.use_ssl_protocol
  when String then
    # TODO: logging
    $stdout.puts "Using TLS/SSL version #{tls}"
    if options[:trust_manager]
      cf.use_ssl_protocol(tls, options[:trust_manager])
    elsif (cert_path = tls_certificate_path_from(options)) && (password = tls_certificate_password_from(options))
      ctx = SSLContext.get_instance(tls)
      pwd = password.to_java.to_char_array
      begin
        is = File.new(cert_path).to_inputstream
        ks = KeyStore.get_instance('PKCS12')
        ks.load(is, pwd)

        kmf = KeyManagerFactory.get_instance("SunX509")
        kmf.init(ks, pwd)

        ctx.init(kmf.get_key_managers, [NullTrustManager.new].to_java('javax.net.ssl.TrustManager'), nil)

        cf.use_ssl_protocol(ctx)
      rescue Java::JavaLang::Throwable => e
        message = e.message
        message << "\n"
        message << e.backtrace.join("\n")

        raise SSLContextException.new(message)
      ensure
        is.close if is
      end
    else
      cf.use_ssl_protocol(tls)
    end
  end

  new(cf, options)
end

Instance Method Details

#add_automatic_recovery_hookObject



248
249
250
251
252
253
254
255
256
# File 'lib/march_hare/session.rb', line 248

def add_automatic_recovery_hook
  fn = Proc.new do |_, signal|
    if should_initiate_connection_recovery?(signal)
      self.automatically_recover
    end
  end

  @automatic_recovery_hook = self.on_shutdown(&fn)
end

#automatically_recoverObject

Begins automatic connection recovery (typically only used internally to recover from network failures)



270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
# File 'lib/march_hare/session.rb', line 270

def automatically_recover
  ms = @network_recovery_interval * 1000
  # recovering immediately makes little sense. Wait a bit first. MK.
  java.lang.Thread.sleep(ms)

  new_connection = converting_rjc_exceptions_to_ruby do
    reconnecting_on_network_failures(ms) do
      if @uses_uri
        self.new_uri_connection_impl(@uri)
      else
        self.new_connection_impl(@hosts, @host_selection_strategy)
      end
    end
  end
  self.recover_shutdown_hooks(new_connection)

  # sorting channels by id means that the cases like the following:
  #
  # ch1 = conn.create_channel
  # ch2 = conn.create_channel
  #
  # x   = ch1.topic("logs", :durable => false)
  # q   = ch2.queue("", :exclusive => true)
  #
  # q.bind(x)
  #
  # will recover correctly because exchanges and queues will be recovered
  # in the order the user expects and before bindings.
  @channels.sort_by {|id, _| id}.each do |id, ch|
    begin
      ch.automatically_recover(self, new_connection)
    rescue Exception, java.io.IOException => e
      # TODO: logging
      $stderr.puts e
    end
  end

  @connection = new_connection
end

#clear_blocked_connection_callbacksObject

Clears all callbacks defined with #on_blocked and #on_unblocked.



242
243
244
# File 'lib/march_hare/session.rb', line 242

def clear_blocked_connection_callbacks
  @connection.clear_blocked_listeners
end

#closeObject

Closes connection gracefully.

This includes shutting down consumer work pool gracefully, waiting up to 5 seconds for all consumer deliveries to be processed.



199
200
201
202
203
204
205
206
# File 'lib/march_hare/session.rb', line 199

def close
  @channels.select { |_, ch| ch.open? }.each do |_, ch|
    ch.close
  end

  maybe_shut_down_executor
  @connection.close
end

#closed?Boolean

Returns true if this channel is closed.

Returns:

  • (Boolean)

    true if this channel is closed



215
216
217
# File 'lib/march_hare/session.rb', line 215

def closed?
  !@connection.open?
end

#create_channel(n = nil) ⇒ MarchHare::Channel

Opens a new channel.

Parameters:

  • (nil): (Integer)

    Channel number. Pass nil to let MarchHare allocate an available number in a safe way.

Returns:

See Also:



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/march_hare/session.rb', line 173

def create_channel(n = nil)
  jc = if n
         @connection.create_channel(n)
       else
         @connection.create_channel
       end
  if jc.nil?
    error_message = <<-MSG
      Unable to create a channel. This is likely due to having a channel_max setting
      on the rabbitmq broker (see https://www.rabbitmq.com/configure.html).
      There are currently #{@channels.size} channels on this connection.
    MSG
    raise ::MarchHare::ChannelError.new(error_message, false)
  end

  ch = Channel.new(self, jc)
  register_channel(ch)

  ch
end

#disable_automatic_recoveryObject



264
265
266
# File 'lib/march_hare/session.rb', line 264

def disable_automatic_recovery
  @connection.remove_shutdown_listener(@automatic_recovery_hook) if @automatic_recovery_hook
end

#flushObject

Flushes the socket used by this connection.



318
319
320
# File 'lib/march_hare/session.rb', line 318

def flush
  @connection.flush
end

#heartbeat=(n) ⇒ Object



323
324
325
# File 'lib/march_hare/session.rb', line 323

def heartbeat=(n)
  @connection.heartbeat = n
end

#hostnameObject Also known as: host



349
350
351
# File 'lib/march_hare/session.rb', line 349

def hostname
  @cf.host
end

#on_blocked(&block) ⇒ Object

Defines a connection.blocked handler



232
233
234
# File 'lib/march_hare/session.rb', line 232

def on_blocked(&block)
  self.add_blocked_listener(BlockBlockedUnblockedListener.for_blocked(block))
end

#on_shutdown(&block) ⇒ Object

Defines a shutdown event callback. Shutdown events are broadcasted when a connection is closed, either explicitly or forcefully, or due to a network/peer failure.



222
223
224
225
226
227
228
229
# File 'lib/march_hare/session.rb', line 222

def on_shutdown(&block)
  sh = ShutdownListener.new(self, &block)
  @shutdown_hooks << sh

  @connection.add_shutdown_listener(sh)

  sh
end

#on_unblocked(&block) ⇒ Object

Defines a connection.unblocked handler



237
238
239
# File 'lib/march_hare/session.rb', line 237

def on_unblocked(&block)
  self.add_blocked_listener(BlockBlockedUnblockedListener.for_unblocked(block))
end

#open?Boolean Also known as: connected?

Returns true if connection is open, false otherwise.

Returns:

  • (Boolean)

    true if connection is open, false otherwise



209
210
211
# File 'lib/march_hare/session.rb', line 209

def open?
  @connection.open?
end

#portObject



354
355
356
# File 'lib/march_hare/session.rb', line 354

def port
  @cf.port
end

#recover_shutdown_hooks(connection) ⇒ Object



311
312
313
314
315
# File 'lib/march_hare/session.rb', line 311

def recover_shutdown_hooks(connection)
  @shutdown_hooks.each do |sh|
    connection.add_shutdown_listener(sh)
  end
end

#register_channel(ch) ⇒ Object



384
385
386
# File 'lib/march_hare/session.rb', line 384

def register_channel(ch)
  @channels[ch.channel_number] = ch
end

#should_initiate_connection_recovery?(signal) ⇒ Boolean

Returns:

  • (Boolean)


259
260
261
# File 'lib/march_hare/session.rb', line 259

def should_initiate_connection_recovery?(signal)
  !signal.initiated_by_application || signal.instance_of?(MissedHeartbeatException)
end

#startObject

No-op, exists for better API compatibility with Bunny.



328
329
330
331
332
333
334
335
336
337
338
# File 'lib/march_hare/session.rb', line 328

def start
  # no-op
  #
  # This method mimics Bunny::Session#start in Bunny 0.9.
  # Without it, #method_missing will proxy the call to com.rabbitmq.client.AMQConnection,
  # which happens to have a #start method which is not idempotent.
  #
  # So we stub out #start in case someone migrating from Bunny forgets to remove
  # the call to #start. MK.
  self
end

#tls?Boolean Also known as: ssl?

Returns:

  • (Boolean)


358
359
360
361
362
363
364
365
366
# File 'lib/march_hare/session.rb', line 358

def tls?
  if @uses_uri
    u = java.net.URI.new(@uri.to_java_string)

    u.scheme == "amqps"
  else
    self.port == ConnectionFactory.DEFAULT_AMQP_OVER_SSL_PORT
  end
end

#to_sString

Returns:

  • (String)


374
375
376
# File 'lib/march_hare/session.rb', line 374

def to_s
  "#<#{self.class.name}:#{object_id} #{@cf.username}@#{@cf.host}:#{@cf.port}, vhost=#{@cf.virtual_host}>"
end

#unregister_channel(ch) ⇒ Object



389
390
391
# File 'lib/march_hare/session.rb', line 389

def unregister_channel(ch)
  @channels.delete(ch.channel_number)
end

#usernameObject Also known as: user



340
341
342
# File 'lib/march_hare/session.rb', line 340

def username
  @cf.username
end

#vhostObject



345
346
347
# File 'lib/march_hare/session.rb', line 345

def vhost
  @cf.virtual_host
end