Method: Bunny::Session#initialize

Defined in:
lib/bunny/session.rb

#initialize(connection_string_or_opts = , optz = Hash.new) ⇒ Session

Returns a new instance of Session.

Parameters:

  • connection_string_or_opts (String, Hash) (defaults to: )

    Connection string or a hash of connection options

  • optz (Hash) (defaults to: Hash.new)

    Extra options not related to connection

Options Hash (connection_string_or_opts):

  • :host (String) — default: "127.0.0.1"

    Hostname or IP address to connect to

  • :hosts (Array<String>) — default: ["127.0.0.1"]

    list of hostname or IP addresses to select hostname from when connecting

  • :addresses (Array<String>) — default: ["127.0.0.1:5672"]

    list of addresses to select hostname and port from when connecting

  • :port (Integer) — default: 5672

    Port RabbitMQ listens on

  • :username (String) — default: "guest"

    Username

  • :password (String) — default: "guest"

    Password

  • :vhost (String) — default: "/"

    Virtual host to use

  • :heartbeat (Integer, Symbol) — default: :server

    Heartbeat timeout to offer to the server. :server means use the value suggested by RabbitMQ. 0 means heartbeats and socket read timeouts will be disabled (not recommended).

  • :network_recovery_interval (Integer) — default: 4

    Recovery interval periodic network recovery will use. This includes initial pause after network failure.

  • :tls (Boolean) — default: false

    Should TLS/SSL be used?

  • :tls_cert (String) — default: nil

    Path to client TLS/SSL certificate file (.pem)

  • :tls_key (String) — default: nil

    Path to client TLS/SSL private key file (.pem)

  • :tls_ca_certificates (Array<String>)

    Array of paths to TLS/SSL CA files (.pem), by default detected from OpenSSL configuration

  • :verify_peer (String) — default: true

    Whether TLS peer verification should be performed

  • :tls_protocol (Symbol) — default: negotiated

    What TLS version should be used (:TLSv1, :TLSv1_1, or :TLSv1_2)

  • :channel_max (Integer) — default: 2047

    Maximum number of channels allowed on this connection, minus 1 to account for the special channel 0.

  • :continuation_timeout (Integer) — default: 15000

    Timeout for client operations that expect a response (e.g. Queue#get), in milliseconds.

  • :connection_timeout (Integer) — default: 30

    Timeout in seconds for connecting to the server.

  • :read_timeout (Integer) — default: 30

    TCP socket read timeout in seconds. If heartbeats are disabled this will be ignored.

  • :write_timeout (Integer) — default: 30

    TCP socket write timeout in seconds.

  • :hosts_shuffle_strategy (Proc)

    a callable that reorders a list of host strings, defaults to Array#shuffle

  • :recovery_completed (Proc)

    a callable that will be called when a network recovery is performed

  • :logger (Logger)

    The logger. If missing, one is created using :log_file and :log_level.

  • :log_file (IO, String)

    The file or path to use when creating a logger. Defaults to STDOUT.

  • :logfile (IO, String)

    DEPRECATED: use :log_file instead. The file or path to use when creating a logger. Defaults to STDOUT.

  • :log_level (Integer)

    The log level to use when creating a logger. Defaults to LOGGER::WARN

  • :automatically_recover (Boolean) — default: true

    Should automatically recover from network failures?

  • :recovery_attempts (Integer) — default: nil

    Max number of recovery attempts, nil means forever

  • :reset_recovery_attempts_after_reconnection (Integer) — default: true

    Should recovery attempt counter be reset after successful reconnection? When set to false, the attempt counter will last through the entire lifetime of the connection object.

  • :recovery_attempt_started (Proc) — default: nil

    Will be called before every connection recovery attempt

  • :recovery_completed (Proc) — default: nil

    Will be called after successful connection recovery

  • :recovery_attempts_exhausted (Proc) — default: nil

    Will be called when the connection recovery failed after the specified amount of recovery attempts

  • :recover_from_connection_close (Boolean) — default: true

    Should this connection recover after receiving a server-sent connection.close (e.g. connection was force closed)?

  • :session_error_handler (Object) — default: Thread.current

    Object which responds to #raise that will act as a session error handler. Defaults to Thread.current, which will raise asynchronous exceptions in the thread that created the session.

  • :topology_recovery_filter (Bunny::TopologyRecoveryFilter)

    if provided, will be used for object filtering during topology recovery

Options Hash (optz):

  • :auth_mechanism (String) — default: "PLAIN"

    Authentication mechanism, PLAIN or EXTERNAL

  • :locale (String) — default: "PLAIN"

    Locale RabbitMQ should use

  • :connection_name (String) — default: nil

    Client-provided connection name, if any. Note that the value returned does not uniquely identify a connection and cannot be used as a connection identifier in HTTP API requests.

See Also:



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/bunny/session.rb', line 145

def initialize(connection_string_or_opts = ENV['RABBITMQ_URL'], optz = Hash.new)
  opts = case (connection_string_or_opts)
         when nil then
           Hash.new
         when String then
           self.class.parse_uri(connection_string_or_opts)
         when Hash then
           connection_string_or_opts
         end.merge(optz)

  @default_hosts_shuffle_strategy = Proc.new { |hosts| hosts.shuffle }

  @opts            = opts
  log_file         = opts[:log_file] || opts[:logfile] || STDOUT
  log_level        = opts[:log_level] || ENV["BUNNY_LOG_LEVEL"] || Logger::WARN
  # we might need to log a warning about ill-formatted IPv6 address but
  # progname includes hostname, so init like this first
  @logger          = opts.fetch(:logger, init_default_logger_without_progname(log_file, log_level))

  @addresses       = self.addresses_from(opts)
  @address_index   = 0

  @transport       = nil
  @user            = self.username_from(opts)
  @pass            = self.password_from(opts)
  @vhost           = self.vhost_from(opts)
  @threaded        = opts.fetch(:threaded, true)

  # re-init, see above
  @logger          = opts.fetch(:logger, init_default_logger(log_file, log_level))

  validate_connection_options(opts)
  @last_connection_error = nil

  # should automatic recovery from network failures be used?
  @automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil?
                             true
                           else
                             opts[:automatically_recover] | opts[:automatic_recovery]
                           end
  @recovering_from_network_failure = false
  @max_recovery_attempts = opts[:recovery_attempts]
  @recovery_attempts     = @max_recovery_attempts
  # When this is set, connection attempts won't be reset after
  # successful reconnection. Some find this behavior more sensible
  # than the per-failure attempt counter. MK.
  @reset_recovery_attempt_counter_after_reconnection = opts.fetch(:reset_recovery_attempts_after_reconnection, true)

  @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
  @recover_from_connection_close = opts.fetch(:recover_from_connection_close, true)
  # in ms
  @continuation_timeout   = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT)

  @status             = :not_connected
  @manually_closed    = false
  @blocked            = false

  # these are negotiated with the broker during the connection tuning phase
  @client_frame_max   = opts.fetch(:frame_max, DEFAULT_FRAME_MAX)
  @client_channel_max = normalize_client_channel_max(opts.fetch(:channel_max, DEFAULT_CHANNEL_MAX))
  # will be-renegotiated during connection tuning steps. MK.
  @channel_max        = @client_channel_max
  @heartbeat_sender   = nil
  @client_heartbeat   = self.heartbeat_from(opts)

  client_props         = opts[:properties] || opts[:client_properties] || {}
  @connection_name     = client_props[:connection_name] || opts[:connection_name]
  @client_properties   = DEFAULT_CLIENT_PROPERTIES.merge(client_props)
                                                  .merge(connection_name: connection_name)
  @mechanism           = normalize_auth_mechanism(opts.fetch(:auth_mechanism, "PLAIN"))
  @credentials_encoder = credentials_encoder_for(@mechanism)
  @locale              = @opts.fetch(:locale, DEFAULT_LOCALE)

  @mutex_impl          = @opts.fetch(:mutex_impl, Monitor)

  # mutex for the channel id => channel hash
  @channel_mutex       = @mutex_impl.new
  # transport operations/continuations mutex. A workaround for
  # the non-reentrant Ruby mutexes. MK.
  @transport_mutex     = @mutex_impl.new
  @status_mutex        = @mutex_impl.new
  @address_index_mutex = @mutex_impl.new

  @channels            = Hash.new

  trf = @opts.fetch(:topology_recovery_filter, DefaultTopologyRecoveryFilter.new)
  @topology_registry = TopologyRegistry.new(topology_recovery_filter: trf)

  @recovery_attempt_started = opts[:recovery_attempt_started]
  @recovery_completed       = opts[:recovery_completed]
  @recovery_attempts_exhausted = opts[:recovery_attempts_exhausted]

  @session_error_handler = opts.fetch(:session_error_handler, Thread.current)

  @recoverable_exceptions = DEFAULT_RECOVERABLE_EXCEPTIONS.dup

  self.reset_continuations
  self.initialize_transport

end