Class: AMQP::Session

Inherits:
EM::Connection
  • Object
show all
Extended by:
ProtocolMethodHandlers, RegisterEntityMixin
Includes:
Callbacks, ChannelIdAllocator, Openable
Defined in:
lib/amqp/session.rb

Overview

AMQP session represents connection to the broker. Session objects let you define callbacks for various TCP connection lifecycle events, for instance:

  • Connection is established
  • Connection has failed
  • Authentication has failed
  • Connection is lost (there is a network failure)
  • AMQP connection is opened
  • AMQP connection parameters (tuning) are negotiated and accepted by the broker
  • AMQP connection is properly closed

Key methods

Constant Summary

Deferrable =

Backwards compatibility with 0.7.0.a25. MK.

EventMachine::DefaultDeferrable

Constants included from Openable

Openable::VALUES

Constants included from ChannelIdAllocator

ChannelIdAllocator::MAX_CHANNELS_PER_CONNECTION

Broker information collapse

Instance Attribute Summary collapse

Attributes included from Openable

#status

Connecting, reconnecting, disconnecting collapse

Broker information collapse

Error Handling and Recovery collapse

Blocked connection notifications collapse

Connection operations collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from RegisterEntityMixin

register_entity

Methods included from ProtocolMethodHandlers

handle, handlers

Methods included from Callbacks

#clear_callbacks, #define_callback, #exec_callback, #exec_callback_once, #exec_callback_once_yielding_self, #exec_callback_yielding_self, #has_callback?, #prepend_callback, #redefine_callback

Methods included from Openable

#closed!, #closed?, #closing!, #closing?, #opened!, #opened?, #opening!, #opening?

Methods included from ChannelIdAllocator

#next_channel_id, #release_channel_id, #reset_channel_id_allocator

Constructor Details

#initialize(*args, &block) ⇒ Session

Returns a new instance of Session



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/amqp/session.rb', line 141

# Sets up a new session: forwards *args to the superclass constructor,
# configures settings, default failure handlers and negotiation parameters,
# then resets per-connection state.
#
# @param args [Array] constructor arguments; args.first is handed to
#   Settings.configure to build this session's settings.
# @param block [Proc] not referenced in this body (Ruby still forwards it
#   to super implicitly).
def initialize(*args, &block)
  super(*args)

  self.logger   = self.class.logger

  # channel => collected frames. MK.
  # NOTE: #reset (called at the end of this constructor) reassigns @frames.
  @frames            = Hash.new { Array.new }
  @channels          = Hash.new
  @callbacks         = Hash.new

  opening!

  # track TCP connection state, used to detect initial TCP connection failures.
  @tcp_connection_established       = false
  @tcp_connection_failed            = false
  @intentionally_closing_connection = false

  # EventMachine::Connection's and Adapter's constructors arity
  # make it easier to use *args. MK.
  @settings                           = Settings.configure(args.first)

  # Default TCP failure handler: marks the session closed, then either calls
  # the :on_tcp_connection_failure setting or raises the class-configured
  # exception.
  @on_tcp_connection_failure          = Proc.new { |settings|
    closed!
    if cb = @settings[:on_tcp_connection_failure]
      cb.call(settings)
    else
      raise self.class.tcp_connection_failure_exception_class.new(settings)
    end
  }

  # Uses the :on_possible_authentication_failure setting when given; otherwise
  # raises the class-configured authentication failure exception.
  @on_possible_authentication_failure = @settings[:on_possible_authentication_failure] || Proc.new { |settings|
    raise self.class.authentication_failure_exception_class.new(settings)
  }

  @mechanism         = @settings.fetch(:auth_mechanism, "PLAIN")
  @locale            = @settings.fetch(:locale, "en_GB")
  @client_properties = Settings.client_properties.merge(@settings.fetch(:client_properties, Hash.new))

  # Coerce the setting to a strict true/false.
  @auto_recovery     = (!!@settings[:auto_recovery])

  self.reset
  # The pending-connect timeout falls back to 3 when :timeout is not set;
  # the call is skipped entirely under JRuby.
  self.set_pending_connect_timeout((@settings[:timeout] || 3).to_f) unless defined?(JRUBY_VERSION)
end

Instance Attribute Details

#callbacksArray<#call> (readonly)

Returns:

  • (Array<#call>)


60
61
62
# File 'lib/amqp/session.rb', line 60

# Reader for @callbacks, which #initialize creates as an empty Hash.
def callbacks
  @callbacks
end

#channel_maxObject

Maximum channel number that the server permits this connection to use. Usable channel numbers are in the range 1..channel_max. Zero indicates no specified limit.



93
94
95
# File 'lib/amqp/session.rb', line 93

# Reader for @channel_max, which #handle_tune takes from the broker's
# connection.tune frame.
def channel_max
  @channel_max
end

#channelsObject (readonly)

Channels within this connection.



86
87
88
# File 'lib/amqp/session.rb', line 86

# Reader for @channels, which #initialize creates as an empty Hash.
def channels
  @channels
end

#client_propertiesObject

Client capabilities



71
72
73
# File 'lib/amqp/session.rb', line 71

# Reader for @client_properties: Settings.client_properties merged with the
# :client_properties setting (see #initialize).
def client_properties
  @client_properties
end

#frame_maxObject

Maximum frame size that the server permits this connection to use.



98
99
100
# File 'lib/amqp/session.rb', line 98

# Reader for @frame_max, which #handle_tune takes from the broker's
# connection.tune frame.
def frame_max
  @frame_max
end

#known_hostsObject (readonly)



101
102
103
# File 'lib/amqp/session.rb', line 101

# Reader for @known_hosts, a frozen copy of open_ok.known_hosts stored by
# #handle_open_ok.
def known_hosts
  @known_hosts
end

#localeObject

The locale defines the language in which the server will send reply texts.



66
67
68
# File 'lib/amqp/session.rb', line 66

# Reader for @locale, taken from the :locale setting in #initialize
# (falls back to "en_GB").
def locale
  @locale
end

#loggerObject

API



56
57
58
# File 'lib/amqp/session.rb', line 56

# Reader for @logger; #initialize assigns the class-level logger to it.
def logger
  @logger
end

#mechanismObject (readonly)

Authentication mechanism used.



76
77
78
# File 'lib/amqp/session.rb', line 76

# Reader for @mechanism, taken from the :auth_mechanism setting in
# #initialize (falls back to "PLAIN").
def mechanism
  @mechanism
end

#server_authentication_mechanismsObject (readonly)

Authentication mechanisms broker supports.



81
82
83
# File 'lib/amqp/session.rb', line 81

# Reader for @server_authentication_mechanisms: the space-separated mechanism
# list from connection.start, split into an Array by #handle_start.
def server_authentication_mechanisms
  @server_authentication_mechanisms
end

#server_capabilitiesHash (readonly)

Server capabilities (usually used to detect AMQP 0.9.1 extensions like basic.nack, publisher confirms and so on)



259
260
261
# File 'lib/amqp/session.rb', line 259

# Reader for @server_capabilities: the "capabilities" entry of the server
# properties, stored by #handle_start.
def server_capabilities
  @server_capabilities
end

#server_localesObject (readonly)

Locales server supports



264
265
266
# File 'lib/amqp/session.rb', line 264

# Reader for @server_locales: connection_start.locales wrapped with Array()
# by #handle_start.
def server_locales
  @server_locales
end

#server_propertiesHash (readonly)

Server properties (product information, platform, et cetera)



252
253
254
# File 'lib/amqp/session.rb', line 252

# Reader for @server_properties, stored by #handle_start from the
# connection.start frame.
def server_properties
  @server_properties
end

#settingsObject



57
58
59
# File 'lib/amqp/session.rb', line 57

# Reader for @settings, built by Settings.configure in #initialize and
# replaced by #reconnect_to.
def settings
  @settings
end

Class Method Details

.connect(settings = {}, &block) ⇒ Object

Initiates connection to AMQP broker. If callback is given, runs it when (and if) AMQP connection succeeds.

Parameters:

  • settings (Hash) (defaults to: {})

Options Hash (settings):

  • :host (String) — default: "127.0.0.1"

    Hostname AMQ broker runs on.

  • :port (String) — default: 5672

    Port AMQ broker listens on.

  • :vhost (String) — default: "/"

    Virtual host to use.

  • :user (String) — default: "guest"

    Username to use for authentication.

  • :pass (String) — default: "guest"

    Password to use for authentication.

  • :auth_mechanism (String) — default: "PLAIN"

    SASL authentication mechanism to use.

  • :ssl (String) — default: false

    Should we use TLS (SSL) for the connection?

  • :timeout (String) — default: nil

    Connection timeout.

  • :heartbeat (Fixnum) — default: 0

    Connection heartbeat, in seconds.

  • :frame_max (Fixnum) — default: 131072

    Maximum frame size to use. If broker cannot support frames this large, broker’s maximum value will be used instead.



439
440
441
442
443
444
445
446
# File 'lib/amqp/session.rb', line 439

# Opens a connection to the broker through EventMachine.
#
# The configured settings are stored in the class-level @settings and also
# passed to the new session instance.
#
# @param settings [Hash] connection options, normalised by Settings.configure
# @param block [Proc] registered as the connection callback of the new session
# @return [Object] whatever EventMachine.connect returns (the session)
def self.connect(settings = {}, &block)
  @settings = Settings.configure(settings)
  host      = @settings[:host]
  port      = @settings[:port]

  session = EventMachine.connect(host, port, self, @settings)
  session.register_connection_callback(&block)
  session
end

.loggerObject



110
111
112
113
114
115
# File 'lib/amqp/session.rb', line 110

# Returns the memoised logger, creating a Logger that writes to STDERR the
# first time it is needed.
def logger
  return @logger if @logger

  require "logger"
  @logger = Logger.new(STDERR)
end

.logger=(logger) ⇒ Object



117
118
119
120
121
122
123
124
# File 'lib/amqp/session.rb', line 117

# Installs +logger+ after checking that it responds to every method named in
# AMQP::Logging::REQUIRED_METHODS.
#
# @raise [AMQP::Logging::IncompatibleLoggerError] if a required method is missing
def logger=(logger)
  required = AMQP::Logging::REQUIRED_METHODS
  if required.any? { |name| !logger.respond_to?(name) }
    raise AMQP::Logging::IncompatibleLoggerError.new(required)
  end

  @logger = logger
end

.loggingBoolean

Returns Current value of logging flag.

Returns:

  • (Boolean)

    Current value of logging flag.



127
128
129
# File 'lib/amqp/session.rb', line 127

# Reads the :logging entry of the hash returned by +settings+.
def logging
  settings[:logging]
end

.logging=(boolean) ⇒ Object

Turns logging on or off.



132
133
134
# File 'lib/amqp/session.rb', line 132

# Stores +boolean+ under :logging in the hash returned by +settings+.
def logging=(boolean)
  settings[:logging] = boolean
end

.settingsObject

Settings



106
107
108
# File 'lib/amqp/session.rb', line 106

# Returns @settings, initialising it from AMQP::Settings.default on first use.
def settings
  @settings ||= AMQP::Settings.default
end

Instance Method Details

#auth_mechanism_adapterObject

Retrieves an AuthMechanismAdapter that will encode credentials for this Adapter.



904
905
906
# File 'lib/amqp/session.rb', line 904

# Memoises the adapter returned by AuthMechanismAdapter.for_adapter(self).
def auth_mechanism_adapter
  @auth_mechanism_adapter ||= AuthMechanismAdapter.for_adapter(self)
end

#authenticating?Boolean

Whether we are in the authentication state (after the TCP connection was established but before the broker has authenticated us).

Returns:

  • (Boolean)


530
531
532
# File 'lib/amqp/session.rb', line 530

# Reader for @authenticating: set to true by #handshake and back to false by
# #reset and #connection_successful.
def authenticating?
  @authenticating
end

#auto_recoverObject

Performs recovery of channels that are in the automatic recovery mode.



377
378
379
# File 'lib/amqp/session.rb', line 377

# Runs #auto_recover on every channel that reports #auto_recovering?.
# The candidates are selected first and only then recovered.
def auto_recover
  recoverable = @channels.select do |_id, channel|
    channel.auto_recovering?
  end

  recoverable.each do |_id, channel|
    channel.auto_recover
  end
end

#auto_recovering?Boolean Also known as: auto_recovery?

Returns whether connection is in the automatic recovery mode

Returns:

  • (Boolean)

    whether connection is in the automatic recovery mode



365
366
367
# File 'lib/amqp/session.rb', line 365

# @return [Boolean] strict true/false view of @auto_recovery
def auto_recovering?
  @auto_recovery ? true : false
end

#before_recovery(&block) ⇒ Object

Defines a callback that will be executed after TCP connection has recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).



348
349
350
# File 'lib/amqp/session.rb', line 348

# Registers +block+ under :before_recovery via #redefine_callback
# (the last definition wins).
def before_recovery(&block)
  redefine_callback(:before_recovery, &block)
end

#brokerAMQP::Broker

Returns Broker this connection is established with

Returns:

  • (AMQP::Broker)

    Broker this connection is established with



267
268
269
# File 'lib/amqp/session.rb', line 267

# Memoises an AMQP::Broker built from @server_properties (which
# #handle_start populates).
def broker
  @broker ||= AMQP::Broker.new(@server_properties)
end

#broker_endpointString

Returns Broker endpoint in the form of HOST:PORT/VHOST

Returns:

  • (String)

    Broker endpoint in the form of HOST:PORT/VHOST



206
207
208
# File 'lib/amqp/session.rb', line 206

# Builds the "HOST:PORT/VHOST" string from #hostname, #port and #vhost.
def broker_endpoint
  host_and_port = "#{hostname}:#{port}"
  "#{host_and_port}/#{vhost}"
end

#connected?Boolean

Returns true if this AMQP connection is currently open

Returns:

  • (Boolean)

    true if this AMQP connection is currently open



187
188
189
# File 'lib/amqp/session.rb', line 187

# Alias-style predicate: the connection counts as connected exactly when
# #opened? says so.
def connected?
  opened?
end

#connection_successfulObject

Called by AMQP::Connection after we receive connection.open-ok.



663
664
665
666
667
668
# File 'lib/amqp/session.rb', line 663

# Marks the AMQP connection as open: leaves the authentication stage,
# switches status via #opened!, then fires @connection_deferrable so the
# callbacks added through #on_open run.
def connection_successful
  @authenticating = false
  opened!

  @connection_deferrable.succeed
end

#content_complete?(frames) ⇒ Boolean (protected)

Determines, whether given frame array contains full content body

Returns:

  • (Boolean)


1104
1105
1106
1107
1108
1109
# File 'lib/amqp/session.rb', line 1104

# Checks whether +frames+ (content header first, body frames after) carries
# the whole body announced by the header.
#
# @param frames [Array] content header frame followed by body frames
# @return [Boolean] false for an empty array; otherwise whether the payload
#   sizes of the body frames add up to header.body_size
# @raise [RuntimeError] if the first frame is not an AMQ::Protocol::HeaderFrame
def content_complete?(frames)
  return false if frames.empty?

  header = frames.first
  unless header.is_a?(AMQ::Protocol::HeaderFrame)
    raise "Not a content header frame first: #{header.inspect}"
  end

  received = 0
  frames.drop(1).each { |body_frame| received += body_frame.payload.size }
  header.body_size == received
end

#disconnect(reply_code = 200, reply_text = "Goodbye", &block) ⇒ Object Also known as: close

Properly close connection with AMQ broker, as described in section 2.2.4 of the AMQP 0.9.1 specification.

See Also:

  • #close_connection


223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/amqp/session.rb', line 223

# Starts an orderly shutdown of the connection.
#
# Registers a disconnection callback that drops buffered frames and then
# runs +block+. If the connection is open, connection.close is sent; if a
# close is already in progress nothing more is done; otherwise the
# disconnection is completed immediately.
#
# @param reply_code [Integer] code sent in connection.close
# @param reply_text [String] text sent in connection.close
def disconnect(reply_code = 200, reply_text = "Goodbye", &block)
  @intentionally_closing_connection = true
  on_disconnection do
    @frames.clear
    block.call if block
  end

  # ruby-amqp/amqp#66, MK.
  if open?
    closing!
    close_frame = AMQ::Protocol::Connection::Close.encode(reply_code, reply_text, 0, 0)
    send_frame(close_frame)
  elsif !closing?
    disconnection_successful
  end
end

#disconnection_successfulObject

Called by AMQP::Connection after we receive connection.close-ok.



674
675
676
677
678
679
680
681
# File 'lib/amqp/session.rb', line 674

# Completes a disconnect: fires @disconnection_deferrable (running the
# callbacks added through #on_closed), closes the underlying connection,
# resets per-connection state and marks the session closed.
def disconnection_successful
  @disconnection_deferrable.succeed

  # true for "after writing buffered data"
  self.close_connection(true)
  # #reset installs fresh deferrables for the next connection attempt.
  self.reset
  closed!
end

#encode_credentials(username, password) ⇒ Object

See Also:



896
897
898
# File 'lib/amqp/session.rb', line 896

# Delegates credential encoding to the adapter returned by
# #auth_mechanism_adapter.
def encode_credentials(username, password)
  auth_mechanism_adapter.encode_credentials(username, password)
end

#frameset_complete?(frames) ⇒ Boolean (protected)

Determines, whether the received frameset is ready to be further processed

Returns:

  • (Boolean)


1097
1098
1099
1100
1101
# File 'lib/amqp/session.rb', line 1097

# Decides whether the frames collected so far form a complete frameset.
#
# An empty array is never complete. Otherwise the set is complete when the
# first frame is final, or when its method class carries content and the
# remaining frames pass #content_complete?.
def frameset_complete?(frames)
  return false if frames.empty?

  lead = frames.first
  finality = lead.final?
  return finality if finality

  lead.method_class.has_content? && content_complete?(frames[1..-1])
end

#get_next_frameObject (protected)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns next frame from buffer whenever possible



1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
# File 'lib/amqp/session.rb', line 1079

# Cuts the next complete frame off the front of @chunk_buffer.
#
# The payload length is a 4-byte field that follows a 3-byte prefix
# (one octet plus one short); after the payload comes one final octet.
#
# @return [String, nil] the raw frame, or nil while the buffer does not yet
#   hold a whole frame
def get_next_frame
  # with 7 bytes or fewer the length field cannot be read yet
  return nil if @chunk_buffer.size <= 7

  prefix_size    = 3 # octet + short
  payload_length = @chunk_buffer[prefix_size, 4].unpack(AMQ::Protocol::PACK_UINT32).first
  # prefix + 4-byte length field + payload + final octet
  frame_length   = prefix_size + payload_length + 5
  return nil if frame_length > @chunk_buffer.size

  @chunk_buffer.slice!(0, frame_length)
end

#handle_close(conn_close) ⇒ Object

Handles connection.close. When broker detects a connection level exception, this method is called.



1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
# File 'lib/amqp/session.rb', line 1036

# Reacts to connection.close sent by the broker.
#
# The session is marked closed first. If the close arrives while we are
# still authenticating, it is treated as an authentication failure
# (RabbitMQ 3.2+, http://www.rabbitmq.com/auth-notification.html) and the
# configured handler is called with @settings. Finally the :error callback
# is run with the close frame.
def handle_close(conn_close)
  closed!

  if authenticating? && @on_possible_authentication_failure
    @on_possible_authentication_failure.call(@settings)
  end

  exec_callback_yielding_self(:error, conn_close)
end

#handle_close_ok(close_ok) ⇒ Object

Handles Connection.Close-Ok.



1052
1053
1054
1055
# File 'lib/amqp/session.rb', line 1052

# Reacts to connection.close-ok: marks the session closed and finishes the
# disconnection. The frame itself is not inspected.
def handle_close_ok(close_ok)
  closed!
  disconnection_successful
end

#handle_connection_blocked(connection_blocked) ⇒ Object



1058
1059
1060
# File 'lib/amqp/session.rb', line 1058

# Passes the connection.blocked frame, together with this session, to the
# handler registered through #on_blocked (if any).
def handle_connection_blocked(connection_blocked)
  return unless @on_blocked

  @on_blocked.call(self, connection_blocked)
end

#handle_connection_unblocked(connection_unblocked) ⇒ Object



1062
1063
1064
# File 'lib/amqp/session.rb', line 1062

# Passes the connection.unblocked frame, together with this session, to the
# handler registered through #on_unblocked (if any).
def handle_connection_unblocked(connection_unblocked)
  return unless @on_unblocked

  @on_unblocked.call(self, connection_unblocked)
end

#handle_open_ok(open_ok) ⇒ Object

Handles Connection.Open-Ok.



1024
1025
1026
1027
1028
1029
# File 'lib/amqp/session.rb', line 1024

# Reacts to connection.open-ok: keeps a frozen copy of the known hosts,
# marks the session opened and, when the method is available, notifies
# #connection_successful.
def handle_open_ok(open_ok)
  hosts        = open_ok.known_hosts.dup
  @known_hosts = hosts.freeze

  opened!
  connection_successful if respond_to?(:connection_successful)
end

#handle_skipped_heartbeatsObject

Called when time since last server heartbeat received is greater or equal to the heartbeat interval set via :heartbeat_interval option on connection.



687
688
689
690
691
692
693
694
# File 'lib/amqp/session.rb', line 687

# Handles missed server heartbeats once per occurrence.
#
# Nothing happens when handling is already in progress, when the TCP
# connection was never established, or when we are closing on purpose.
# Otherwise the heartbeat sender is cancelled and the skipped-heartbeats
# callbacks are run.
def handle_skipped_heartbeats
  return if @handling_skipped_heartbeats
  return unless @tcp_connection_established
  return if @intentionally_closing_connection

  @handling_skipped_heartbeats = true
  cancel_heartbeat_sender

  run_skipped_heartbeats_callbacks
end

#handle_start(connection_start) ⇒ Object

Handles connection.start.



984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# File 'lib/amqp/session.rb', line 984

# Reacts to connection.start: records what the broker advertised
# (properties, capabilities, authentication mechanisms, locales) and replies
# with connection.start-ok carrying our client properties, mechanism,
# encoded credentials and locale.
#
# Credentials come from the :user / :username and :pass / :password settings.
def handle_start(connection_start)
  properties           = connection_start.server_properties
  @server_properties   = properties
  @server_capabilities = properties["capabilities"]

  advertised_mechanisms             = connection_start.mechanisms || ""
  @server_authentication_mechanisms = advertised_mechanisms.split(" ")
  @server_locales                   = Array(connection_start.locales)

  user   = @settings[:user] || @settings[:username]
  secret = @settings[:pass] || @settings[:password]

  # It's not clear whether we should transition to :opening state here
  # or in #open but in case authentication fails, it would be strange to have
  # @status undefined. So lets do this. MK.
  opening!

  credentials = encode_credentials(user, secret)
  start_ok    = AMQ::Protocol::Connection::StartOk.encode(@client_properties, mechanism, credentials, @locale)
  send_frame(start_ok)
end

#handle_tune(connection_tune) ⇒ Object

Handles Connection.Tune and replies with Connection.Tune-Ok.



1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
# File 'lib/amqp/session.rb', line 1007

# Reacts to connection.tune: stores the broker's channel and frame limits,
# negotiates the heartbeat interval and replies with connection.tune-ok.
#
# The frame size sent back is the smaller of the :frame_max setting and the
# broker's value. The heartbeat sender is started when heartbeats end up
# enabled.
def handle_tune(connection_tune)
  @channel_max = connection_tune.channel_max.freeze
  @frame_max   = connection_tune.frame_max.freeze

  requested_heartbeat = @settings[:heartbeat] || @settings[:heartbeat_interval] || 0
  @heartbeat_interval = negotiate_heartbeat_value(requested_heartbeat, connection_tune.heartbeat)

  agreed_frame_max = [settings[:frame_max], @frame_max].min
  tune_ok          = AMQ::Protocol::Connection::TuneOk.encode(@channel_max, agreed_frame_max, @heartbeat_interval)
  send_frame(tune_ok)

  initialize_heartbeat_sender if heartbeats_enabled?
end

#handshakeObject

Sends connection preamble to the broker.



873
874
875
876
# File 'lib/amqp/session.rb', line 873

# Enters the authentication stage and sends the protocol preamble.
def handshake
  @authenticating = true
  send_preamble
end

#heartbeat_intervalFixnum

Returns heartbeat interval this client uses, in seconds. This value may or may not be used depending on broker capabilities. Zero means the server does not want a heartbeat.

Returns:

  • (Fixnum)

    Heartbeat interval this client uses, in seconds.

See Also:



752
753
754
# File 'lib/amqp/session.rb', line 752

# Reader for @heartbeat_interval, the value negotiated in #handle_tune.
def heartbeat_interval
  @heartbeat_interval
end

#heartbeats_enabled?Boolean

Returns true if heartbeats are enabled (heartbeat interval is greater than 0)

Returns:

  • (Boolean)


758
759
760
# File 'lib/amqp/session.rb', line 758

# Truthy only when a heartbeat interval is set and greater than zero.
def heartbeats_enabled?
  interval = @heartbeat_interval
  interval && interval > 0
end

#hostnameString Also known as: host

Returns Broker hostname this connection uses

Returns:

  • (String)

    Broker hostname this connection uses



193
194
195
# File 'lib/amqp/session.rb', line 193

# Reads the :host entry of @settings.
def hostname
  @settings[:host]
end

#negotiate_heartbeat_value(client_value, server_value) ⇒ Object (protected)



1068
1069
1070
1071
1072
1073
1074
# File 'lib/amqp/session.rb', line 1068

# Picks the heartbeat interval from the client's and the server's proposals.
#
# When either side proposes 0 the larger value is used; otherwise the
# smaller one wins.
def negotiate_heartbeat_value(client_value, server_value)
  proposals = [client_value, server_value]
  proposals.include?(0) ? proposals.max : proposals.min
end

#on_blocked(&fn) ⇒ Object



386
387
388
# File 'lib/amqp/session.rb', line 386

# Stores +fn+ as the handler that #handle_connection_blocked calls with
# (session, connection.blocked frame). A later call replaces the handler.
def on_blocked(&fn)
  @on_blocked = fn
end

#on_closed(&block) ⇒ Object Also known as: on_disconnection

Defines a callback that will be run when broker confirms connection termination (client receives connection.close-ok). You can define more than one callback.

See Also:



294
295
296
# File 'lib/amqp/session.rb', line 294

# Adds +block+ as a callback on @disconnection_deferrable, which
# #disconnection_successful fires.
def on_closed(&block)
  @disconnection_deferrable.callback(&block)
end

#on_connection_interruption(&block) ⇒ Object Also known as: after_connection_interruption

Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).



327
328
329
# File 'lib/amqp/session.rb', line 327

# Registers +block+ under :after_connection_interruption via
# #redefine_callback (the last definition wins).
def on_connection_interruption(&block)
  redefine_callback(:after_connection_interruption, &block)
end

#on_error(&block) ⇒ Object

Defines a callback that will be executed when connection is closed after connection-level exception. Only one callback can be defined (the one defined last replaces previously added ones).



338
339
340
# File 'lib/amqp/session.rb', line 338

# Registers +block+ under :error via #redefine_callback (the last definition
# wins). #handle_close runs this callback with the connection.close frame.
def on_error(&block)
  redefine_callback(:error, &block)
end

#on_open(&block) ⇒ Object Also known as: on_connection

Defines a callback that will be executed when AMQP connection is considered open: after client and broker has agreed on max channel identifier and maximum allowed frame size and authentication succeeds. You can define more than one callback.

See Also:



281
282
283
# File 'lib/amqp/session.rb', line 281

# Adds +block+ as a callback on @connection_deferrable, which
# #connection_successful fires.
def on_open(&block)
  @connection_deferrable.callback(&block)
end

#on_possible_authentication_failure(&block) ⇒ Object

Defines a callback that will be run when TCP connection is closed before authentication finishes. Usually this means authentication failure. You can define only one callback.



319
320
321
# File 'lib/amqp/session.rb', line 319

# Replaces the handler that #handle_close calls with @settings when
# connection.close arrives while the session is still authenticating.
def on_possible_authentication_failure(&block)
  @on_possible_authentication_failure = block
end

#on_recovery(&block) ⇒ Object Also known as: after_recovery

Defines a callback that will be executed after the AMQP connection has recovered from a network failure. Only one callback can be defined (the one defined last replaces previously added ones).



357
358
359
# File 'lib/amqp/session.rb', line 357

# Registers +block+ under :after_recovery via #redefine_callback
# (the last definition wins).
def on_recovery(&block)
  redefine_callback(:after_recovery, &block)
end

#on_skipped_heartbeats(&block) ⇒ Object

Defines a callback that will be executed after time since last broker heartbeat is greater than or equal to the heartbeat interval (skipped heartbeat is detected). Only one callback can be defined (the one defined last replaces previously added ones).



853
854
855
# File 'lib/amqp/session.rb', line 853

# Registers +block+ under :skipped_heartbeats via #redefine_callback
# (the last definition wins).
def on_skipped_heartbeats(&block)
  redefine_callback(:skipped_heartbeats, &block)
end

#on_tcp_connection_failure(&block) ⇒ Object

Defines a callback that will be run when initial TCP connection fails. You can define only one callback.



303
304
305
# File 'lib/amqp/session.rb', line 303

# Replaces the handler that #tcp_connection_failed calls with @settings
# (overriding the default installed by #initialize).
def on_tcp_connection_failure(&block)
  @on_tcp_connection_failure = block
end

#on_tcp_connection_loss(&block) ⇒ Object

Defines a callback that will be run when TCP connection to AMQP broker is lost (interrupted). You can define only one callback.



311
312
313
# File 'lib/amqp/session.rb', line 311

# Stores the handler that #tcp_connection_lost calls with (session, @settings).
def on_tcp_connection_loss(&block)
  @on_tcp_connection_loss = block
end

#on_unblocked(&fn) ⇒ Object



390
391
392
# File 'lib/amqp/session.rb', line 390

# Stores +fn+ as the handler that #handle_connection_unblocked calls with
# (session, connection.unblocked frame). A later call replaces the handler.
def on_unblocked(&fn)
  @on_unblocked = fn
end

#open(vhost = "/") ⇒ Object

Sends connection.open to the server.



883
884
885
# File 'lib/amqp/session.rb', line 883

# Encodes connection.open for +vhost+ and sends it to the broker.
#
# @param vhost [String] virtual host to open (defaults to "/")
def open(vhost = "/")
  open_frame = AMQ::Protocol::Connection::Open.encode(vhost)
  send_frame(open_frame)
end

#periodically_reconnect(period = 5) ⇒ Object

Periodically try to reconnect.

Parameters:

  • period (Fixnum) (defaults to: 5)

    Period of time, in seconds, to wait before reconnection attempt.

  • force (Boolean)

    If true, enforces immediate reconnection.



497
498
499
500
501
502
503
504
# File 'lib/amqp/session.rb', line 497

# Switches to reconnecting mode, resets connection state and installs a
# periodic timer that asks EventMachine to reconnect to the configured
# host and port on every tick.
#
# @param period [Numeric] timer period passed to EventMachine::PeriodicTimer
def periodically_reconnect(period = 5)
  @reconnecting = true
  reset

  @periodic_reconnection_timer = EventMachine::PeriodicTimer.new(period) do
    EventMachine.reconnect(@settings[:host], @settings[:port], self)
  end
end

#portString

Returns Broker port this connection uses

Returns:

  • (String)

    Broker port this connection uses



200
201
202
# File 'lib/amqp/session.rb', line 200

# Reads the :port entry of @settings.
def port
  @settings[:port]
end

#receive_frame(frame) ⇒ Object

Processes a single frame.

Parameters:

  • frame (AMQ::Protocol::Frame)


913
914
915
916
917
918
919
920
921
922
923
924
925
# File 'lib/amqp/session.rb', line 913

# Buffers +frame+ under its channel and, once the buffered frames form a
# complete frameset, hands them to #receive_frameset. The channel's buffer
# is cleared afterwards even if the handler raises.
#
# @param frame [AMQ::Protocol::Frame]
def receive_frame(frame)
  channel_id = frame.channel
  (@frames[channel_id] ||= Array.new) << frame

  return unless frameset_complete?(@frames[channel_id])

  begin
    receive_frameset(@frames[channel_id])
  ensure # Ensure that frames always will be cleared
    # for channel.close, frame.channel will be nil. MK.
    clear_frames_on(channel_id) if @frames[channel_id]
  end
end

#receive_frameset(frames) ⇒ Object

Processes a frameset by finding and invoking a suitable handler. Heartbeat frames are treated in a special way: they simply update @last_server_heartbeat value.

Parameters:

  • frames (Array<AMQ::Protocol::Frame>)


933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
# File 'lib/amqp/session.rb', line 933

# Dispatches a complete frameset to the handler registered for the first
# frame's method class.
#
# With heartbeats enabled, any incoming traffic refreshes
# @last_server_heartbeat. Heartbeat frames themselves need no handler.
#
# @param frames [Array<AMQ::Protocol::Frame>]
# @raise [MissingHandlerError] when no handler is registered
def receive_frameset(frames)
  # treat incoming traffic as heartbeats.
  # this operation is pretty expensive under heavy traffic but heartbeats can be disabled
  # (and are also disabled by default). MK.
  @last_server_heartbeat = Time.now if heartbeats_enabled?

  lead = frames.first
  return if AMQ::Protocol::HeartbeatFrame === lead

  handler = AMQP::HandlersRegistry.find(lead.method_class)
  raise MissingHandlerError.new(frames.first) unless handler

  # the handler gets the method frame plus whatever frames remain
  handler.call(self, frames.shift, frames)
end

#reconnect(force = false, period = 5) ⇒ Object

Reconnect after a period of wait.

Parameters:

  • period (Fixnum) (defaults to: 5)

    Period of time, in seconds, to wait before reconnection attempt.

  • force (Boolean) (defaults to: false)

    If true, enforces immediate reconnection.



453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
# File 'lib/amqp/session.rb', line 453

# Reconnects to the configured host and port.
#
# If a reconnection is already under way and +force+ is false, a timer is
# scheduled that retries with force after +period+, and nothing else happens
# now. Otherwise the session enters reconnecting mode (resetting state the
# first time) and EventMachine is asked to reconnect immediately.
#
# @param force [Boolean] reconnect right away even if already reconnecting
# @param period [Numeric] delay before the forced retry
def reconnect(force = false, period = 5)
  if @reconnecting && !force
    EventMachine::Timer.new(period) do
      reconnect(true, period)
    end
    return
  end

  unless @reconnecting
    @reconnecting = true
    reset
  end

  EventMachine.reconnect(@settings[:host], @settings[:port], self)
end

#reconnect_to(connection_string_or_options, period = 5) ⇒ Object

Similar to #reconnect, but uses different connection settings

See Also:



472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
# File 'lib/amqp/session.rb', line 472

# Reconnects using new connection settings.
#
# A String is parsed with AMQP.parse_connection_uri, a Hash is used as is,
# anything else results in an empty Hash; the result is normalised by
# Settings.configure and replaces @settings. The +period+ argument is
# accepted but not used by this method.
def reconnect_to(connection_string_or_options, period = 5)
  settings = case connection_string_or_options
             when String then AMQP.parse_connection_uri(connection_string_or_options)
             when Hash   then connection_string_or_options
             else {}
             end

  unless @reconnecting
    @reconnecting = true
    reset
  end

  @settings = Settings.configure(settings)
  EventMachine.reconnect(@settings[:host], @settings[:port], self)
end

#reconnecting?Boolean

Returns:

  • (Boolean)


794
795
796
# File 'lib/amqp/session.rb', line 794

# Reader for @reconnecting, which #reconnect, #reconnect_to and
# #periodically_reconnect set to true.
def reconnecting?
  @reconnecting
end

#resetObject (protected)



1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
# File 'lib/amqp/session.rb', line 1148

# Resets per-connection state: parsing buffers, the per-channel frame
# buffer, the connection/disconnection deferrables and the authentication
# flag.
def reset
  @size      = 0
  @payload   = ""
  # channel id => collected frames. Kept as a Hash (as #initialize sets it
  # up) so that storing frames for a high channel id does not pad an Array
  # with nils up to that index.
  @frames    = Hash.new

  @chunk_buffer                 = ""
  # Fresh deferrables: callbacks registered from now on belong to the next
  # connection attempt.
  @connection_deferrable        = EventMachine::DefaultDeferrable.new
  @disconnection_deferrable     = EventMachine::DefaultDeferrable.new

  # used to track down whether authentication succeeded. AMQP 0.9.1 dictates
  # that on authentication failure broker must close TCP connection without sending
  # any more data. This is why we need to explicitly track whether we are past
  # authentication stage to signal possible authentication failures.
  @authenticating           = false
end

#reset_state!Object

Resets connection state.



890
891
892
# File 'lib/amqp/session.rb', line 890

# Hook with an empty body; it does nothing and returns nil.
def reset_state!
  # no-op by default
end

#send_frame(frame) ⇒ Object

Sends frame to the peer, checking that connection is open.



721
722
723
724
725
726
727
# File 'lib/amqp/session.rb', line 721

# Encodes +frame+ and writes it to the peer.
#
# @raise [ConnectionClosedError] when the session is closed
def send_frame(frame)
  raise ConnectionClosedError.new(frame) if closed?

  send_raw(frame.encode)
end

#send_frameset(frames, channel) ⇒ Object

Sends multiple frames, one by one. For thread safety this method takes a channel object and synchronizes on it.



733
734
735
736
737
738
739
740
741
742
# File 'lib/amqp/session.rb', line 733

# Sends +frames+ in order while holding +channel+'s lock.
#
# Channels often end up shared between threads; without the lock, frames of
# concurrent publishes could interleave and the broker would answer with
# 505 UNEXPECTED_FRAME. Locking per channel keeps this thread safe while
# staying fine-grained. Single-frame methods do not need it. MK.
def send_frameset(frames, channel)
  channel.synchronize do
    frames.each do |frame|
      send_frame(frame)
    end
  end
end

#send_heartbeatObject

Sends a heartbeat frame if connection is open.



965
966
967
968
969
970
971
972
973
# File 'lib/amqp/session.rb', line 965

# Sends a heartbeat frame, provided the TCP connection is established,
# skipped heartbeats are not already being handled and at least one server
# heartbeat timestamp has been recorded.
#
# Before sending, checks whether the last server heartbeat is older than
# twice the heartbeat interval; if so (and we are not reconnecting) the
# condition is logged and #handle_skipped_heartbeats is invoked.
def send_heartbeat
  return unless tcp_connection_established? && !@handling_skipped_heartbeats && @last_server_heartbeat

  oldest_acceptable = Time.now - (heartbeat_interval * 2)
  if @last_server_heartbeat < oldest_acceptable && !reconnecting?
    logger.error "[amqp] Detected missing server heartbeats"
    handle_skipped_heartbeats
  end

  send_frame(AMQ::Protocol::HeartbeatFrame)
end

#send_preambleObject

Note:

This must be implemented by all AMQP clients.

Sends AMQ protocol header (also known as preamble).



714
715
716
# File 'lib/amqp/session.rb', line 714

# Writes the AMQ protocol header (AMQ::Protocol::PREAMBLE) to the peer as
# raw bytes.
def send_preamble
  send_raw(AMQ::Protocol::PREAMBLE)
end

#start_automatic_recoveryObject

Performs recovery of channels that are in the automatic recovery mode. “before recovery” callbacks are run immediately, “after recovery” callbacks are run after AMQP connection is re-established and auto recovery is performed (using #auto_recover).

Use this method if you want to run automatic recovery process after handling a connection-level exception, for example, 320 CONNECTION_FORCED (used by RabbitMQ when it is shut down gracefully).



836
837
838
839
840
841
842
843
844
845
# File 'lib/amqp/session.rb', line 836

# Kicks off automatic recovery: runs the "before recovery" callbacks now and
# registers a connection callback that recovers channels and then runs the
# "after recovery" callbacks.
def start_automatic_recovery
  run_before_recovery_callbacks

  register_connection_callback do
    # always run automatic recovery, because it is per-channel
    # and connection has to start it. Channels that did not opt-in for
    # autorecovery won't be selected. MK.
    auto_recover
    run_after_recovery_callbacks
  end
end

#tcp_connection_established?Boolean

Is the TCP connection established and currently active?

Returns:

  • (Boolean)


537
538
539
# File 'lib/amqp/session.rb', line 537

# Reader for @tcp_connection_established, which #initialize sets to false.
def tcp_connection_established?
  @tcp_connection_established
end

#tcp_connection_failedObject

Called when initial TCP connection fails.



778
779
780
781
782
# File 'lib/amqp/session.rb', line 778

# Clears the recovered flag and passes @settings to the TCP connection
# failure handler, if one is set.
def tcp_connection_failed
  @recovered = false

  failure_handler = @on_tcp_connection_failure
  failure_handler.call(@settings) if failure_handler
end

#tcp_connection_lostObject

Called when previously established TCP connection fails.



786
787
788
789
790
791
# File 'lib/amqp/session.rb', line 786

# Clears the recovered flag, notifies the TCP connection loss handler
# (if set) with this session and @settings, then handles the interruption.
def tcp_connection_lost
  @recovered = false

  loss_handler = @on_tcp_connection_loss
  loss_handler.call(self, @settings) if loss_handler
  handle_connection_interruption
end

#upgrade_to_tls_if_necessaryObject (protected)



1164
1165
1166
1167
1168
1169
1170
1171
1172
# File 'lib/amqp/session.rb', line 1164

# Starts TLS when the :ssl setting asks for it: a Hash is passed on to
# start_tls as options, any other truthy value starts TLS without options,
# and a falsy value leaves the connection as it is.
def upgrade_to_tls_if_necessary
  ssl = @settings[:ssl]
  return unless ssl

  ssl.is_a?(Hash) ? start_tls(ssl) : start_tls
end

#usernameString Also known as: user

Returns Username used by this connection

Returns:

  • (String)

    Username used by this connection



212
213
214
# File 'lib/amqp/session.rb', line 212

# Username this connection authenticates with.
#
# Looks at :user first and falls back to :username, the same lookup
# #handle_start performs when it encodes the credentials, so the value
# reported here matches the one actually sent to the broker.
#
# @return [String, nil]
def username
  @settings[:user] || @settings[:username]
end

#vhostString

vhost this connection uses. Default is “/”, a historically established convention of RabbitMQ and amqp gem.

Returns:

  • (String)

    vhost this connection uses



768
769
770
# File 'lib/amqp/session.rb', line 768

# Reads the :vhost setting, falling back to "/" when the key is absent.
def vhost
  @settings.fetch(:vhost) { "/" }
end