Class: MarchHare::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/march_hare/session.rb

Overview

Connection to a RabbitMQ node.

Used to open and close connections and open (create) new channels.

Defined Under Namespace

Classes: BlockBlockedUnblockedListener, SSLContextException

Constant Summary collapse

DEFAULT_NETWORK_RECOVERY_INTERVAL =

Default reconnection interval for TCP connection failures

5.0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection_factory, opts = {}) ⇒ Session

Returns a new instance of Session.



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/march_hare/session.rb', line 117

# Builds a session around the given connection factory and immediately
# opens the underlying connection.
#
# @param connection_factory [Object] factory kept in @cf
# @param opts [Hash] session options
# @option opts [Object] :executor_factory used as is when given; otherwise one is built from opts
# @option opts [#call] :host_selection_strategy callable that receives the host list; defaults to picking a random host
# @option opts [Boolean] :automatically_recover (true) whether to recover from network failures
# @option opts [Boolean] :automatic_recovery (true) alternative name for :automatically_recover
# @option opts [Numeric] :network_recovery_interval (DEFAULT_NETWORK_RECOVERY_INTERVAL) recovery interval
def initialize(connection_factory, opts = {})
  @cf = connection_factory
  # An executor cannot be restarted once it has been shut down,
  # hence a factory is kept here rather than a single executor. MK.
  @executor_factory = opts[:executor_factory] || build_executor_factory_from(opts)

  @hosts = self.class.hosts_from(opts)
  @default_host_selection_strategy = lambda { |hosts| hosts.sample }
  @host_selection_strategy = opts[:host_selection_strategy] || @default_host_selection_strategy

  @connection = new_connection_impl(@hosts, @host_selection_strategy)
  @channels = JavaConcurrent::ConcurrentHashMap.new

  # Automatic recovery stays on unless the caller set one of the two
  # option names; when set, either truthy value enables it.
  recovery_flags = [opts[:automatically_recover], opts[:automatic_recovery]]
  @automatically_recover = recovery_flags.all?(&:nil?) || recovery_flags.first || recovery_flags.last
  @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
  @shutdown_hooks = []

  add_automatic_recovery_hook if @automatically_recover
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(selector, *args) ⇒ Object



302
303
304
# File 'lib/march_hare/session.rb', line 302

# Proxies any method this class does not define to the underlying
# connection object.
#
# The caller's block (if any) is forwarded as well, so block-taking
# methods on the underlying connection behave the same as when they are
# called on it directly.
#
# @param selector [Symbol] name of the method that was called
# @param args [Array] positional arguments of the original call
# @param block [Proc, nil] block given to the original call
# @return [Object] whatever the underlying connection returns
def method_missing(selector, *args, &block)
  @connection.__send__(selector, *args, &block)
end

Instance Attribute Details

#channels ⇒ Array<MarchHare::Channel> (readonly)

Returns Channels opened on this connection.

Returns:



113
114
115
# File 'lib/march_hare/session.rb', line 113

# Accessor for the collection of channels registered on this session.
#
# @return [Object] the object held in @channels (the same instance, not a copy)
def channels
  registry = @channels
  registry
end

Class Method Details

.connect(options = {}) ⇒ Object

Connects to a RabbitMQ node.

Parameters:

  • options (Hash) (defaults to: {})

    Connection options

Options Hash (options):

  • :host (String) — default: "127.0.0.1"

    Hostname or IP address to connect to

  • :port (Integer) — default: 5672

    Port RabbitMQ listens on

  • :username (String) — default: "guest"

    Username

  • :password (String) — default: "guest"

    Password

  • :vhost (String) — default: "/"

    Virtual host to use

  • :requested_heartbeat (Integer) — default: 580

    Heartbeat timeout used. 0 means no heartbeat.

  • :tls (Boolean) — default: false

    Set to true to use TLS/SSL connection. This will switch port to 5671 by default.

  • :thread_factory (java.util.concurrent.ThreadFactory)

    Thread factory RabbitMQ Java client will use (useful in restricted PaaS platforms such as GAE)

See Also:



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/march_hare/session.rb', line 48

# Configures a ConnectionFactory from the given options and returns a new
# session built on top of it.
#
# Most settings are only applied when the matching include_*? helper says
# the option is present; the heartbeat is always set.
#
# TLS is controlled by :ssl / :tls:
# * true     - the factory's default SSL protocol is used
# * a String - passed on as the protocol name, optionally combined with
#              :trust_manager or with a key certificate path and password
#              obtained through helpers
# * anything else - no TLS configuration is applied
#
# @param options [Hash] connection options
# @return [Object] the result of new(cf, options)
# @raise [SSLContextException] when a Java exception is raised while the
#   SSL context is being set up from a key certificate
def self.connect(options={})
  cf = ConnectionFactory.new

  cf.uri                = options[:uri]          if options[:uri]
  cf.host               = hostname_from(options) if include_host?(options)
  cf.port               = options[:port].to_i    if options[:port]
  cf.virtual_host       = vhost_from(options)    if include_vhost?(options)
  cf.connection_timeout = timeout_from(options)  if include_timeout?(options)
  cf.username           = username_from(options) if include_username?(options)
  cf.password           = password_from(options) if include_password?(options)

  cf.requested_heartbeat = heartbeat_from(options)
  # NOTE(review): connection_timeout may already have been assigned above;
  # when both conditions hold this later assignment wins.
  cf.connection_timeout  = connection_timeout_from(options) if include_connection_timeout?(options)

  cf.thread_factory      = thread_factory_from(options)    if include_thread_factory?(options)
  cf.exception_handler   = exception_handler_from(options) if include_exception_handler?(options)

  # :ssl takes precedence over :tls when both are given
  tls = (options[:ssl] || options[:tls])
  case tls
  when true then
    cf.use_ssl_protocol
  when String then
    if options[:trust_manager]
      # caller-supplied trust manager, used with the named protocol
      cf.use_ssl_protocol(tls, options[:trust_manager])
    elsif (key_cert = tls_key_certificate_path_from(options)) && (key_cert_password = tls_key_certificate_password_from(options))
      # client certificate: build an SSL context from a PKCS12 key store
      sslContext = SSLContext.get_instance(tls)
      certificate_password = key_cert_password.to_java.to_char_array
      begin
        input_stream = File.new(key_cert).to_inputstream
        key_store = KeyStore.get_instance('PKCS12')
        key_store.load(input_stream, certificate_password)

        key_manager_factory = KeyManagerFactory.get_instance("SunX509")
        key_manager_factory.init(key_store, certificate_password)

        key_managers =  key_manager_factory.get_key_managers

        # NOTE(review): NullTrustManager presumably performs no peer
        # certificate verification - confirm this is intended.
        sslContext.init(key_managers, [NullTrustManager.new].to_java('javax.net.ssl.TrustManager'), nil)

        cf.use_ssl_protocol(sslContext)
      # catch all exceptions from java
      rescue Java::JavaLang::Exception => e
        # re-raise with the Java message and backtrace folded into one string
        message = e.message
        message << "\n"
        message << e.backtrace.join("\n")

        raise SSLContextException.new(message)
      ensure
        # input_stream is nil if opening the key certificate file failed
        input_stream.close if input_stream
      end  
    else
      # protocol name only
      cf.use_ssl_protocol(tls)
    end
  end


  new(cf, options)
end

Instance Method Details

#add_automatic_recovery_hook ⇒ Object



219
220
221
222
223
224
225
226
227
# File 'lib/march_hare/session.rb', line 219

# Registers a shutdown callback that starts automatic recovery whenever
# the shutdown was not initiated by the application, and remembers the
# registered listener in @automatic_recovery_hook.
#
# @return [Object] the listener returned by #on_shutdown
def add_automatic_recovery_hook
  @automatic_recovery_hook = on_shutdown do |_, signal|
    automatically_recover unless signal.initiated_by_application
  end
end

#automatically_recover ⇒ Object

Begins automatic connection recovery (typically only used internally to recover from network failures)



236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# File 'lib/march_hare/session.rb', line 236

# Replaces the underlying connection with a freshly opened one and lets
# every registered channel recover on it.
#
# Steps, in order: wait for the network recovery interval, open a new
# connection (retrying on network failures), re-attach the recorded
# shutdown hooks to it, recover channels in ascending id order and finally
# store the new connection in @connection.
#
# @return [Object] the new connection (the value assigned to @connection)
def automatically_recover
  # @network_recovery_interval is multiplied by 1000 to get the sleep time in milliseconds
  ms = @network_recovery_interval * 1000
  # recovering immediately makes little sense. Wait a bit first. MK.
  java.lang.Thread.sleep(ms)

  new_connection = converting_rjc_exceptions_to_ruby do
    reconnecting_on_network_failures(ms) do
      self.new_connection_impl(@hosts, @host_selection_strategy)
    end
  end
  # NOTE(review): @thread_pool is not read anywhere in the visible part of
  # this file - confirm it is still needed.
  @thread_pool = ThreadPools.dynamically_growing
  # shutdown listeners were attached to the old connection; attach them to the new one
  self.recover_shutdown_hooks(new_connection)

  # sorting channels by id means that the cases like the following:
  #
  # ch1 = conn.create_channel
  # ch2 = conn.create_channel
  #
  # x   = ch1.topic("logs", :durable => false)
  # q   = ch2.queue("", :exclusive => true)
  #
  # q.bind(x)
  #
  # will recover correctly because exchanges and queues will be recovered
  # in the order the user expects and before bindings.
  @channels.sort_by {|id, _| id}.each do |id, ch|
    begin
      ch.automatically_recover(self, new_connection)
    # a failure in one channel is printed and does not stop the remaining
    # channels from recovering.
    # NOTE(review): rescuing Exception is very broad - confirm it is intended.
    rescue Exception, java.io.IOException => e
      # TODO: logging
      $stderr.puts e
    end
  end

  # the session only switches to the new connection after channel recovery has run
  @connection = new_connection
end

#clear_blocked_connection_callbacks ⇒ Object

Clears all callbacks defined with #on_blocked and #on_unblocked.



213
214
215
# File 'lib/march_hare/session.rb', line 213

# Removes every blocked/unblocked listener from the underlying connection.
#
# @return [Object] whatever the underlying connection's clear_blocked_listeners returns
def clear_blocked_connection_callbacks
  underlying = @connection
  underlying.clear_blocked_listeners
end

#close ⇒ Object

Closes connection gracefully.

This includes shutting down the consumer work pool gracefully, waiting up to 5 seconds for all consumer deliveries to be processed.



170
171
172
173
174
175
176
177
# File 'lib/march_hare/session.rb', line 170

# Closes every channel that is still open, then shuts down the executor
# (if applicable) and finally closes the underlying connection.
#
# @return [Object] whatever the underlying connection's close returns
def close
  # take a snapshot of the open channels first, then close them
  still_open = @channels.select { |_, channel| channel.open? }
  still_open.each { |_, channel| channel.close }

  maybe_shut_down_executor
  @connection.close
end

#closed? ⇒ Boolean

Returns true if this connection is closed.

Returns:

  • (Boolean)

    true if this connection is closed



186
187
188
# File 'lib/march_hare/session.rb', line 186

# Tells whether the underlying connection is no longer open.
#
# @return [Boolean] the negation of the underlying connection's #open?
def closed?
  still_open = @connection.open?
  !still_open
end

#create_channel(n = nil) ⇒ MarchHare::Channel

Opens a new channel.

Parameters:

  • n (Integer) (defaults to: nil)

    Channel number. Pass nil to let MarchHare allocate an available number in a safe way.

Returns:

See Also:



152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/march_hare/session.rb', line 152

# Opens a channel on the underlying connection, wraps it in a Channel and
# registers the wrapper with this session.
#
# @param n [Integer, nil] channel number; when nil (or false) the
#   underlying connection is asked for a channel without a number
# @return [Channel] the newly created and registered channel wrapper
def create_channel(n = nil)
  underlying_channel = n ? @connection.create_channel(n) : @connection.create_channel

  Channel.new(self, underlying_channel).tap { |channel| register_channel(channel) }
end

#disable_automatic_recovery ⇒ Object



230
231
232
# File 'lib/march_hare/session.rb', line 230

# Detaches the automatic recovery shutdown listener from the underlying
# connection. Does nothing when no recovery hook has been stored in
# @automatic_recovery_hook.
#
# @return [Object, nil] the result of remove_shutdown_listener, or nil
#   when there is no recovery hook
def disable_automatic_recovery
  # The listener has to be removed from @connection; the previous code
  # read a misspelled (always nil) instance variable and raised
  # NoMethodError whenever a hook was present.
  @connection.remove_shutdown_listener(@automatic_recovery_hook) if @automatic_recovery_hook
end

#flush ⇒ Object

Flushes the socket used by this connection.



281
282
283
# File 'lib/march_hare/session.rb', line 281

# Delegates to the underlying connection's flush.
#
# @return [Object] whatever the underlying connection's flush returns
def flush
  underlying = @connection
  underlying.flush
end

#heartbeat=(n) ⇒ Object



286
287
288
# File 'lib/march_hare/session.rb', line 286

# Assigns the heartbeat value on the underlying connection.
#
# @param n [Object] value handed to the underlying connection's heartbeat=
# @return [Object] the assigned value
def heartbeat=(n)
  underlying = @connection
  underlying.heartbeat = n
end

#on_blocked(&block) ⇒ Object

Defines a connection.blocked handler



203
204
205
# File 'lib/march_hare/session.rb', line 203

# Registers a handler for connection.blocked notifications.
#
# The block is wrapped with BlockBlockedUnblockedListener.for_blocked and
# the resulting listener is passed to add_blocked_listener.
#
# @param block [Proc] handler to run when the connection gets blocked
# @return [Object] whatever add_blocked_listener returns
def on_blocked(&block)
  listener = BlockBlockedUnblockedListener.for_blocked(block)
  self.add_blocked_listener(listener)
end

#on_shutdown(&block) ⇒ Object

Defines a shutdown event callback. Shutdown events are broadcast when a connection is closed, either explicitly or forcefully, or due to a network/peer failure.



193
194
195
196
197
198
199
200
# File 'lib/march_hare/session.rb', line 193

# Registers a shutdown callback.
#
# The block is wrapped in a ShutdownListener, recorded in @shutdown_hooks
# and attached to the current underlying connection.
#
# @param block [Proc] callback handed to the ShutdownListener
# @return [ShutdownListener] the listener that was registered
def on_shutdown(&block)
  ShutdownListener.new(self, &block).tap do |listener|
    @shutdown_hooks << listener
    @connection.add_shutdown_listener(listener)
  end
end

#on_unblocked(&block) ⇒ Object

Defines a connection.unblocked handler



208
209
210
# File 'lib/march_hare/session.rb', line 208

# Registers a handler for connection.unblocked notifications.
#
# The block is wrapped with BlockBlockedUnblockedListener.for_unblocked
# and the resulting listener is passed to add_blocked_listener.
#
# @param block [Proc] handler to run when the connection gets unblocked
# @return [Object] whatever add_blocked_listener returns
def on_unblocked(&block)
  listener = BlockBlockedUnblockedListener.for_unblocked(block)
  self.add_blocked_listener(listener)
end

#open? ⇒ Boolean Also known as: connected?

Returns true if connection is open, false otherwise.

Returns:

  • (Boolean)

    true if connection is open, false otherwise



180
181
182
# File 'lib/march_hare/session.rb', line 180

# Tells whether the underlying connection is currently open.
#
# @return [Boolean] whatever the underlying connection's #open? returns
def open?
  underlying = @connection
  underlying.open?
end

#recover_shutdown_hooks(connection) ⇒ Object



274
275
276
277
278
# File 'lib/march_hare/session.rb', line 274

# Attaches every recorded shutdown listener to the given connection.
#
# @param connection [#add_shutdown_listener] connection that should receive the listeners
# @return [Array] the collection held in @shutdown_hooks
def recover_shutdown_hooks(connection)
  @shutdown_hooks.each { |listener| connection.add_shutdown_listener(listener) }
end

#register_channel(ch) ⇒ Object



317
318
319
# File 'lib/march_hare/session.rb', line 317

# Stores the channel in @channels under its channel number.
#
# @param ch [#channel_number] channel to register
# @return [Object] the channel that was passed in
def register_channel(ch)
  number = ch.channel_number
  @channels[number] = ch
end

#start ⇒ Object

No-op, exists for better API compatibility with Bunny.



291
292
293
294
295
296
297
298
299
300
# File 'lib/march_hare/session.rb', line 291

# Intentionally does nothing.
#
# Mirrors Bunny::Session#start from Bunny 0.9. If this method were
# missing, #method_missing would pass the call on to
# com.rabbitmq.client.AMQConnection, whose own #start is not idempotent.
# Defining it here keeps a leftover #start call in code migrated from
# Bunny harmless. MK.
#
# @return [nil]
def start
  nil
end

#to_s ⇒ String

Returns:

  • (String)


307
308
309
# File 'lib/march_hare/session.rb', line 307

# Human-readable description of this session: class name, object id and
# the username, host, port and virtual host read from the connection
# factory stored in @cf.
#
# @return [String]
def to_s
  factory  = @cf
  endpoint = "#{factory.username}@#{factory.host}:#{factory.port}"

  "#<#{self.class.name}:#{object_id} #{endpoint}, vhost=#{factory.virtual_host}>"
end

#unregister_channel(ch) ⇒ Object



322
323
324
# File 'lib/march_hare/session.rb', line 322

# Removes the entry stored in @channels under the channel's number.
#
# @param ch [#channel_number] channel to unregister
# @return [Object, nil] whatever @channels.delete returns for that number
def unregister_channel(ch)
  number = ch.channel_number
  @channels.delete(number)
end