Class: AMQP::Session

Inherits:
EM::Connection
  • Object
show all
Extended by:
ProtocolMethodHandlers, RegisterEntityMixin
Includes:
Callbacks, ChannelIdAllocator, Openable
Defined in:
lib/amqp/session.rb

Overview

AMQP session represents connection to the broker. Session objects let you define callbacks for various TCP connection lifecycle events, for instance:

  • Connection is established
  • Connection has failed
  • Authentication has failed
  • Connection is lost (there is a network failure)
  • AMQP connection is opened
  • AMQP connection parameters (tuning) are negotiated and accepted by the broker
  • AMQP connection is properly closed

Key methods

Constant Summary

Deferrable =

Backwards compatibility with 0.7.0.a25. MK.

EventMachine::DefaultDeferrable

Constants included from Openable

Openable::VALUES

Constants included from ChannelIdAllocator

ChannelIdAllocator::MAX_CHANNELS_PER_CONNECTION

Broker information collapse

Connecting, reconnecting, disconnecting collapse

Instance Attribute Summary collapse

Attributes included from Openable

#status

Broker information collapse

Connecting, reconnecting, disconnecting collapse

Error Handling and Recovery collapse

Blocked connection notifications collapse

Connection operations collapse

Instance Method Summary collapse

Methods included from RegisterEntityMixin

register_entity

Methods included from ProtocolMethodHandlers

handle, handlers

Methods included from Callbacks

#clear_callbacks, #define_callback, #exec_callback, #exec_callback_once, #exec_callback_once_yielding_self, #exec_callback_yielding_self, #has_callback?, #prepend_callback, #redefine_callback

Methods included from Openable

#closed!, #closed?, #closing!, #closing?, #opened!, #opened?, #opening!, #opening?

Methods included from ChannelIdAllocator

#next_channel_id, #release_channel_id, #reset_channel_id_allocator

Constructor Details

#initialize(*args, &block) ⇒ Session

Returns a new instance of Session



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/amqp/session.rb', line 156

# Builds a session. All positional arguments are forwarded to the superclass
# constructor; the first one is also passed to Settings.configure to produce
# this session's settings. The block parameter is accepted but not used here.
def initialize(*args, &block)
  super(*args)

  # Start with the class-level logger.
  self.logger   = self.class.logger

  # channel => collected frames. MK.
  # NOTE(review): this default block returns a fresh Array without storing it
  # under the key, and #reset (called at the end of this constructor)
  # reassigns @frames anyway.
  @frames            = Hash.new { Array.new }
  @channels          = Hash.new
  @callbacks         = Hash.new

  opening!

  # track TCP connection state, used to detect initial TCP connection failures.
  @tcp_connection_established       = false
  @tcp_connection_failed            = false
  @intentionally_closing_connection = false

  # EventMachine::Connection's and Adapter's constructors arity
  # make it easier to use *args. MK.
  @settings                           = Settings.configure(args.first)

  # Default TCP failure handler: marks the session closed, then delegates to
  # the :on_tcp_connection_failure setting when present, otherwise raises the
  # class-configured exception.
  @on_tcp_connection_failure          = Proc.new { |settings|
    closed!
    if cb = @settings[:on_tcp_connection_failure]
      cb.call(settings)
    else
      raise self.class.tcp_connection_failure_exception_class.new(settings)
    end
  }

  # Uses the :on_possible_authentication_failure setting when given; the
  # fallback raises the class-configured authentication failure exception.
  @on_possible_authentication_failure = @settings[:on_possible_authentication_failure] || Proc.new { |settings|
    raise self.class.authentication_failure_exception_class.new(settings)
  }

  @mechanism         = @settings.fetch(:auth_mechanism, "PLAIN")
  @locale            = @settings.fetch(:locale, "en_GB")
  @client_properties = Settings.client_properties.merge(@settings.fetch(:client_properties, Hash.new))

  @auto_recovery     = (!!@settings[:auto_recovery])

  self.reset
  # Pending-connect timeout falls back to 3 when :timeout is not set; the call
  # is skipped entirely when JRUBY_VERSION is defined.
  self.set_pending_connect_timeout((@settings[:timeout] || 3).to_f) unless defined?(JRUBY_VERSION)
end

Instance Attribute Details

#callbacksArray<#call> (readonly)

Returns:

  • (Array<#call>)


60
61
62
# File 'lib/amqp/session.rb', line 60

# Returns @callbacks, which the constructor initializes to an empty Hash.
def callbacks
  @callbacks
end

#channel_maxObject

Maximum channel number that the server permits this connection to use. Usable channel numbers are in the range 1..channel_max. Zero indicates no specified limit.



108
109
110
# File 'lib/amqp/session.rb', line 108

# Returns @channel_max, the value #handle_tune copies from the broker's
# connection.tune frame.
def channel_max
  @channel_max
end

#channelsObject (readonly)

Channels within this connection.



101
102
103
# File 'lib/amqp/session.rb', line 101

# Returns @channels, which the constructor initializes to an empty Hash.
def channels
  @channels
end

#client_propertiesObject

Client capabilities



71
72
73
# File 'lib/amqp/session.rb', line 71

# Returns @client_properties: Settings.client_properties merged with the
# :client_properties setting (built in the constructor).
def client_properties
  @client_properties
end

#frame_maxObject

Maximum frame size that the server permits this connection to use.



113
114
115
# File 'lib/amqp/session.rb', line 113

# Returns @frame_max, the value #handle_tune copies from the broker's
# connection.tune frame.
def frame_max
  @frame_max
end

#known_hostsObject (readonly)



116
117
118
# File 'lib/amqp/session.rb', line 116

# Returns @known_hosts, a frozen copy of open_ok.known_hosts stored by
# #handle_open_ok.
def known_hosts
  @known_hosts
end

#localeObject

The locale defines the language in which the server will send reply texts.



66
67
68
# File 'lib/amqp/session.rb', line 66

# Returns @locale, taken from the :locale setting with "en_GB" as fallback.
def locale
  @locale
end

#loggerObject

API



56
57
58
# File 'lib/amqp/session.rb', line 56

# Returns the logger assigned to this session; the constructor assigns the
# class-level logger.
def logger
  @logger
end

#mechanismObject (readonly)

Authentication mechanism used.



91
92
93
# File 'lib/amqp/session.rb', line 91

# Returns @mechanism, taken from the :auth_mechanism setting with "PLAIN" as
# fallback.
def mechanism
  @mechanism
end

#server_authentication_mechanismsObject (readonly)

Authentication mechanisms broker supports.



96
97
98
# File 'lib/amqp/session.rb', line 96

# Returns the mechanism names #handle_start obtains by splitting the
# connection.start mechanisms string on spaces.
def server_authentication_mechanisms
  @server_authentication_mechanisms
end

#server_capabilitiesHash (readonly)

Server capabilities (usually used to detect AMQP 0.9.1 extensions like basic.nack, publisher confirms and so on)



81
82
83
# File 'lib/amqp/session.rb', line 81

# Returns the "capabilities" entry of the server properties, as stored by
# #handle_start.
def server_capabilities
  @server_capabilities
end

#server_localesObject (readonly)

Locales server supports



86
87
88
# File 'lib/amqp/session.rb', line 86

# Returns the locales from connection.start, wrapped with Array() by
# #handle_start.
def server_locales
  @server_locales
end

#server_propertiesHash (readonly)

Server properties (product information, platform, et cetera)



76
77
78
# File 'lib/amqp/session.rb', line 76

# Returns the server properties received in connection.start (stored by
# #handle_start).
def server_properties
  @server_properties
end

#settingsObject



57
58
59
# File 'lib/amqp/session.rb', line 57

# Returns @settings, the result of Settings.configure for this session
# (assigned in the constructor and replaced by #reconnect_to).
def settings
  @settings
end

Class Method Details

.connect(settings = {}, &block) ⇒ Object

Initiates connection to AMQP broker. If callback is given, runs it when (and if) AMQP connection succeeds.

Parameters:

  • settings (Hash) (defaults to: {})

Options Hash (settings):

  • :host (String) — default: "127.0.0.1"

    Hostname AMQ broker runs on.

  • :port (String) — default: 5672

    Port AMQ broker listens on.

  • :vhost (String) — default: "/"

    Virtual host to use.

  • :user (String) — default: "guest"

    Username to use for authentication.

  • :pass (String) — default: "guest"

    Password to use for authentication.

  • :auth_mechanism (String) — default: "PLAIN"

    SASL authentication mechanism to use.

  • :ssl (String) — default: false

    Should we use TLS (SSL) for the connection?

  • :timeout (String) — default: nil

    Connection timeout.

  • :heartbeat (Fixnum) — default: 0

    Connection heartbeat, in seconds.

  • :frame_max (Fixnum) — default: 131072

    Maximum frame size to use. If broker cannot support frames this large, broker’s maximum value will be used instead.



454
455
456
457
458
459
460
461
# File 'lib/amqp/session.rb', line 454

# Opens a connection to the broker described by +settings+.
#
# The settings are normalized with Settings.configure and remembered at the
# class level. The optional block is registered as a connection callback on
# the new session, which is returned.
def self.connect(settings = {}, &block)
  @settings = Settings.configure(settings)

  EventMachine.connect(@settings[:host], @settings[:port], self, @settings).tap do |session|
    session.register_connection_callback(&block)
  end
end

.loggerObject



125
126
127
128
129
130
# File 'lib/amqp/session.rb', line 125

# Returns the class-level logger. When none has been assigned yet, a
# Logger writing to STDERR is created (loading "logger" on demand) and kept.
def logger
  return @logger if @logger

  require "logger"
  @logger = Logger.new(STDERR)
end

.logger=(logger) ⇒ Object



132
133
134
135
136
137
138
139
# File 'lib/amqp/session.rb', line 132

# Assigns the class-level logger.
#
# Raises AMQP::Logging::IncompatibleLoggerError unless the object responds to
# every method listed in AMQP::Logging::REQUIRED_METHODS.
def logger=(logger)
  required = AMQP::Logging::REQUIRED_METHODS
  if required.any? { |name| !logger.respond_to?(name) }
    raise AMQP::Logging::IncompatibleLoggerError.new(required)
  end

  @logger = logger
end

.loggingBoolean

Returns Current value of logging flag.

Returns:

  • (Boolean)

    Current value of logging flag.



142
143
144
# File 'lib/amqp/session.rb', line 142

# Reads the :logging entry of the class-level settings.
def logging
  settings[:logging]
end

.logging=(boolean) ⇒ Object

Turns logging on or off.



147
148
149
# File 'lib/amqp/session.rb', line 147

# Stores the given value under :logging in the class-level settings.
def logging=(boolean)
  settings[:logging] = boolean
end

.settingsObject

Settings



121
122
123
# File 'lib/amqp/session.rb', line 121

# Returns the class-level settings, lazily initialized from
# AMQP::Settings.default on first access.
def settings
  @settings ||= AMQP::Settings.default
end

Instance Method Details

#auth_mechanism_adapterObject

Retrieves an AuthMechanismAdapter that will encode credentials for this Adapter.



919
920
921
# File 'lib/amqp/session.rb', line 919

# Memoizes the adapter returned by AuthMechanismAdapter.for_adapter(self).
def auth_mechanism_adapter
  @auth_mechanism_adapter ||= AuthMechanismAdapter.for_adapter(self)
end

#authenticating?Boolean

Whether we are in authentication state (after TCP connection was established but before broker authenticated us).

Returns:

  • (Boolean)


545
546
547
# File 'lib/amqp/session.rb', line 545

# Returns the @authenticating flag: set to true by #handshake, and back to
# false by #connection_successful and #reset.
def authenticating?
  @authenticating
end

#auto_recoverObject

Performs recovery of channels that are in the automatic recovery mode.



392
393
394
# File 'lib/amqp/session.rb', line 392

# Asks every channel that reports auto_recovering? to run its own
# auto_recover. Returns the Hash of channels that were selected.
def auto_recover
  recoverable = @channels.select { |_id, channel| channel.auto_recovering? }
  recoverable.each_value { |channel| channel.auto_recover }
end

#auto_recovering?Boolean Also known as: auto_recovery?

Returns whether connection is in the automatic recovery mode

Returns:

  • (Boolean)

    whether connection is in the automatic recovery mode



380
381
382
# File 'lib/amqp/session.rb', line 380

# Coerces @auto_recovery (derived from the :auto_recovery setting in the
# constructor) to true or false.
def auto_recovering?
  !!@auto_recovery
end

#before_recovery(&block) ⇒ Object

Defines a callback that will be executed after TCP connection has recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).



363
364
365
# File 'lib/amqp/session.rb', line 363

# Registers the block under the :before_recovery callback name via
# redefine_callback.
def before_recovery(&block)
  self.redefine_callback(:before_recovery, &block)
end

#brokerAMQP::Broker

Returns Broker this connection is established with

Returns:

  • (AMQP::Broker)

    Broker this connection is established with



282
283
284
# File 'lib/amqp/session.rb', line 282

# Memoizes an AMQP::Broker built from @server_properties as they are at the
# time of the first call.
def broker
  @broker ||= AMQP::Broker.new(@server_properties)
end

#broker_endpointString

Returns Broker endpoint in the form of HOST:PORT/VHOST

Returns:

  • (String)

    Broker endpoint in the form of HOST:PORT/VHOST



221
222
223
# File 'lib/amqp/session.rb', line 221

# Describes the broker endpoint as "HOST:PORT/VHOST", built from
# #hostname, #port and #vhost.
def broker_endpoint
  format("%s:%s/%s", self.hostname, self.port, self.vhost)
end

#connected?Boolean

Returns true if this AMQP connection is currently open

Returns:

  • (Boolean)

    true if this AMQP connection is currently open



202
203
204
# File 'lib/amqp/session.rb', line 202

# Delegates to #opened?.
def connected?
  self.opened?
end

#connection_successfulObject

Called by AMQP::Connection after we receive connection.open-ok.



678
679
680
681
682
683
# File 'lib/amqp/session.rb', line 678

# Marks the end of the authentication phase, transitions the session to the
# opened state and then succeeds @connection_deferrable, which runs the
# callbacks registered through #on_open.
def connection_successful
  @authenticating = false
  opened!

  @connection_deferrable.succeed
end

#content_complete?(frames) ⇒ Boolean (protected)

Determines, whether given frame array contains full content body

Returns:

  • (Boolean)


1119
1120
1121
1122
1123
1124
# File 'lib/amqp/session.rb', line 1119

# Tells whether +frames+ (a content header frame followed by body frames)
# already carries the whole body announced by the header.
#
# Returns false for an empty array. Raises RuntimeError when the first frame
# is not an AMQ::Protocol::HeaderFrame.
def content_complete?(frames)
  return false if frames.empty?

  header, *body_frames = frames
  unless header.kind_of?(AMQ::Protocol::HeaderFrame)
    raise "Not a content header frame first: #{header.inspect}"
  end

  # Compare the announced body size with the bytes collected so far.
  received = body_frames.sum { |frame| frame.payload.size }
  header.body_size == received
end

#disconnect(reply_code = 200, reply_text = "Goodbye", &block) ⇒ Object Also known as: close

Properly close connection with AMQ broker, as described in section 2.2.4 of the AMQP 0.9.1 specification.

See Also:

  • #close_connection


238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/amqp/session.rb', line 238

# Starts an orderly shutdown of the AMQP connection.
#
# reply_code and reply_text are encoded into the connection.close frame.
# The optional block runs, after the collected frames are cleared, once
# disconnection succeeds.
def disconnect(reply_code = 200, reply_text = "Goodbye", &block)
  # Lets #handle_skipped_heartbeats ignore this deliberate shutdown.
  @intentionally_closing_connection = true
  self.on_disconnection do
    @frames.clear
    block.call if block
  end

  # ruby-amqp/amqp#66, MK.
  if self.open?
    closing!
    self.send_frame(AMQ::Protocol::Connection::Close.encode(reply_code, reply_text, 0, 0))
  elsif self.closing?
    # no-op
  else
    # Neither open nor closing: nothing to send, so finish the disconnection
    # locally.
    self.disconnection_successful
  end
end

#disconnection_successfulObject

Called by AMQP::Connection after we receive connection.close-ok.



689
690
691
692
693
694
695
696
# File 'lib/amqp/session.rb', line 689

# Completes a disconnection: runs the callbacks registered through
# #on_closed (by succeeding @disconnection_deferrable), closes the underlying
# connection, resets per-connection state and marks the session closed.
# Note that #reset replaces @disconnection_deferrable with a fresh one.
def disconnection_successful
  @disconnection_deferrable.succeed

  # true for "after writing buffered data"
  self.close_connection(true)
  self.reset
  closed!
end

#encode_credentials(username, password) ⇒ Object

See Also:



911
912
913
# File 'lib/amqp/session.rb', line 911

# Delegates credential encoding to the adapter from #auth_mechanism_adapter.
def encode_credentials(username, password)
  auth_mechanism_adapter.encode_credentials(username, password)
end

#frameset_complete?(frames) ⇒ Boolean (protected)

Determines, whether the received frameset is ready to be further processed

Returns:

  • (Boolean)


1112
1113
1114
1115
1116
# File 'lib/amqp/session.rb', line 1112

# Tells whether the frames collected for a channel form a complete frameset.
#
# An empty array is never complete. Otherwise the set is complete when the
# first frame is final, or when its method class carries content and the
# remaining frames pass #content_complete?.
def frameset_complete?(frames)
  return false if frames.empty?

  method_frame, *content_frames = frames
  method_frame.final? || (method_frame.method_class.has_content? && content_complete?(content_frames))
end

#get_next_frameObject (protected)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns next frame from buffer whenever possible



1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
# File 'lib/amqp/session.rb', line 1094

# Removes and returns the next whole frame from @chunk_buffer, or returns nil
# when the buffer does not hold a complete frame yet.
def get_next_frame
  # With 7 characters or fewer the payload length cannot be read yet.
  return nil if @chunk_buffer.size <= 7

  # Skip the leading octet + short (1 + 2) to reach the 4-byte payload length.
  length_offset  = 3
  payload_length = @chunk_buffer[length_offset, 4].unpack(AMQ::Protocol::PACK_UINT32).first

  # 4 bytes for long payload length, 1 byte final octet
  frame_length = length_offset + payload_length + 5
  return nil if frame_length > @chunk_buffer.size

  @chunk_buffer.slice!(0, frame_length)
end

#handle_close(conn_close) ⇒ Object

Handles connection.close. When broker detects a connection level exception, this method is called.



1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
# File 'lib/amqp/session.rb', line 1051

# Reacts to connection.close sent by the broker: marks the session closed,
# reports a possible authentication failure, then runs the :error callback
# with the received frame.
def handle_close(conn_close)
  closed!

  # getting connection.close during connection negotiation means authentication
  # has failed (RabbitMQ 3.2+):
  # http://www.rabbitmq.com/auth-notification.html
  auth_failure_handler = @on_possible_authentication_failure
  auth_failure_handler.call(@settings) if authenticating? && auth_failure_handler

  self.exec_callback_yielding_self(:error, conn_close)
end

#handle_close_ok(close_ok) ⇒ Object

Handles Connection.Close-Ok.



1067
1068
1069
1070
# File 'lib/amqp/session.rb', line 1067

# Marks the session closed and finishes the disconnection via
# #disconnection_successful. The close_ok frame itself is not inspected.
def handle_close_ok(close_ok)
  closed!
  self.disconnection_successful
end

#handle_connection_blocked(connection_blocked) ⇒ Object



1073
1074
1075
# File 'lib/amqp/session.rb', line 1073

# Invokes the block registered with #on_blocked, if any, passing this
# session and the received frame.
def handle_connection_blocked(connection_blocked)
  @on_blocked.call(self, connection_blocked) if @on_blocked
end

#handle_connection_unblocked(connection_unblocked) ⇒ Object



1077
1078
1079
# File 'lib/amqp/session.rb', line 1077

# Invokes the block registered with #on_unblocked, if any, passing this
# session and the received frame.
def handle_connection_unblocked(connection_unblocked)
  @on_unblocked.call(self, connection_unblocked) if @on_unblocked
end

#handle_open_ok(open_ok) ⇒ Object

Handles Connection.Open-Ok.



1039
1040
1041
1042
1043
1044
# File 'lib/amqp/session.rb', line 1039

# Stores a frozen copy of the known hosts from connection.open-ok, marks the
# session opened and then calls #connection_successful when it is available.
def handle_open_ok(open_ok)
  @known_hosts = open_ok.known_hosts.dup.freeze

  opened!
  self.connection_successful if self.respond_to?(:connection_successful)
end

#handle_skipped_heartbeatsObject

Called when time since last server heartbeat received is greater or equal to the heartbeat interval set via :heartbeat_interval option on connection.



702
703
704
705
706
707
708
709
# File 'lib/amqp/session.rb', line 702

# Reacts to missed broker heartbeats at most once: stops the heartbeat sender
# and runs the skipped-heartbeats callbacks.
#
# Does nothing (and returns nil) when a previous call is already being
# handled, when TCP connection is not established, or when the connection is
# being closed intentionally.
def handle_skipped_heartbeats
  return if @handling_skipped_heartbeats
  return unless @tcp_connection_established
  return if @intentionally_closing_connection

  @handling_skipped_heartbeats = true
  self.cancel_heartbeat_sender

  self.run_skipped_heartbeats_callbacks
end

#handle_start(connection_start) ⇒ Object

Handles connection.start.



999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
# File 'lib/amqp/session.rb', line 999

# Records what the broker advertised in connection.start (properties,
# capabilities, authentication mechanisms, locales) and answers with
# connection.start-ok carrying the client properties, the chosen mechanism,
# the encoded credentials and the locale.
def handle_start(connection_start)
  @server_properties                = connection_start.server_properties
  @server_capabilities              = @server_properties["capabilities"]

  # A missing mechanisms string yields an empty list.
  @server_authentication_mechanisms = (connection_start.mechanisms || "").split(" ")
  @server_locales                   = Array(connection_start.locales)

  # Both the short (:user/:pass) and long (:username/:password) setting keys
  # are accepted; the short ones take precedence.
  username = @settings[:user] || @settings[:username]
  password = @settings[:pass] || @settings[:password]

  # It's not clear whether we should transition to :opening state here
  # or in #open but in case authentication fails, it would be strange to have
  # @status undefined. So lets do this. MK.
  opening!

  self.send_frame(AMQ::Protocol::Connection::StartOk.encode(@client_properties, mechanism, self.encode_credentials(username, password), @locale))
end

#handle_tune(connection_tune) ⇒ Object

Handles Connection.Tune (and replies with Connection.Tune-Ok).



1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
# File 'lib/amqp/session.rb', line 1022

# Handles the broker's connection.tune frame: records the proposed channel
# and frame limits, negotiates the heartbeat interval, replies with
# connection.tune-ok and starts the heartbeat sender when heartbeats are
# enabled.
def handle_tune(connection_tune)
  @channel_max        = connection_tune.channel_max.freeze
  @frame_max          = connection_tune.frame_max.freeze

  # The client's heartbeat preference may come from either setting key and
  # falls back to 0.
  client_heartbeat    = @settings[:heartbeat] || @settings[:heartbeat_interval] || 0

  @heartbeat_interval = negotiate_heartbeat_value(client_heartbeat, connection_tune.heartbeat)

  # The frame size sent back is the smaller of the :frame_max setting and the
  # broker's value; @frame_max itself keeps the broker's value.
  # NOTE(review): Array#min raises if settings[:frame_max] is nil — presumably
  # Settings always supplies a default; confirm.
  # NOTE(review): a value of 0 on either side wins this min, unlike the
  # zero handling in #negotiate_heartbeat_value — confirm that is intended.
  self.send_frame(AMQ::Protocol::Connection::TuneOk.encode(@channel_max, [settings[:frame_max], @frame_max].min, @heartbeat_interval))
  self.initialize_heartbeat_sender if heartbeats_enabled?
end

#handshakeObject

Sends connection preamble to the broker.



888
889
890
891
# File 'lib/amqp/session.rb', line 888

# Flags the session as authenticating and sends the protocol preamble.
def handshake
  @authenticating = true
  self.send_preamble
end

#heartbeat_intervalFixnum

Returns heartbeat interval this client uses, in seconds. This value may or may not be used depending on broker capabilities. Zero means the server does not want a heartbeat.

Returns:

  • (Fixnum)

    Heartbeat interval this client uses, in seconds.

See Also:



767
768
769
# File 'lib/amqp/session.rb', line 767

# Returns @heartbeat_interval, the value negotiated in #handle_tune.
def heartbeat_interval
  @heartbeat_interval
end

#heartbeats_enabled?Boolean

Returns true if heartbeats are enabled (heartbeat interval is greater than 0)

Returns:

  • (Boolean)


773
774
775
# File 'lib/amqp/session.rb', line 773

# Truthy only when a heartbeat interval has been set and is greater than 0.
# Returns nil (not false) while @heartbeat_interval is still unset.
def heartbeats_enabled?
  @heartbeat_interval && (@heartbeat_interval > 0)
end

#hostnameString Also known as: host

Returns Broker hostname this connection uses

Returns:

  • (String)

    Broker hostname this connection uses



208
209
210
# File 'lib/amqp/session.rb', line 208

# Returns the :host entry of this session's settings.
def hostname
  @settings[:host]
end

#negotiate_heartbeat_value(client_value, server_value) ⇒ Object (protected)



1083
1084
1085
1086
1087
1088
1089
# File 'lib/amqp/session.rb', line 1083

# Picks the heartbeat interval from the client's and the broker's proposals.
#
# When either side proposes 0, the larger value is used (so the other side's
# non-zero proposal wins); otherwise the smaller of the two is used.
def negotiate_heartbeat_value(client_value, server_value)
  proposals = [client_value, server_value]
  proposals.any? { |value| value == 0 } ? proposals.max : proposals.min
end

#on_blocked(&fn) ⇒ Object



401
402
403
# File 'lib/amqp/session.rb', line 401

# Stores the block that #handle_connection_blocked invokes. A later call
# replaces the earlier block.
def on_blocked(&fn)
  @on_blocked = fn
end

#on_closed(&block) ⇒ Object Also known as: on_disconnection

Defines a callback that will be run when broker confirms connection termination (client receives connection.close-ok). You can define more than one callback.

See Also:



309
310
311
# File 'lib/amqp/session.rb', line 309

# Adds the block as a callback on @disconnection_deferrable, which
# #disconnection_successful succeeds.
def on_closed(&block)
  @disconnection_deferrable.callback(&block)
end

#on_connection_interruption(&block) ⇒ Object Also known as: after_connection_interruption

Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).



342
343
344
# File 'lib/amqp/session.rb', line 342

# Registers the block under the :after_connection_interruption callback name
# via redefine_callback.
def on_connection_interruption(&block)
  self.redefine_callback(:after_connection_interruption, &block)
end

#on_error(&block) ⇒ Object

Defines a callback that will be executed when connection is closed after connection-level exception. Only one callback can be defined (the one defined last replaces previously added ones).



353
354
355
# File 'lib/amqp/session.rb', line 353

# Registers the block under the :error callback name, which #handle_close
# executes with the received connection.close frame.
def on_error(&block)
  self.redefine_callback(:error, &block)
end

#on_open(&block) ⇒ Object Also known as: on_connection

Defines a callback that will be executed when AMQP connection is considered open: after client and broker have agreed on max channel identifier and maximum allowed frame size and authentication succeeds. You can define more than one callback.

See Also:



296
297
298
# File 'lib/amqp/session.rb', line 296

# Adds the block as a callback on @connection_deferrable, which
# #connection_successful succeeds.
def on_open(&block)
  @connection_deferrable.callback(&block)
end

#on_possible_authentication_failure(&block) ⇒ Object

Defines a callback that will be run when TCP connection is closed before authentication finishes. Usually this means authentication failure. You can define only one callback.



334
335
336
# File 'lib/amqp/session.rb', line 334

# Replaces the handler that #handle_close calls (with the settings) when
# connection.close arrives while the session is still authenticating.
def on_possible_authentication_failure(&block)
  @on_possible_authentication_failure = block
end

#on_recovery(&block) ⇒ Object Also known as: after_recovery

Defines a callback that will be executed after AMQP connection has recovered after a network failure. Only one callback can be defined (the one defined last replaces previously added ones).



372
373
374
# File 'lib/amqp/session.rb', line 372

# Registers the block under the :after_recovery callback name via
# redefine_callback.
def on_recovery(&block)
  self.redefine_callback(:after_recovery, &block)
end

#on_skipped_heartbeats(&block) ⇒ Object

Defines a callback that will be executed after time since last broker heartbeat is greater than or equal to the heartbeat interval (skipped heartbeat is detected). Only one callback can be defined (the one defined last replaces previously added ones).



868
869
870
# File 'lib/amqp/session.rb', line 868

# Registers the block under the :skipped_heartbeats callback name via
# redefine_callback.
def on_skipped_heartbeats(&block)
  self.redefine_callback(:skipped_heartbeats, &block)
end

#on_tcp_connection_failure(&block) ⇒ Object

Defines a callback that will be run when initial TCP connection fails. You can define only one callback.



318
319
320
# File 'lib/amqp/session.rb', line 318

# Replaces the handler that #tcp_connection_failed calls with the settings.
# This overrides the default handler installed by the constructor.
def on_tcp_connection_failure(&block)
  @on_tcp_connection_failure = block
end

#on_tcp_connection_loss(&block) ⇒ Object

Defines a callback that will be run when TCP connection to AMQP broker is lost (interrupted). You can define only one callback.



326
327
328
# File 'lib/amqp/session.rb', line 326

# Stores the handler that #tcp_connection_lost calls with this session and
# its settings.
def on_tcp_connection_loss(&block)
  @on_tcp_connection_loss = block
end

#on_unblocked(&fn) ⇒ Object



405
406
407
# File 'lib/amqp/session.rb', line 405

# Stores the block that #handle_connection_unblocked invokes. A later call
# replaces the earlier block.
def on_unblocked(&fn)
  @on_unblocked = fn
end

#open(vhost = "/") ⇒ Object

Sends connection.open to the server.



898
899
900
# File 'lib/amqp/session.rb', line 898

# Encodes a connection.open frame for the given vhost ("/" by default) and
# sends it.
def open(vhost = "/")
  self.send_frame(AMQ::Protocol::Connection::Open.encode(vhost))
end

#periodically_reconnect(period = 5) ⇒ Object

Periodically try to reconnect.

Parameters:

  • period (Fixnum) (defaults to: 5)

    Period of time, in seconds, to wait before reconnection attempt.

  • force (Boolean)

    If true, enforces immediate reconnection.



512
513
514
515
516
517
518
519
# File 'lib/amqp/session.rb', line 512

# Starts a periodic timer that attempts to reconnect to the configured
# host and port every +period+ seconds.
#
# The session is flagged as reconnecting and its per-connection state is
# reset first. Returns the newly created EventMachine::PeriodicTimer.
def periodically_reconnect(period = 5)
  @reconnecting = true
  self.reset

  # Cancel a timer left over from an earlier call: overwriting the reference
  # without cancelling would leave the old timer firing reconnects forever.
  @periodic_reconnection_timer.cancel if @periodic_reconnection_timer

  @periodic_reconnection_timer = EventMachine::PeriodicTimer.new(period) {
    EventMachine.reconnect(@settings[:host], @settings[:port], self)
  }
end

#portString

Returns Broker port this connection uses

Returns:

  • (String)

    Broker port this connection uses



215
216
217
# File 'lib/amqp/session.rb', line 215

# Returns the :port entry of this session's settings.
def port
  @settings[:port]
end

#receive_frame(frame) ⇒ Object

Processes a single frame.

Parameters:

  • frame (AMQ::Protocol::Frame)


928
929
930
931
932
933
934
935
936
937
938
939
940
# File 'lib/amqp/session.rb', line 928

# Appends the frame to the frames collected for its channel and, once
# #frameset_complete? accepts that collection, hands it to
# #receive_frameset.
def receive_frame(frame)
  @frames[frame.channel] ||= Array.new
  @frames[frame.channel] << frame

  if frameset_complete?(@frames[frame.channel])
    begin
      receive_frameset(@frames[frame.channel])
    ensure # Ensure that frames always will be cleared
      # for channel.close, frame.channel will be nil. MK.
      # The entry is looked up again here because handling the frameset may
      # have changed @frames in the meantime.
      clear_frames_on(frame.channel) if @frames[frame.channel]
    end
  end
end

#receive_frameset(frames) ⇒ Object

Processes a frameset by finding and invoking a suitable handler. Heartbeat frames are treated in a special way: they simply update @last_server_heartbeat value.

Parameters:

  • frames (Array<AMQ::Protocol::Frame>)


948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
# File 'lib/amqp/session.rb', line 948

# Dispatches a complete frameset to the handler registered for the method
# class of its first frame.
#
# The first frame is shifted off +frames+ and passed to the handler together
# with this session and the remaining frames. Heartbeat frames are not
# dispatched. Raises MissingHandlerError when no handler is registered.
def receive_frameset(frames)
  # treat incoming traffic as heartbeats.
  # this operation is pretty expensive under heavy traffic but heartbeats can be disabled
  # (and are also disabled by default). MK.
  @last_server_heartbeat = Time.now if self.heartbeats_enabled?

  leading_frame = frames.first
  # Heartbeat frames need no further processing.
  return if AMQ::Protocol::HeartbeatFrame === leading_frame

  handler = AMQP::HandlersRegistry.find(leading_frame.method_class)
  raise MissingHandlerError.new(frames.first) unless handler

  handler.call(self, frames.shift, frames)
end

#reconnect(force = false, period = 5) ⇒ Object

Reconnect after a period of wait.

Parameters:

  • period (Fixnum) (defaults to: 5)

    Period of time, in seconds, to wait before reconnection attempt.

  • force (Boolean) (defaults to: false)

    If true, enforces immediate reconnection.



468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
# File 'lib/amqp/session.rb', line 468

# Reconnects to the configured host and port.
#
# When a reconnection is already in progress and +force+ is false, the
# attempt is postponed: a timer fires after +period+ seconds and retries
# with force set to true, and this call returns nil. Otherwise the session
# state is reset (first attempt only) and the reconnection starts now.
def reconnect(force = false, period = 5)
  if @reconnecting && !force
    EventMachine::Timer.new(period) { reconnect(true, period) }
    return
  end

  unless @reconnecting
    @reconnecting = true
    self.reset
  end

  EventMachine.reconnect(@settings[:host], @settings[:port], self)
end

#reconnect_to(connection_string_or_options, period = 5) ⇒ Object

Similar to #reconnect, but uses different connection settings

See Also:



487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
# File 'lib/amqp/session.rb', line 487

# Reconnects using different connection settings.
#
# +connection_string_or_options+ may be a connection URI String (parsed with
# AMQP.parse_connection_uri) or a Hash of options; any other value is treated
# as an empty Hash. The result is normalized with Settings.configure and
# replaces this session's settings. +period+ is accepted but not used.
def reconnect_to(connection_string_or_options, period = 5)
  target = connection_string_or_options
  new_settings =
    if target.is_a?(String)
      AMQP.parse_connection_uri(target)
    elsif target.is_a?(Hash)
      target
    else
      {}
    end

  unless @reconnecting
    @reconnecting = true
    self.reset
  end

  @settings = Settings.configure(new_settings)
  EventMachine.reconnect(@settings[:host], @settings[:port], self)
end

#reconnecting?Boolean

Returns:

  • (Boolean)


809
810
811
# File 'lib/amqp/session.rb', line 809

# Returns the @reconnecting flag, which #reconnect, #reconnect_to and
# #periodically_reconnect set to true.
def reconnecting?
  @reconnecting
end

#resetObject (protected)



1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
# File 'lib/amqp/session.rb', line 1163

# Resets per-connection state: frame buffers, the raw chunk buffer, the
# connection/disconnection deferrables and the authentication flag.
def reset
  @size      = 0
  @payload   = ""
  # channel => collected frames, matching the constructor. A Hash (rather
  # than an Array indexed by channel id) avoids nil-padding up to the highest
  # channel id and tolerates lookups with a nil channel. Missing channels
  # still read as nil.
  @frames    = Hash.new

  @chunk_buffer                 = ""
  @connection_deferrable        = EventMachine::DefaultDeferrable.new
  @disconnection_deferrable     = EventMachine::DefaultDeferrable.new

  # used to track down whether authentication succeeded. AMQP 0.9.1 dictates
  # that on authentication failure broker must close TCP connection without sending
  # any more data. This is why we need to explicitly track whether we are past
  # authentication stage to signal possible authentication failures.
  @authenticating           = false
end

#reset_state!Object

Resets connection state.



905
906
907
# File 'lib/amqp/session.rb', line 905

# Intentionally empty hook; this implementation does nothing and returns nil.
def reset_state!
  # no-op by default
end

#send_frame(frame) ⇒ Object

Sends frame to the peer, checking that connection is open.



736
737
738
739
740
741
742
# File 'lib/amqp/session.rb', line 736

# Encodes the frame and writes it to the peer.
#
# Raises ConnectionClosedError (built from the frame) when the session is
# closed.
def send_frame(frame)
  raise ConnectionClosedError.new(frame) if closed?

  self.send_raw(frame.encode)
end

#send_frameset(frames, channel) ⇒ Object

Sends multiple frames, one by one. For thread safety this method takes a channel object and synchronizes on it.



748
749
750
751
752
753
754
755
756
757
# File 'lib/amqp/session.rb', line 748

# Sends each frame with #send_frame, in order, while holding the lock taken
# through channel.synchronize.
def send_frameset(frames, channel)
  # some (many) developers end up sharing channels between threads and when multiple
  # threads publish on the same channel aggressively, at some point frames will be
  # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
  # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
  # locking. Note that "single frame" methods do not need this kind of synchronization. MK.
  channel.synchronize do
    frames.each { |frame| self.send_frame(frame) }
  end
end

#send_heartbeatObject

Sends a heartbeat frame if connection is open.



980
981
982
983
984
985
986
987
988
# File 'lib/amqp/session.rb', line 980

# Sends a heartbeat frame to the broker.
#
# Nothing is sent (nil is returned) unless TCP connection is established,
# skipped heartbeats are not already being handled, and at least one server
# heartbeat timestamp has been recorded. Before sending, a server that has
# been silent for more than twice the heartbeat interval is reported through
# #handle_skipped_heartbeats, unless the session is reconnecting.
def send_heartbeat
  return unless tcp_connection_established? && !@handling_skipped_heartbeats && @last_server_heartbeat

  stale_before = Time.now - (self.heartbeat_interval * 2)
  if @last_server_heartbeat < stale_before && !reconnecting?
    logger.error "[amqp] Detected missing server heartbeats"
    self.handle_skipped_heartbeats
  end

  send_frame(AMQ::Protocol::HeartbeatFrame)
end

#send_preambleObject

Note:

This must be implemented by all AMQP clients.

Sends AMQ protocol header (also known as preamble).



729
730
731
# File 'lib/amqp/session.rb', line 729

# Writes AMQ::Protocol::PREAMBLE to the peer with send_raw.
def send_preamble
  self.send_raw(AMQ::Protocol::PREAMBLE)
end

#start_automatic_recoveryObject

Performs recovery of channels that are in the automatic recovery mode. “before recovery” callbacks are run immediately, “after recovery” callbacks are run after AMQP connection is re-established and auto recovery is performed (using #auto_recover).

Use this method if you want to run automatic recovery process after handling a connection-level exception, for example, 320 CONNECTION_FORCED (used by RabbitMQ when it is shut down gracefully).



851
852
853
854
855
856
857
858
859
860
# File 'lib/amqp/session.rb', line 851

# Runs the "before recovery" callbacks immediately and registers a connection
# callback that performs channel auto-recovery followed by the "after
# recovery" callbacks.
def start_automatic_recovery
  self.run_before_recovery_callbacks
  self.register_connection_callback do
    # always run automatic recovery, because it is per-channel
    # and connection has to start it. Channels that did not opt-in for
    # autorecovery won't be selected. MK.
    self.auto_recover
    self.run_after_recovery_callbacks
  end
end

#tcp_connection_established?Boolean

Is TCP connection established and currently active?

Returns:

  • (Boolean)


552
553
554
# File 'lib/amqp/session.rb', line 552

# Returns the @tcp_connection_established flag (false after construction).
def tcp_connection_established?
  @tcp_connection_established
end

#tcp_connection_failedObject

Called when initial TCP connection fails.



793
794
795
796
797
# File 'lib/amqp/session.rb', line 793

# Clears the @recovered flag and calls the TCP connection failure handler,
# if one is set, with this session's settings. The default handler installed
# by the constructor raises unless the :on_tcp_connection_failure setting is
# given.
def tcp_connection_failed
  @recovered = false

  @on_tcp_connection_failure.call(@settings) if @on_tcp_connection_failure
end

#tcp_connection_lostObject

Called when previously established TCP connection fails.



801
802
803
804
805
806
# File 'lib/amqp/session.rb', line 801

# Clears the @recovered flag, calls the handler registered with
# #on_tcp_connection_loss (if any) with this session and its settings, then
# proceeds to #handle_connection_interruption.
def tcp_connection_lost
  @recovered = false

  @on_tcp_connection_loss.call(self, @settings) if @on_tcp_connection_loss
  self.handle_connection_interruption
end

#upgrade_to_tls_if_necessaryObject (protected)



1179
1180
1181
1182
1183
1184
1185
1186
1187
# File 'lib/amqp/session.rb', line 1179

# Starts TLS according to the :ssl setting: a Hash is passed to start_tls as
# options, any other truthy value starts TLS without options, and nil/false
# leaves the connection as it is (returning nil).
def upgrade_to_tls_if_necessary
  case (tls_setting = @settings[:ssl])
  when Hash       then start_tls(tls_setting)
  when nil, false then nil
  else                 start_tls
  end
end

#usernameString Also known as: user

Returns Username used by this connection

Returns:

  • (String)

    Username used by this connection



227
228
229
# File 'lib/amqp/session.rb', line 227

# Returns the username this connection authenticates with.
#
# Reads the :user setting and falls back to :username, the same lookup
# #handle_start performs when it encodes the credentials.
def username
  @settings[:user] || @settings[:username]
end

#vhostString

vhost this connection uses. Default is “/”, a historically established convention of RabbitMQ and amqp gem.

Returns:

  • (String)

    vhost this connection uses



783
784
785
# File 'lib/amqp/session.rb', line 783

# Returns the :vhost setting, or "/" when the key is absent.
def vhost
  @settings.fetch(:vhost, "/")
end