Class: AMQP::Session

Inherits:
EM::Connection
  • Object
show all
Extended by:
ProtocolMethodHandlers, RegisterEntityMixin
Includes:
Callbacks, ChannelIdAllocator, Openable
Defined in:
lib/amqp/session.rb

Overview

AMQP session represents connection to the broker. Session objects let you define callbacks for various TCP connection lifecycle events, for instance:

  • Connection is established
  • Connection has failed
  • Authentication has failed
  • Connection is lost (there is a network failure)
  • AMQP connection is opened
  • AMQP connection parameters (tuning) are negotiated and accepted by the broker
  • AMQP connection is properly closed

Key methods

Constant Summary collapse

Deferrable =

Backwards compatibility with 0.7.0.a25. MK.

EventMachine::DefaultDeferrable

Constants included from Openable

Openable::VALUES

Constants included from ChannelIdAllocator

ChannelIdAllocator::MAX_CHANNELS_PER_CONNECTION

Broker information collapse

Instance Attribute Summary collapse

Attributes included from Openable

#status

Connecting, reconnecting, disconnecting collapse

Broker information collapse

Error Handling and Recovery collapse

Blocked connection notifications collapse

Connection operations collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from RegisterEntityMixin

register_entity

Methods included from ProtocolMethodHandlers

handle, handlers

Methods included from Callbacks

#clear_callbacks, #define_callback, #exec_callback, #exec_callback_once, #exec_callback_once_yielding_self, #exec_callback_yielding_self, #has_callback?, #prepend_callback, #redefine_callback

Methods included from Openable

#closed!, #closed?, #closing!, #closing?, #opened!, #opened?, #opening!, #opening?

Methods included from ChannelIdAllocator

#next_channel_id, #release_channel_id, #reset_channel_id_allocator

Constructor Details

#initialize(*args, &block) ⇒ Session

Returns a new instance of Session



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/amqp/session.rb', line 143

def initialize(*args, &block)
  super(*args)

  connection_options_or_string = args.first
  other_options                = args[1] || {}

  self.logger   = self.class.logger

  # channel => collected frames. MK.
  @frames            = Hash.new { Array.new }
  @channels          = Hash.new
  @callbacks         = Hash.new

  opening!

  # track TCP connection state, used to detect initial TCP connection failures.
  @tcp_connection_established       = false
  @tcp_connection_failed            = false
  @intentionally_closing_connection = false

  @settings                           = AMQ::Settings.configure(connection_options_or_string).merge(other_options)

  @on_tcp_connection_failure          = Proc.new { |settings|
    closed!
    if cb = @settings[:on_tcp_connection_failure]
      cb.call(settings)
    else
      raise self.class.tcp_connection_failure_exception_class.new(settings)
    end
  }

  @on_possible_authentication_failure = @settings[:on_possible_authentication_failure] || Proc.new { |settings|
    raise self.class.authentication_failure_exception_class.new(settings)
  }
  @mechanism         = normalize_auth_mechanism(@settings.fetch(:auth_mechanism, "PLAIN"))
  @locale            = @settings.fetch(:locale, "en_GB")
  @client_properties = Settings.client_properties.merge(@settings.fetch(:client_properties, Hash.new))

  @auto_recovery     = (!!@settings[:auto_recovery])

  @connection_timeout = (@settings[:timeout] || @settings[:connection_timeout] || 3).to_f

  self.reset
  self.set_pending_connect_timeout(@connection_timeout) unless defined?(JRUBY_VERSION)
end

Instance Attribute Details

#callbacksArray<#call> (readonly)

Returns:

  • (Array<#call>)


61
62
63
# File 'lib/amqp/session.rb', line 61

def callbacks
  @callbacks
end

#channel_maxObject

Maximum channel number that the server permits this connection to use. Usable channel numbers are in the range 1..channel_max. Zero indicates no specified limit.



94
95
96
# File 'lib/amqp/session.rb', line 94

def channel_max
  @channel_max
end

#channelsObject (readonly)

Channels within this connection.



87
88
89
# File 'lib/amqp/session.rb', line 87

def channels
  @channels
end

#client_propertiesObject

Client capabilities



72
73
74
# File 'lib/amqp/session.rb', line 72

def client_properties
  @client_properties
end

#connection_timeoutObject



101
102
103
# File 'lib/amqp/session.rb', line 101

def connection_timeout
  @connection_timeout
end

#frame_maxObject

Maximum frame size that the server permits this connection to use.



99
100
101
# File 'lib/amqp/session.rb', line 99

def frame_max
  @frame_max
end

#known_hostsObject (readonly)



103
104
105
# File 'lib/amqp/session.rb', line 103

def known_hosts
  @known_hosts
end

#localeObject

The locale defines the language in which the server will send reply texts.



67
68
69
# File 'lib/amqp/session.rb', line 67

def locale
  @locale
end

#loggerObject

API



57
58
59
# File 'lib/amqp/session.rb', line 57

def logger
  @logger
end

#mechanismObject (readonly)

Authentication mechanism used.



77
78
79
# File 'lib/amqp/session.rb', line 77

def mechanism
  @mechanism
end

#server_authentication_mechanismsObject (readonly)

Authentication mechanisms broker supports.



82
83
84
# File 'lib/amqp/session.rb', line 82

def server_authentication_mechanisms
  @server_authentication_mechanisms
end

#server_capabilitiesHash (readonly)

Server capabilities (usually used to detect AMQP 0.9.1 extensions like basic.nack, publisher confirms and so on)



263
264
265
# File 'lib/amqp/session.rb', line 263

def server_capabilities
  @server_capabilities
end

#server_localesObject (readonly)

Locales server supports



268
269
270
# File 'lib/amqp/session.rb', line 268

def server_locales
  @server_locales
end

#server_propertiesHash (readonly)

Server properties (product information, platform, et cetera)



256
257
258
# File 'lib/amqp/session.rb', line 256

def server_properties
  @server_properties
end

#settingsObject



58
59
60
# File 'lib/amqp/session.rb', line 58

def settings
  @settings
end

Class Method Details

.connect(connection_string_or_opts = ENV['RABBITMQ_URL'], other_options = {}, &block) ⇒ Object

Initiates connection to AMQP broker. If callback is given, runs it when (and if) AMQP connection succeeds.

def self.connect(settings = {}, &block)

Parameters:

  • settings (Hash)


444
445
446
447
448
449
450
451
# File 'lib/amqp/session.rb', line 444

def self.connect(connection_string_or_opts = ENV['RABBITMQ_URL'], other_options = {}, &block)
  @settings = AMQ::Settings.configure(connection_string_or_opts).merge(other_options)

  instance = EventMachine.connect(@settings[:host], @settings[:port], self, @settings)
  instance.register_connection_callback(&block)

  instance
end

.loggerObject



112
113
114
115
116
117
# File 'lib/amqp/session.rb', line 112

def logger
  @logger ||= begin
                require "logger"
                Logger.new(STDERR)
              end
end

.logger=(logger) ⇒ Object



119
120
121
122
123
124
125
126
# File 'lib/amqp/session.rb', line 119

def logger=(logger)
  methods = AMQP::Logging::REQUIRED_METHODS
  unless methods.all? { |method| logger.respond_to?(method) }
    raise AMQP::Logging::IncompatibleLoggerError.new(methods)
  end

  @logger = logger
end

.loggingBoolean

Returns Current value of logging flag.

Returns:

  • (Boolean)

    Current value of logging flag.



129
130
131
# File 'lib/amqp/session.rb', line 129

def logging
  settings[:logging]
end

.logging=(boolean) ⇒ Object

Turns loggin on or off.



134
135
136
# File 'lib/amqp/session.rb', line 134

def logging=(boolean)
  settings[:logging] = boolean
end

.settingsObject

Settings



108
109
110
# File 'lib/amqp/session.rb', line 108

def settings
  @settings ||= AMQ::Settings.default
end

Instance Method Details

#auth_mechanism_adapterObject

Retrieves an AuthMechanismAdapter that will encode credentials for this Adapter.



902
903
904
# File 'lib/amqp/session.rb', line 902

def auth_mechanism_adapter
  @auth_mechanism_adapter ||= AuthMechanismAdapter.for_adapter(self)
end

#authenticating?Boolean

Whether we are in authentication state (after TCP connection was estabilished but before broker authenticated us).

Returns:

  • (Boolean)


528
529
530
# File 'lib/amqp/session.rb', line 528

def authenticating?
  @authenticating
end

#auto_recoverObject

Performs recovery of channels that are in the automatic recovery mode.



381
382
383
# File 'lib/amqp/session.rb', line 381

def auto_recover
  @channels.select { |channel_id, ch| ch.auto_recovering? }.each { |n, ch| ch.auto_recover }
end

#auto_recovering?Boolean Also known as: auto_recovery?

Returns whether connection is in the automatic recovery mode

Returns:

  • (Boolean)

    whether connection is in the automatic recovery mode



369
370
371
# File 'lib/amqp/session.rb', line 369

def auto_recovering?
  !!@auto_recovery
end

#before_recovery(&block) ⇒ Object

Defines a callback that will be executed after TCP connection has recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).



352
353
354
# File 'lib/amqp/session.rb', line 352

def before_recovery(&block)
  self.redefine_callback(:before_recovery, &block)
end

#brokerAMQP::Broker

Returns Broker this connection is established with

Returns:

  • (AMQP::Broker)

    Broker this connection is established with



271
272
273
# File 'lib/amqp/session.rb', line 271

def broker
  @broker ||= AMQP::Broker.new(@server_properties)
end

#broker_endpointString

Returns Broker endpoint in the form of HOST:PORT/VHOST

Returns:

  • (String)

    Broker endpoint in the form of HOST:PORT/VHOST



210
211
212
# File 'lib/amqp/session.rb', line 210

def broker_endpoint
  "#{self.hostname}:#{self.port}/#{self.vhost}"
end

#connected?Boolean

Returns true if this AMQP connection is currently open

Returns:

  • (Boolean)

    true if this AMQP connection is currently open



191
192
193
# File 'lib/amqp/session.rb', line 191

def connected?
  self.opened?
end

#connection_successfulObject

Called by AMQP::Connection after we receive connection.open-ok.



661
662
663
664
665
666
# File 'lib/amqp/session.rb', line 661

def connection_successful
  @authenticating = false
  opened!

  @connection_deferrable.succeed
end

#content_complete?(frames) ⇒ Boolean (protected)

Determines, whether given frame array contains full content body

Returns:

  • (Boolean)


1102
1103
1104
1105
1106
1107
# File 'lib/amqp/session.rb', line 1102

def content_complete?(frames)
  return false if frames.empty?
  header = frames[0]
  raise "Not a content header frame first: #{header.inspect}" unless header.kind_of?(AMQ::Protocol::HeaderFrame)
  header.body_size == frames[1..-1].inject(0) {|sum, frame| sum + frame.payload.size }
end

#disconnect(reply_code = 200, reply_text = "Goodbye", &block) ⇒ Object Also known as: close

Properly close connection with AMQ broker, as described in section 2.2.4 of the AMQP 0.9.1 specification.

See Also:

  • #close_connection


227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/amqp/session.rb', line 227

def disconnect(reply_code = 200, reply_text = "Goodbye", &block)
  @intentionally_closing_connection = true
  self.on_disconnection do
    @frames.clear
    block.call if block
  end

  # ruby-amqp/amqp#66, MK.
  if self.open?
    closing!
    self.send_frame(AMQ::Protocol::Connection::Close.encode(reply_code, reply_text, 0, 0))
  elsif self.closing?
    # no-op
  else
    self.disconnection_successful
  end
end

#disconnection_successfulObject

Called by AMQP::Connection after we receive connection.close-ok.



672
673
674
675
676
677
678
679
# File 'lib/amqp/session.rb', line 672

def disconnection_successful
  @disconnection_deferrable.succeed

  # true for "after writing buffered data"
  self.close_connection(true)
  self.reset
  closed!
end

#encode_credentials(username, password) ⇒ Object

See Also:



894
895
896
# File 'lib/amqp/session.rb', line 894

def encode_credentials(username, password)
  auth_mechanism_adapter.encode_credentials(username, password)
end

#frameset_complete?(frames) ⇒ Boolean (protected)

Determines, whether the received frameset is ready to be further processed

Returns:

  • (Boolean)


1095
1096
1097
1098
1099
# File 'lib/amqp/session.rb', line 1095

def frameset_complete?(frames)
  return false if frames.empty?
  first_frame = frames[0]
  first_frame.final? || (first_frame.method_class.has_content? && content_complete?(frames[1..-1]))
end

#get_next_frameObject (protected)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns next frame from buffer whenever possible



1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
# File 'lib/amqp/session.rb', line 1077

def get_next_frame
  return nil unless @chunk_buffer.size > 7 # otherwise, cannot read the length
  # octet + short
  offset = 3 # 1 + 2
  # length
  payload_length = @chunk_buffer[offset, 4].unpack(AMQ::Protocol::PACK_UINT32).first
  # 4 bytes for long payload length, 1 byte final octet
  frame_length = offset + payload_length + 5
  if frame_length <= @chunk_buffer.size
    @chunk_buffer.slice!(0, frame_length)
  else
    nil
  end
end

#handle_close(conn_close) ⇒ Object

Handles connection.close. When broker detects a connection level exception, this method is called.



1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
# File 'lib/amqp/session.rb', line 1034

def handle_close(conn_close)
  closed!
  # getting connection.close during connection negotiation means authentication
  # has failed (RabbitMQ 3.2+):
  # http://www.rabbitmq.com/auth-notification.html
  if authenticating?
    @on_possible_authentication_failure.call(@settings) if @on_possible_authentication_failure
  end
  self.exec_callback_yielding_self(:error, conn_close)
end

#handle_close_ok(close_ok) ⇒ Object

Handles Connection.Close-Ok.



1050
1051
1052
1053
# File 'lib/amqp/session.rb', line 1050

def handle_close_ok(close_ok)
  closed!
  self.disconnection_successful
end

#handle_connection_blocked(connection_blocked) ⇒ Object



1056
1057
1058
# File 'lib/amqp/session.rb', line 1056

def handle_connection_blocked(connection_blocked)
  @on_blocked.call(self, connection_blocked) if @on_blocked
end

#handle_connection_unblocked(connection_unblocked) ⇒ Object



1060
1061
1062
# File 'lib/amqp/session.rb', line 1060

def handle_connection_unblocked(connection_unblocked)
  @on_unblocked.call(self, connection_unblocked) if @on_unblocked
end

#handle_open_ok(open_ok) ⇒ Object

Handles Connection.Open-Ok.



1022
1023
1024
1025
1026
1027
# File 'lib/amqp/session.rb', line 1022

def handle_open_ok(open_ok)
  @known_hosts = open_ok.known_hosts.dup.freeze

  opened!
  self.connection_successful if self.respond_to?(:connection_successful)
end

#handle_skipped_heartbeatsObject

Called when time since last server heartbeat received is greater or equal to the heartbeat interval set via :heartbeat_interval option on connection.



685
686
687
688
689
690
691
692
# File 'lib/amqp/session.rb', line 685

def handle_skipped_heartbeats
  if !@handling_skipped_heartbeats && @tcp_connection_established && !@intentionally_closing_connection
    @handling_skipped_heartbeats = true
    self.cancel_heartbeat_sender

    self.run_skipped_heartbeats_callbacks
  end
end

#handle_start(connection_start) ⇒ Object

Handles connection.start.



982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
# File 'lib/amqp/session.rb', line 982

def handle_start(connection_start)
  @server_properties                = connection_start.server_properties
  @server_capabilities              = @server_properties["capabilities"]

  @server_authentication_mechanisms = (connection_start.mechanisms || "").split(" ")
  @server_locales                   = Array(connection_start.locales)

  username = @settings[:user] || @settings[:username]
  password = @settings[:pass] || @settings[:password]

  # It's not clear whether we should transition to :opening state here
  # or in #open but in case authentication fails, it would be strange to have
  # @status undefined. So lets do this. MK.
  opening!

  self.send_frame(AMQ::Protocol::Connection::StartOk.encode(@client_properties, mechanism, self.encode_credentials(username, password), @locale))
end

#handle_tune(connection_tune) ⇒ Object

Handles Connection.Tune-Ok.



1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
# File 'lib/amqp/session.rb', line 1005

def handle_tune(connection_tune)
  @channel_max        = connection_tune.channel_max.freeze
  @frame_max          = connection_tune.frame_max.freeze

  client_heartbeat    = @settings[:heartbeat] || @settings[:heartbeat_interval] || 0

  @heartbeat_interval = negotiate_heartbeat_value(client_heartbeat, connection_tune.heartbeat)

  self.send_frame(AMQ::Protocol::Connection::TuneOk.encode(@channel_max, [settings[:frame_max], @frame_max].min, @heartbeat_interval))
  self.initialize_heartbeat_sender if heartbeats_enabled?
end

#handshakeObject

Sends connection preamble to the broker.



871
872
873
874
# File 'lib/amqp/session.rb', line 871

def handshake
  @authenticating = true
  self.send_preamble
end

#heartbeat_intervalFixnum

Returns heartbeat interval this client uses, in seconds. This value may or may not be used depending on broker capabilities. Zero means the server does not want a heartbeat.

Returns:

  • (Fixnum)

    Heartbeat interval this client uses, in seconds.

See Also:



750
751
752
# File 'lib/amqp/session.rb', line 750

def heartbeat_interval
  @heartbeat_interval
end

#heartbeats_enabled?Boolean

Returns true if heartbeats are enabled (heartbeat interval is greater than 0)

Returns:

  • (Boolean)


756
757
758
# File 'lib/amqp/session.rb', line 756

def heartbeats_enabled?
  @heartbeat_interval && (@heartbeat_interval > 0)
end

#hostnameString Also known as: host

Returns Broker hostname this connection uses

Returns:

  • (String)

    Broker hostname this connection uses



197
198
199
# File 'lib/amqp/session.rb', line 197

def hostname
  @settings[:host]
end

#negotiate_heartbeat_value(client_value, server_value) ⇒ Object (protected)



1066
1067
1068
1069
1070
1071
1072
# File 'lib/amqp/session.rb', line 1066

def negotiate_heartbeat_value(client_value, server_value)
  if client_value == 0 || server_value == 0
    [client_value, server_value].max
  else
    [client_value, server_value].min
  end
end

#on_blocked(&fn) ⇒ Object



390
391
392
# File 'lib/amqp/session.rb', line 390

def on_blocked(&fn)
  @on_blocked = fn
end

#on_closed(&block) ⇒ Object Also known as: on_disconnection

Defines a callback that will be run when broker confirms connection termination (client receives connection.close-ok). You can define more than one callback.

See Also:



298
299
300
# File 'lib/amqp/session.rb', line 298

def on_closed(&block)
  @disconnection_deferrable.callback(&block)
end

#on_connection_interruption(&block) ⇒ Object Also known as: after_connection_interruption

Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).



331
332
333
# File 'lib/amqp/session.rb', line 331

def on_connection_interruption(&block)
  self.redefine_callback(:after_connection_interruption, &block)
end

#on_error(&block) ⇒ Object

Defines a callback that will be executed when connection is closed after connection-level exception. Only one callback can be defined (the one defined last replaces previously added ones).



342
343
344
# File 'lib/amqp/session.rb', line 342

def on_error(&block)
  self.redefine_callback(:error, &block)
end

#on_open(&block) ⇒ Object Also known as: on_connection

Defines a callback that will be executed when AMQP connection is considered open: after client and broker has agreed on max channel identifier and maximum allowed frame size and authentication succeeds. You can define more than one callback.

See Also:



285
286
287
# File 'lib/amqp/session.rb', line 285

def on_open(&block)
  @connection_deferrable.callback(&block)
end

#on_possible_authentication_failure(&block) ⇒ Object

Defines a callback that will be run when TCP connection is closed before authentication finishes. Usually this means authentication failure. You can define only one callback.



323
324
325
# File 'lib/amqp/session.rb', line 323

def on_possible_authentication_failure(&block)
  @on_possible_authentication_failure = block
end

#on_recovery(&block) ⇒ Object Also known as: after_recovery

Defines a callback that will be executed after AMQP connection has recovered after a network failure.. Only one callback can be defined (the one defined last replaces previously added ones).



361
362
363
# File 'lib/amqp/session.rb', line 361

def on_recovery(&block)
  self.redefine_callback(:after_recovery, &block)
end

#on_skipped_heartbeats(&block) ⇒ Object

Defines a callback that will be executed after time since last broker heartbeat is greater than or equal to the heartbeat interval (skipped heartbeat is detected). Only one callback can be defined (the one defined last replaces previously added ones).



851
852
853
# File 'lib/amqp/session.rb', line 851

def on_skipped_heartbeats(&block)
  self.redefine_callback(:skipped_heartbeats, &block)
end

#on_tcp_connection_failure(&block) ⇒ Object

Defines a callback that will be run when initial TCP connection fails. You can define only one callback.



307
308
309
# File 'lib/amqp/session.rb', line 307

def on_tcp_connection_failure(&block)
  @on_tcp_connection_failure = block
end

#on_tcp_connection_loss(&block) ⇒ Object

Defines a callback that will be run when TCP connection to AMQP broker is lost (interrupted). You can define only one callback.



315
316
317
# File 'lib/amqp/session.rb', line 315

def on_tcp_connection_loss(&block)
  @on_tcp_connection_loss = block
end

#on_unblocked(&fn) ⇒ Object



394
395
396
# File 'lib/amqp/session.rb', line 394

def on_unblocked(&fn)
  @on_unblocked = fn
end

#open(vhost = "/") ⇒ Object

Sends connection.open to the server.



881
882
883
# File 'lib/amqp/session.rb', line 881

def open(vhost = "/")
  self.send_frame(AMQ::Protocol::Connection::Open.encode(vhost))
end

#periodically_reconnect(period = 5) ⇒ Object

Periodically try to reconnect.

Parameters:

  • period (Fixnum) (defaults to: 5)

    Period of time, in seconds, to wait before reconnection attempt.

  • force (Boolean)

    If true, enforces immediate reconnection.



495
496
497
498
499
500
501
502
# File 'lib/amqp/session.rb', line 495

def periodically_reconnect(period = 5)
  @reconnecting = true
  self.reset

  @periodic_reconnection_timer = EventMachine::PeriodicTimer.new(period) {
    EventMachine.reconnect(@settings[:host], @settings[:port], self)
  }
end

#portString

Returns Broker port this connection uses

Returns:

  • (String)

    Broker port this connection uses



204
205
206
# File 'lib/amqp/session.rb', line 204

def port
  @settings[:port]
end

#receive_frame(frame) ⇒ Object

Processes a single frame.

Parameters:

  • frame (AMQ::Protocol::Frame)


911
912
913
914
915
916
917
918
919
920
921
922
923
# File 'lib/amqp/session.rb', line 911

def receive_frame(frame)
  @frames[frame.channel] ||= Array.new
  @frames[frame.channel] << frame

  if frameset_complete?(@frames[frame.channel])
    begin
      receive_frameset(@frames[frame.channel])
    ensure # Ensure that frames always will be cleared
      # for channel.close, frame.channel will be nil. MK.
      clear_frames_on(frame.channel) if @frames[frame.channel]
    end
  end
end

#receive_frameset(frames) ⇒ Object

Processes a frameset by finding and invoking a suitable handler. Heartbeat frames are treated in a special way: they simply update @last_server_heartbeat value.

Parameters:

  • frames (Array<AMQ::Protocol::Frame>)


931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
# File 'lib/amqp/session.rb', line 931

def receive_frameset(frames)
  if self.heartbeats_enabled?
    # treat incoming traffic as heartbeats.
    # this operation is pretty expensive under heavy traffic but heartbeats can be disabled
    # (and are also disabled by default). MK.
    @last_server_heartbeat = Time.now
  end
  frame = frames.first

  if AMQ::Protocol::HeartbeatFrame === frame
    # no-op
  else
    if callable = AMQP::HandlersRegistry.find(frame.method_class)
      f = frames.shift
      callable.call(self, f, frames)
    else
      raise MissingHandlerError.new(frames.first)
    end
  end
end

#reconnect(force = false, period = 5) ⇒ Object

Reconnect after a period of wait.

Parameters:

  • period (Fixnum) (defaults to: 5)

    Period of time, in seconds, to wait before reconnection attempt.

  • force (Boolean) (defaults to: false)

    If true, enforces immediate reconnection.



458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
# File 'lib/amqp/session.rb', line 458

def reconnect(force = false, period = 5)
  if @reconnecting and not force
    EventMachine::Timer.new(period) {
      reconnect(true, period)
    }
    return
  end

  if !@reconnecting
    @reconnecting = true
    self.reset
  end

  EventMachine.reconnect(@settings[:host], @settings[:port], self)
end

#reconnect_to(connection_string_or_options, period = 5) ⇒ Object

Similar to #reconnect, but uses different connection settings

See Also:



477
478
479
480
481
482
483
484
485
486
487
# File 'lib/amqp/session.rb', line 477

def reconnect_to(connection_string_or_options, period = 5)
  settings = AMQ::Settings.configure(connection_string_or_opts)

  if !@reconnecting
    @reconnecting = true
    self.reset
  end

  @settings = Settings.configure(settings)
  EventMachine.reconnect(@settings[:host], @settings[:port], self)
end

#reconnecting?Boolean

Returns:

  • (Boolean)


792
793
794
# File 'lib/amqp/session.rb', line 792

def reconnecting?
  @reconnecting
end

#resetObject (protected)



1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
# File 'lib/amqp/session.rb', line 1146

def reset
  @size      = 0
  @payload   = ""
  @frames    = Array.new

  @chunk_buffer                 = ""
  @connection_deferrable        = EventMachine::DefaultDeferrable.new
  @disconnection_deferrable     = EventMachine::DefaultDeferrable.new

  # used to track down whether authentication succeeded. AMQP 0.9.1 dictates
  # that on authentication failure broker must close TCP connection without sending
  # any more data. This is why we need to explicitly track whether we are past
  # authentication stage to signal possible authentication failures.
  @authenticating           = false
end

#reset_state!Object

Resets connection state.



888
889
890
# File 'lib/amqp/session.rb', line 888

def reset_state!
  # no-op by default
end

#send_frame(frame) ⇒ Object

Sends frame to the peer, checking that connection is open.



719
720
721
722
723
724
725
# File 'lib/amqp/session.rb', line 719

def send_frame(frame)
  if closed?
    raise ConnectionClosedError.new(frame)
  else
    self.send_raw(frame.encode)
  end
end

#send_frameset(frames, channel) ⇒ Object

Sends multiple frames, one by one. For thread safety this method takes a channel object and synchronizes on it.



731
732
733
734
735
736
737
738
739
740
# File 'lib/amqp/session.rb', line 731

def send_frameset(frames, channel)
  # some (many) developers end up sharing channels between threads and when multiple
  # threads publish on the same channel aggressively, at some point frames will be
  # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
  # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
  # locking. Note that "single frame" methods do not need this kind of synchronization. MK.
  channel.synchronize do
    frames.each { |frame| self.send_frame(frame) }
  end
end

#send_heartbeatObject

Sends a heartbeat frame if connection is open.



963
964
965
966
967
968
969
970
971
# File 'lib/amqp/session.rb', line 963

def send_heartbeat
  if tcp_connection_established? && !@handling_skipped_heartbeats && @last_server_heartbeat
    if @last_server_heartbeat < (Time.now - (self.heartbeat_interval * 2)) && !reconnecting?
      logger.error "[amqp] Detected missing server heartbeats"
      self.handle_skipped_heartbeats
    end
    send_frame(AMQ::Protocol::HeartbeatFrame)
  end
end

#send_preambleObject

Note:

This must be implemented by all AMQP clients.

Sends AMQ protocol header (also known as preamble).



712
713
714
# File 'lib/amqp/session.rb', line 712

def send_preamble
  self.send_raw(AMQ::Protocol::PREAMBLE)
end

#start_automatic_recoveryObject

Performs recovery of channels that are in the automatic recovery mode. “before recovery” callbacks are run immediately, “after recovery” callbacks are run after AMQP connection is re-established and auto recovery is performed (using #auto_recover).

Use this method if you want to run automatic recovery process after handling a connection-level exception, for example, 320 CONNECTION_FORCED (used by RabbitMQ when it is shut down gracefully).



834
835
836
837
838
839
840
841
842
843
# File 'lib/amqp/session.rb', line 834

def start_automatic_recovery
  self.run_before_recovery_callbacks
  self.register_connection_callback do
    # always run automatic recovery, because it is per-channel
    # and connection has to start it. Channels that did not opt-in for
    # autorecovery won't be selected. MK.
    self.auto_recover
    self.run_after_recovery_callbacks
  end
end

#tcp_connection_established?Boolean

IS TCP connection estabilished and currently active?

Returns:

  • (Boolean)


535
536
537
# File 'lib/amqp/session.rb', line 535

def tcp_connection_established?
  @tcp_connection_established
end

#tcp_connection_failedObject

Called when initial TCP connection fails.



776
777
778
779
780
# File 'lib/amqp/session.rb', line 776

def tcp_connection_failed
  @recovered = false

  @on_tcp_connection_failure.call(@settings) if @on_tcp_connection_failure
end

#tcp_connection_lostObject

Called when previously established TCP connection fails.



784
785
786
787
788
789
# File 'lib/amqp/session.rb', line 784

def tcp_connection_lost
  @recovered = false

  @on_tcp_connection_loss.call(self, @settings) if @on_tcp_connection_loss
  self.handle_connection_interruption
end

#upgrade_to_tls_if_necessaryObject (protected)



1162
1163
1164
1165
1166
1167
1168
1169
1170
# File 'lib/amqp/session.rb', line 1162

def upgrade_to_tls_if_necessary
  tls_options = @settings[:ssl]

  if tls_options.is_a?(Hash)
    start_tls(tls_options)
  elsif tls_options
    start_tls
  end
end

#usernameString Also known as: user

Returns Username used by this connection

Returns:

  • (String)

    Username used by this connection



216
217
218
# File 'lib/amqp/session.rb', line 216

def username
  @settings[:user]
end

#vhostString

vhost this connection uses. Default is “/”, a historically estabilished convention of RabbitMQ and amqp gem.

Returns:

  • (String)

    vhost this connection uses



766
767
768
# File 'lib/amqp/session.rb', line 766

def vhost
  @settings.fetch(:vhost, "/")
end