Class: MarchHare::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/march_hare/session.rb

Overview

Connection to a RabbitMQ node.

Used to open and close connections and open (create) new channels.

Defined Under Namespace

Classes: BlockBlockedUnblockedListener, SSLContextException

Constant Summary collapse

DEFAULT_NETWORK_RECOVERY_INTERVAL =

Default reconnection interval for TCP connection failures, in seconds

5.0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection_factory, opts = {}) ⇒ Session

Returns a new instance of Session.



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/march_hare/session.rb', line 121

# Builds a session around a configured connection factory and opens the
# underlying connection right away.
#
# @param connection_factory [Object] factory kept in @cf; the accessors
#   (#hostname, #port, #username, #vhost) read from it
# @param opts [Hash] session options. Keys read directly here: :uri,
#   :executor_factory, :host_selection_strategy, :automatically_recover,
#   :automatic_recovery and :network_recovery_interval. The whole hash is
#   also handed to hosts_from and build_executor_factory_from.
def initialize(connection_factory, opts = {})
  @cf       = connection_factory
  @uri      = opts[:uri]
  @uses_uri = !@uri.nil?

  # An executor cannot be restarted once shut down, hence a factory
  # (able to build a fresh one) is stored rather than an executor.
  @executor_factory = opts[:executor_factory] || build_executor_factory_from(opts)

  @hosts = self.class.hosts_from(opts)
  @default_host_selection_strategy = lambda { |hosts| hosts.sample }
  @host_selection_strategy = opts[:host_selection_strategy] || @default_host_selection_strategy

  @connection =
    if @uses_uri
      new_uri_connection_impl(@uri)
    else
      new_connection_impl(@hosts, @host_selection_strategy)
    end
  @channels = JavaConcurrent::ConcurrentHashMap.new

  # Automatic recovery from network failures is on unless one of the two
  # option spellings is given; when given, either one being truthy enables it.
  recover_flag       = opts[:automatically_recover]
  recover_flag_alias = opts[:automatic_recovery]
  @automatically_recover =
    if recover_flag.nil? && recover_flag_alias.nil?
      true
    else
      recover_flag || recover_flag_alias
    end
  @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
  @shutdown_hooks            = []

  add_automatic_recovery_hook if @automatically_recover
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(selector, *args) ⇒ Object



345
346
347
# File 'lib/march_hare/session.rb', line 345

# Forwards any message this class does not define itself to the underlying
# connection stored in @connection.
#
# @param selector [Symbol] name of the method that was not found
# @param args [Array] positional arguments of the original call
# @param block [Proc, nil] block of the original call, if any
# @return [Object] whatever the underlying connection returns
def method_missing(selector, *args, &block)
  # Forward the block too: previously a block given to a proxied call was
  # silently dropped.
  @connection.__send__(selector, *args, &block)
end

Instance Attribute Details

#channels ⇒ Array<MarchHare::Channel> (readonly)

Returns Channels opened on this connection.

Returns:



117
118
119
# File 'lib/march_hare/session.rb', line 117

# Returns the registry of channels opened on this connection.
#
# NOTE(review): #initialize creates @channels as a
# JavaConcurrent::ConcurrentHashMap and #register_channel stores channels
# under their channel number, so this is a map keyed by channel number
# rather than a plain Array.
#
# @return [Object] the channel registry held in @channels
def channels
  @channels
end

Class Method Details

.connect(options = {}) ⇒ Object

Connects to a RabbitMQ node.

Parameters:

  • options (Hash) (defaults to: {})

    Connection options

Options Hash (options):

  • :host (String) — default: "127.0.0.1"

    Hostname or IP address to connect to

  • :port (Integer) — default: 5672

    Port RabbitMQ listens on

  • :username (String) — default: "guest"

    Username

  • :password (String) — default: "guest"

    Password

  • :vhost (String) — default: "/"

    Virtual host to use

  • :requested_heartbeat (Integer) — default: 580

    Heartbeat timeout used. 0 means no heartbeat.

  • :tls (Boolean, String) — default: false

    Set to true to use TLS/SSL connection or TLS version name as a string, e.g. “TLSv1.1”. This will switch port to 5671 by default.

  • :tls_certificate_path (String)

    Path to a PKCS12 certificate.

  • :thread_factory (java.util.concurrent.ThreadFactory)

    Thread factory RabbitMQ Java client will use (useful in restricted PaaS platforms such as GAE)

See Also:



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/march_hare/session.rb', line 51

# Connects to a RabbitMQ node.
#
# Configures a ConnectionFactory from the given options and wraps it in a
# new instance of this class.
#
# @param options [Hash] connection options
# @option options [String] :uri connection URI; when present, the host,
#   port, vhost, username and password settings below are skipped
# @option options [String] :host hostname or IP address to connect to
# @option options [Integer] :port port RabbitMQ listens on
# @option options [String] :username username
# @option options [String] :password password
# @option options [String] :vhost virtual host to use
# @option options [Integer] :requested_heartbeat heartbeat timeout
# @option options [Boolean, String] :tls (also read as :ssl) true to use
#   TLS/SSL, or a TLS version name as a string
# @option options [String] :tls_certificate_path path to a PKCS12 certificate
# @option options [Object] :trust_manager trust manager passed to
#   use_ssl_protocol together with the TLS version name
# @option options [java.util.concurrent.ThreadFactory] :thread_factory
#   thread factory for the RabbitMQ Java client
# @return [Session] the newly created session
# @raise [SSLContextException] when setting up the SSL context from the
#   certificate fails
def self.connect(options = {})
  cf = ConnectionFactory.new

  if options[:uri]
    cf.uri          = options[:uri]
  else
    cf.host         = hostname_from(options) if include_host?(options)
    cf.port         = options[:port].to_i    if options[:port]
    cf.virtual_host = vhost_from(options)    if include_vhost?(options)
    cf.username     = username_from(options) if include_username?(options)
    cf.password     = password_from(options) if include_password?(options)
  end

  cf.connection_timeout  = timeout_from(options)  if include_timeout?(options)

  cf.requested_heartbeat = heartbeat_from(options)
  # NOTE(review): connection_timeout can be assigned twice; when both
  # conditions hold, this later assignment is the one that sticks.
  cf.connection_timeout  = connection_timeout_from(options) if include_connection_timeout?(options)

  cf.thread_factory      = thread_factory_from(options)    if include_thread_factory?(options)
  cf.exception_handler   = exception_handler_from(options) if include_exception_handler?(options)

  tls = (options[:ssl] || options[:tls])
  case tls
  when true
    cf.use_ssl_protocol
  when String
    # TODO: logging
    $stdout.puts "Using TLS/SSL version #{tls}"
    if options[:trust_manager]
      cf.use_ssl_protocol(tls, options[:trust_manager])
    elsif (cert_path = tls_certificate_path_from(options)) && (password = tls_certificate_password_from(options))
      ctx = SSLContext.get_instance(tls)
      pwd = password.to_java.to_char_array
      begin
        is = File.new(cert_path).to_inputstream
        ks = KeyStore.get_instance('PKCS12')
        ks.load(is, pwd)

        kmf = KeyManagerFactory.get_instance("SunX509")
        kmf.init(ks, pwd)

        # NOTE(review): NullTrustManager is the only trust manager passed
        # here; judging by its name it does not verify the peer — confirm.
        ctx.init(kmf.get_key_managers, [NullTrustManager.new].to_java('javax.net.ssl.TrustManager'), nil)

        cf.use_ssl_protocol(ctx)
      rescue Java::JavaLang::Throwable => e
        # Build a fresh string rather than appending to the string returned
        # by e.message in place.
        trace = e.backtrace.join("\n")

        raise SSLContextException.new("#{e.message}\n#{trace}")
      ensure
        # `is` stays nil when opening the certificate file itself failed
        is.close if is
      end
    else
      cf.use_ssl_protocol(tls)
    end
  end

  new(cf, options)
end

Instance Method Details

#add_automatic_recovery_hook ⇒ Object



229
230
231
232
233
234
235
236
237
# File 'lib/march_hare/session.rb', line 229

# Registers a shutdown callback that starts automatic recovery whenever the
# shutdown was not initiated by the application. The listener returned by
# #on_shutdown is remembered in @automatic_recovery_hook.
#
# @return [Object] the registered shutdown listener
def add_automatic_recovery_hook
  @automatic_recovery_hook = on_shutdown do |_, signal|
    automatically_recover unless signal.initiated_by_application
  end
end

#automatically_recover ⇒ Object

Begins automatic connection recovery (typically only used internally to recover from network failures)



246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# File 'lib/march_hare/session.rb', line 246

# Begins automatic connection recovery (typically only used internally to
# recover from network failures).
#
# Steps, in order: wait for the network recovery interval, open a new
# connection (URI-based or host-list-based, matching @uses_uri), re-attach
# the recorded shutdown hooks to it, ask every registered channel to recover
# on it, and finally make it this session's connection.
#
# @return [Object] the new connection (value of the final assignment)
def automatically_recover
  # the interval is multiplied by 1000 to get the millisecond value passed
  # to the Java sleep below
  ms = @network_recovery_interval * 1000
  # recovering immediately makes little sense. Wait a bit first. MK.
  java.lang.Thread.sleep(ms)

  # NOTE(review): judging by their names, the two wrappers below translate
  # Java client exceptions into Ruby ones and retry on network failures —
  # confirm against their definitions, which are not visible here.
  new_connection = converting_rjc_exceptions_to_ruby do
    reconnecting_on_network_failures(ms) do
      if @uses_uri
        self.new_uri_connection_impl(@uri)
      else
        self.new_connection_impl(@hosts, @host_selection_strategy)
      end
    end
  end
  # NOTE(review): @thread_pool is not read anywhere in the visible part of
  # this file — confirm it is still needed.
  @thread_pool = ThreadPools.dynamically_growing
  self.recover_shutdown_hooks(new_connection)

  # sorting channels by id means that the cases like the following:
  #
  # ch1 = conn.create_channel
  # ch2 = conn.create_channel
  #
  # x   = ch1.topic("logs", :durable => false)
  # q   = ch2.queue("", :exclusive => true)
  #
  # q.bind(x)
  #
  # will recover correctly because exchanges and queues will be recovered
  # in the order the user expects and before bindings.
  @channels.sort_by {|id, _| id}.each do |id, ch|
    begin
      ch.automatically_recover(self, new_connection)
    # a failure on one channel is printed and does not stop the recovery of
    # the remaining channels
    rescue Exception, java.io.IOException => e
      # TODO: logging
      $stderr.puts e
    end
  end

  # the session switches to the new connection only after the channels
  # have been asked to recover on it
  @connection = new_connection
end

#clear_blocked_connection_callbacks ⇒ Object

Clears all callbacks defined with #on_blocked and #on_unblocked.



223
224
225
# File 'lib/march_hare/session.rb', line 223

# Clears all callbacks defined with #on_blocked and #on_unblocked by asking
# the underlying connection to clear its blocked listeners.
#
# @return [Object] whatever the underlying connection returns
def clear_blocked_connection_callbacks
  @connection.clear_blocked_listeners
end

#close ⇒ Object

Closes connection gracefully.

This includes shutting down consumer work pool gracefully, waiting up to 5 seconds for all consumer deliveries to be processed.



180
181
182
183
184
185
186
187
# File 'lib/march_hare/session.rb', line 180

# Closes the connection gracefully: closes every channel that is still open,
# then runs maybe_shut_down_executor, then closes the underlying connection.
#
# @return [Object] result of closing the underlying connection
def close
  # collect the open channels first, close them afterwards
  still_open = @channels.select { |_, channel| channel.open? }
  still_open.each { |_, channel| channel.close }

  maybe_shut_down_executor
  @connection.close
end

#closed? ⇒ Boolean

Returns true if this connection is closed.

Returns:

  • (Boolean)

    true if this connection is closed



196
197
198
# File 'lib/march_hare/session.rb', line 196

# Reports whether this connection is closed, i.e. whether the underlying
# connection is not open.
#
# @return [Boolean] negation of the underlying connection's #open?
def closed?
  !@connection.open?
end

#create_channel(n = nil) ⇒ MarchHare::Channel

Opens a new channel.

Parameters:

  • n (Integer) (defaults to: nil)

    Channel number. Pass nil to let MarchHare allocate an available number in a safe way.

Returns:

See Also:



162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/march_hare/session.rb', line 162

# Opens a new channel on the underlying connection, wraps it in a Channel
# and registers it with this session.
#
# @param n [Integer, nil] channel number; when nil (or false) the underlying
#   connection is asked to create a channel without an explicit number
# @return [MarchHare::Channel] the newly opened, registered channel
def create_channel(n = nil)
  raw_channel = n ? @connection.create_channel(n) : @connection.create_channel

  Channel.new(self, raw_channel).tap { |channel| register_channel(channel) }
end

#disable_automatic_recovery ⇒ Object



240
241
242
# File 'lib/march_hare/session.rb', line 240

# Removes the automatic recovery shutdown listener (stored in
# @automatic_recovery_hook by #add_automatic_recovery_hook) from the
# underlying connection. Does nothing when no hook is registered.
#
# @return [Object, nil] result of removing the listener, or nil when there
#   is no hook to remove
def disable_automatic_recovery
  # Fixed typo: this used to read `@connetion`, an unset (nil) instance
  # variable, so the call raised NoMethodError whenever a hook existed.
  @connection.remove_shutdown_listener(@automatic_recovery_hook) if @automatic_recovery_hook
end

#flush ⇒ Object

Flushes the socket used by this connection.



295
296
297
# File 'lib/march_hare/session.rb', line 295

# Flushes the socket used by this connection by delegating to the
# underlying connection's #flush.
#
# @return [Object] whatever the underlying connection returns
def flush
  @connection.flush
end

#heartbeat=(n) ⇒ Object



300
301
302
# File 'lib/march_hare/session.rb', line 300

# Sets the heartbeat on the underlying connection.
#
# @param n [Integer] heartbeat value handed to the underlying connection
#   (presumably an interval in seconds — TODO confirm units)
def heartbeat=(n)
  @connection.heartbeat = n
end

#hostname ⇒ Object Also known as: host



325
326
327
# File 'lib/march_hare/session.rb', line 325

# Returns the host configured on the connection factory.
#
# @return [Object] value of the connection factory's host attribute
def hostname
  @cf.host
end

#on_blocked(&block) ⇒ Object

Defines a connection.blocked handler



213
214
215
# File 'lib/march_hare/session.rb', line 213

# Defines a connection.blocked handler.
#
# Wraps the block in a listener built by
# BlockBlockedUnblockedListener.for_blocked and registers it through
# add_blocked_listener.
# NOTE(review): add_blocked_listener is not defined in the visible part of
# this class; presumably the call is forwarded to the underlying connection
# via #method_missing — confirm.
#
# @param block [Proc] handler to wrap
# @return [Object] whatever add_blocked_listener returns
def on_blocked(&block)
  self.add_blocked_listener(BlockBlockedUnblockedListener.for_blocked(block))
end

#on_shutdown(&block) ⇒ Object

Defines a shutdown event callback. Shutdown events are broadcast when a connection is closed, either explicitly or forcefully, or due to a network/peer failure.



203
204
205
206
207
208
209
210
# File 'lib/march_hare/session.rb', line 203

# Defines a shutdown event callback.
#
# Wraps the block in a ShutdownListener, records it in @shutdown_hooks and
# attaches it to the underlying connection.
#
# @param block [Proc] callback to wrap in the listener
# @return [ShutdownListener] the listener that was registered
def on_shutdown(&block)
  ShutdownListener.new(self, &block).tap do |listener|
    # recorded so that #recover_shutdown_hooks can attach it to another
    # connection later
    @shutdown_hooks << listener
    @connection.add_shutdown_listener(listener)
  end
end

#on_unblocked(&block) ⇒ Object

Defines a connection.unblocked handler



218
219
220
# File 'lib/march_hare/session.rb', line 218

# Defines a connection.unblocked handler.
#
# Wraps the block in a listener built by
# BlockBlockedUnblockedListener.for_unblocked and registers it through
# add_blocked_listener.
# NOTE(review): add_blocked_listener is not defined in the visible part of
# this class; presumably the call is forwarded to the underlying connection
# via #method_missing — confirm.
#
# @param block [Proc] handler to wrap
# @return [Object] whatever add_blocked_listener returns
def on_unblocked(&block)
  self.add_blocked_listener(BlockBlockedUnblockedListener.for_unblocked(block))
end

#open? ⇒ Boolean Also known as: connected?

Returns true if connection is open, false otherwise.

Returns:

  • (Boolean)

    true if connection is open, false otherwise



190
191
192
# File 'lib/march_hare/session.rb', line 190

# Reports whether the underlying connection is open.
#
# @return [Boolean] true if connection is open, false otherwise
def open?
  @connection.open?
end

#port ⇒ Object



330
331
332
# File 'lib/march_hare/session.rb', line 330

# Returns the port configured on the connection factory.
#
# @return [Object] value of the connection factory's port attribute
def port
  @cf.port
end

#recover_shutdown_hooks(connection) ⇒ Object



288
289
290
291
292
# File 'lib/march_hare/session.rb', line 288

# Attaches every shutdown listener recorded in @shutdown_hooks to the given
# connection, in the order the listeners were recorded.
#
# @param connection [Object] connection that receives the listeners
# @return [Array] the recorded shutdown listeners
def recover_shutdown_hooks(connection)
  @shutdown_hooks.each { |listener| connection.add_shutdown_listener(listener) }
end

#register_channel(ch) ⇒ Object



360
361
362
# File 'lib/march_hare/session.rb', line 360

# Records a channel in this session's channel registry, keyed by its
# channel number. An existing entry under the same number is replaced.
#
# @param ch [Object] channel responding to #channel_number
# @return [Object] the channel that was stored
def register_channel(ch)
  @channels[ch.channel_number] = ch
end

#start ⇒ Object

No-op, exists for better API compatibility with Bunny.



305
306
307
308
309
310
311
312
313
314
# File 'lib/march_hare/session.rb', line 305

# No-op, exists for better API compatibility with Bunny.
#
# @return [nil]
def start
  # Intentionally empty.
  #
  # This method mimics Bunny::Session#start in Bunny 0.9.
  # Without it, #method_missing would proxy the call to
  # com.rabbitmq.client.AMQConnection, which happens to have a #start method
  # that is not idempotent.
  #
  # So #start is stubbed out in case someone migrating from Bunny forgets to
  # remove the call to #start. MK.
end

#tls? ⇒ Boolean Also known as: ssl?

Returns:

  • (Boolean)


334
335
336
337
338
339
340
341
342
# File 'lib/march_hare/session.rb', line 334

# Reports whether this session is configured for TLS.
#
# For URI-based sessions the URI scheme decides ("amqps"); otherwise the
# configured port is compared with the connection factory's default
# AMQP-over-SSL port.
#
# @return [Boolean]
def tls?
  return port == ConnectionFactory.DEFAULT_AMQP_OVER_SSL_PORT unless @uses_uri

  java.net.URI.new(@uri.to_java_string).scheme == "amqps"
end

#to_s ⇒ String

Returns:

  • (String)


350
351
352
# File 'lib/march_hare/session.rb', line 350

def to_s
  "#<#{self.class.name}:#{object_id} #{@cf.username}@#{@cf.host}:#{@cf.port}, vhost=#{@cf.virtual_host}>"
end

#unregister_channel(ch) ⇒ Object



365
366
367
# File 'lib/march_hare/session.rb', line 365

# Removes a channel from this session's channel registry, looking it up by
# its channel number.
#
# @param ch [Object] channel responding to #channel_number
# @return [Object] whatever the registry's #delete returns
def unregister_channel(ch)
  @channels.delete(ch.channel_number)
end

#username ⇒ Object Also known as: user



316
317
318
# File 'lib/march_hare/session.rb', line 316

# Returns the username configured on the connection factory.
#
# @return [Object] value of the connection factory's username attribute
def username
  @cf.username
end

#vhost ⇒ Object



321
322
323
# File 'lib/march_hare/session.rb', line 321

# Returns the virtual host configured on the connection factory.
#
# @return [Object] value of the connection factory's virtual_host attribute
def vhost
  @cf.virtual_host
end