Class: AMQ::Client::Async::EventMachineClient

Inherits:
EM::Connection
  • Object
show all
Includes:
Adapter
Defined in:
lib/amq/client/async/adapters/event_machine.rb

Constant Summary collapse

Deferrable =

Backwards compatibility with 0.7.0.a25. MK.

EventMachine::DefaultDeferrable

Constants included from Openable

Openable::VALUES

Instance Attribute Summary

Attributes included from Openable

#status

Connection operations collapse

Instance Method Summary collapse

Methods included from Adapter

#auth_mechanism_adapter, #auto_recover, #auto_recovering?, #before_recovery, #content_complete?, #disconnect, #encode_credentials, #frameset_complete?, #get_next_frame, #handle_close, #handle_close_ok, #handle_open_ok, #handle_start, #handle_tune, #handshake, #heartbeat_interval, #heartbeats_enabled?, #negotiate_heartbeat_value, #on_connection_interruption, #on_error, #on_possible_authentication_failure, #on_recovery, #on_skipped_heartbeats, #on_tcp_connection_failure, #on_tcp_connection_loss, #open, #receive_frame, #receive_frameset, #reconnecting?, #reset_state!, #send_frame, #send_frameset, #send_heartbeat, #send_preamble, #start_automatic_recovery, #tcp_connection_failed, #tcp_connection_lost, #vhost

Methods included from RegisterEntityMixin

#register_entity

Methods included from Callbacks

#clear_callbacks, #define_callback, #exec_callback, #exec_callback_once, #exec_callback_once_yielding_self, #exec_callback_yielding_self, #has_callback?, #prepend_callback, #redefine_callback

Methods included from Openable

#closed!, #closed?, #closing!, #closing?, #opened!, #opened?, #opening!, #opening?

Constructor Details

#initialize(*args) ⇒ EventMachineClient

Returns a new instance of EventMachineClient.



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/amq/client/async/adapters/event_machine.rb', line 140

# Sets up a freshly instantiated connection handler: forwards all
# arguments to the superclass, initialises bookkeeping state, resolves
# connection settings from the first argument and installs the default
# failure handlers.
#
# @param args [Array] constructor arguments; +args.first+ is passed to
#   Settings.configure to build the connection settings.
def initialize(*args)
  super(*args)

  self.logger = self.class.logger

  # channel => collected frames. MK.
  # NOTE(review): #reset, invoked at the end of this constructor, assigns
  # @frames again — confirm which container is actually intended.
  @frames    = Hash.new { [] }
  @channels  = {}
  @callbacks = {}

  opening!

  # TCP-level state flags; used to detect initial TCP connection failures.
  @tcp_connection_established       = false
  @tcp_connection_failed            = false
  @intentionally_closing_connection = false

  # EventMachine::Connection's and Adapter's constructors arity
  # make it easier to use *args. MK.
  @settings = Settings.configure(args.first)

  # User-supplied handlers win; otherwise fall back to handlers that raise
  # the exception class configured on this connection class.
  @on_tcp_connection_failure = @settings[:on_tcp_connection_failure]
  @on_tcp_connection_failure ||= proc { |failed_settings|
    raise self.class.tcp_connection_failure_exception_class.new(failed_settings)
  }

  @on_possible_authentication_failure = @settings[:on_possible_authentication_failure]
  @on_possible_authentication_failure ||= proc { |failed_settings|
    raise self.class.authentication_failure_exception_class.new(failed_settings)
  }

  @mechanism         = @settings.fetch(:auth_mechanism, "PLAIN")
  @locale            = @settings.fetch(:locale, "en_GB")
  @client_properties = Settings.client_properties.merge(@settings.fetch(:client_properties, {}))

  # Coerce whatever was configured into a strict true/false.
  @auto_recovery = @settings[:auto_recovery] ? true : false

  reset

  # The pending-connect timeout is not set when running on JRuby.
  unless defined?(JRUBY_VERSION)
    connect_timeout = (@settings[:timeout] || 3).to_f
    set_pending_connect_timeout(connect_timeout)
  end
end

Class Method Details

.connect(settings = {}, &block) ⇒ Object

Initiates connection to AMQP broker. If callback is given, runs it when (and if) AMQP connection succeeds.

Parameters:

  • settings (Hash) (defaults to: {})

Options Hash (settings):

  • :host (String) — default: "127.0.0.1"

    Hostname AMQ broker runs on.

  • :port (String) — default: 5672

    Port AMQ broker listens on.

  • :vhost (String) — default: "/"

    Virtual host to use.

  • :user (String) — default: "guest"

    Username to use for authentication.

  • :pass (String) — default: "guest"

    Password to use for authentication.

  • :auth_mechanism (String) — default: "PLAIN"

    SASL authentication mechanism to use.

  • :ssl (String) — default: false

    Should we use TLS (SSL) for the connection?

  • :timeout (String) — default: nil

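    Connection timeout, in seconds. When no value is configured, the constructor falls back to 3 seconds (the timeout is not applied on JRuby).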

  • :heartbeat (Fixnum) — default: 0

    Connection heartbeat, in seconds.

  • :frame_max (Fixnum) — default: 131072

    Maximum frame size to use. If broker cannot support frames this large, broker’s maximum value will be used instead.



41
42
43
44
45
46
47
48
# File 'lib/amq/client/async/adapters/event_machine.rb', line 41

# Opens an EventMachine connection to the broker described by +settings+,
# using this class as the connection handler.
#
# @param settings [Hash] connection options, normalised via Settings.configure
# @param block [Proc] optional callback, handed to the new connection's
#   #register_connection_callback
# @return [Object] whatever EventMachine.connect returns (the connection instance)
def self.connect(settings = {}, &block)
  # NOTE(review): this writes an instance variable on the class itself,
  # not on the new connection; the connection receives the same settings
  # as the last argument to EventMachine.connect.
  @settings = Settings.configure(settings)

  EventMachine.connect(@settings[:host], @settings[:port], self, @settings).tap do |connection|
    connection.register_connection_callback(&block)
  end
end

Instance Method Details

#authenticating? ⇒ Boolean

Whether we are in authentication state (after TCP connection was established but before broker authenticated us).

Returns:

  • (Boolean)


195
196
197
# File 'lib/amq/client/async/adapters/event_machine.rb', line 195

# Reports the authentication-stage flag.
#
# @return [Boolean] the current value of @authenticating
def authenticating?
  @authenticating
end

#connection_successful ⇒ Object

Called by AMQ::Client::Connection after we receive connection.open-ok.



328
329
330
331
332
333
# File 'lib/amq/client/async/adapters/event_machine.rb', line 328

# Marks the connection as open and fires the connection deferrable.
def connection_successful
  # We are past the authentication stage once the connection is open.
  @authenticating = false
  opened!

  # Runs the callbacks attached to the connection deferrable.
  @connection_deferrable.succeed
end

#disconnection_successful ⇒ Object

Called by AMQ::Client::Connection after we receive connection.close-ok.



339
340
341
342
343
344
345
346
# File 'lib/amq/client/async/adapters/event_machine.rb', line 339

# Fires the disconnection deferrable, closes the underlying connection,
# resets per-connection state and marks this connection as closed.
def disconnection_successful
  # Fire disconnection callbacks first: the reset call below replaces
  # this deferrable with a fresh one.
  @disconnection_deferrable.succeed

  # true for "after writing buffered data"
  self.close_connection(true)
  self.reset
  closed!
end

#establish_connection(settings) ⇒ Object

For EventMachine adapter, this is a no-op.



181
182
183
184
# File 'lib/amq/client/async/adapters/event_machine.rb', line 181

# Intentionally empty for the EventMachine adapter.
#
# @param settings [Object] connection settings; ignored by this implementation
def establish_connection(settings)
  # Unfortunately there doesn't seem to be any sane way
  # how to get EventMachine connect to the instance level.
end

#handle_skipped_hearbeats ⇒ Object

Called when time since last server heartbeat received is greater or equal to the heartbeat interval set via :heartbeat_interval option on connection.



352
353
354
355
356
357
358
359
# File 'lib/amq/client/async/adapters/event_machine.rb', line 352

# Reacts to missed server heartbeats: stops the heartbeat sender and runs
# the skipped-heartbeats callbacks.
#
# Does nothing (and returns nil) when skipped heartbeats are already being
# handled, when the TCP connection is not established, or when the
# connection is being closed intentionally.
#
# @return [Object, nil] result of #run_skipped_heartbeats_callbacks, or nil
#   when the call was ignored
def handle_skipped_hearbeats
  return if @handling_skipped_hearbeats
  return unless @tcp_connection_established
  return if @intentionally_closing_connection

  # Raise the flag before doing any work so repeated calls are ignored.
  @handling_skipped_hearbeats = true

  cancel_heartbeat_sender
  run_skipped_heartbeats_callbacks
end

#on_closed(&block) ⇒ Object Also known as: on_disconnection

Defines a callback that will be run when broker confirms connection termination (client receives connection.close-ok). You can define more than one callback.



119
120
121
# File 'lib/amq/client/async/adapters/event_machine.rb', line 119

# Registers +block+ as a callback on the disconnection deferrable.
#
# @param block [Proc] callback to attach
# @return [Object] whatever the deferrable's #callback returns
def on_closed(&block)
  @disconnection_deferrable.callback(&block)
end

#on_open(&block) ⇒ Object Also known as: on_connection

Defines a callback that will be executed when AMQP connection is considered open: client and broker have agreed on max channel identifier and maximum allowed frame size and authentication succeeds. You can define more than one callback.



110
111
112
# File 'lib/amq/client/async/adapters/event_machine.rb', line 110

# Registers +block+ as a callback on the connection deferrable.
#
# @param block [Proc] callback to attach
# @return [Object] whatever the deferrable's #callback returns
def on_open(&block)
  @connection_deferrable.callback(&block)
end

#periodically_reconnect(period = 5) ⇒ Object

Periodically try to reconnect.

Parameters:

  • period (Fixnum) (defaults to: 5)

    Period of time, in seconds, to wait before reconnection attempt.

  • force (Boolean)

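    If true, enforces immediate reconnection. Note: the signature shown for this method, periodically_reconnect(period = 5), takes no force argument, so this entry appears to be stale.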



90
91
92
93
94
95
96
97
# File 'lib/amq/client/async/adapters/event_machine.rb', line 90

# Switches the connection into reconnecting mode, resets per-connection
# state and installs a periodic timer that asks EventMachine to reconnect
# this handler every +period+ seconds.
#
# @param period [Numeric] interval, in seconds, passed to EventMachine::PeriodicTimer
# @return [EventMachine::PeriodicTimer] the timer, also kept in
#   @periodic_reconnection_timer
def periodically_reconnect(period = 5)
  @reconnecting = true
  reset

  # Host and port are looked up on every tick, not captured up front.
  @periodic_reconnection_timer = EventMachine::PeriodicTimer.new(period) do
    EventMachine.reconnect(@settings[:host], @settings[:port], self)
  end
end

#reconnect(force = false, period = 5) ⇒ Object

Reconnect after a period of wait.

Parameters:

  • period (Fixnum) (defaults to: 5)

    Period of time, in seconds, to wait before reconnection attempt.

  • force (Boolean) (defaults to: false)

    If true, enforces immediate reconnection.



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/amq/client/async/adapters/event_machine.rb', line 55

# Asks EventMachine to reconnect this handler using the current settings.
#
# If a reconnection is already in progress and +force+ is false, the
# attempt is deferred: a one-shot timer is scheduled that retries with
# force = true after +period+ seconds, and this call returns nil.
#
# @param force [Boolean] when true, reconnect immediately even if a
#   reconnection is already in progress
# @param period [Numeric] delay, in seconds, before the deferred retry
# @return [Object, nil] result of EventMachine.reconnect, or nil when the
#   attempt was deferred
def reconnect(force = false, period = 5)
  if @reconnecting && !force
    EventMachine::Timer.new(period) { reconnect(true, period) }
    return
  end

  # First attempt: flag the state and drop per-connection buffers.
  unless @reconnecting
    @reconnecting = true
    reset
  end

  EventMachine.reconnect(@settings[:host], @settings[:port], self)
end

#reconnect_to(settings, period = 5) ⇒ Object

Similar to #reconnect, but uses different connection settings

See Also:

  • #reconnect



74
75
76
77
78
79
80
81
82
# File 'lib/amq/client/async/adapters/event_machine.rb', line 74

# Replaces the connection settings and asks EventMachine to reconnect this
# handler to the host and port they describe.
#
# @param settings [Hash] new connection options, normalised via Settings.configure
# @param period [Numeric] accepted but not used by this method
# @return [Object] result of EventMachine.reconnect
def reconnect_to(settings, period = 5)
  # First attempt: flag the state and drop per-connection buffers.
  unless @reconnecting
    @reconnecting = true
    reset
  end

  @settings = Settings.configure(settings)
  EventMachine.reconnect(@settings[:host], @settings[:port], self)
end

#reset ⇒ Object (protected)



402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
# File 'lib/amq/client/async/adapters/event_machine.rb', line 402

# Restores per-connection parsing state and installs fresh connection and
# disconnection deferrables.
#
# @return [false] the value assigned to @authenticating
def reset
  # Incoming-data buffers and counters.
  # NOTE(review): the constructor assigns @frames separately before calling
  # this method — confirm which container is actually intended.
  @size         = 0
  @payload      = ""
  @frames       = []
  @chunk_buffer = ""

  # Fresh deferrables: callbacks attached to the previous ones do not
  # carry over.
  @connection_deferrable    = EventMachine::DefaultDeferrable.new
  @disconnection_deferrable = EventMachine::DefaultDeferrable.new

  # Tracks whether authentication has succeeded. AMQP 0.9.1 dictates that
  # on authentication failure the broker must close the TCP connection
  # without sending any more data, so we have to track explicitly whether
  # we are past the authentication stage in order to signal possible
  # authentication failures.
  @authenticating = false
end

#tcp_connection_established? ⇒ Boolean

Is the TCP connection established and currently active?

Returns:

  • (Boolean)


202
203
204
# File 'lib/amq/client/async/adapters/event_machine.rb', line 202

# Reports the TCP connection state flag.
#
# @return [Boolean] the current value of @tcp_connection_established
def tcp_connection_established?
  @tcp_connection_established
end

#upgrade_to_tls_if_necessary ⇒ Object (protected)



418
419
420
421
422
423
424
425
426
# File 'lib/amq/client/async/adapters/event_machine.rb', line 418

# Starts TLS on this connection when the :ssl setting asks for it.
#
# A Hash value is forwarded to #start_tls as its options; any other truthy
# value starts TLS with no arguments; nil/false leaves the connection as is.
#
# @return [Object, nil] result of #start_tls, or nil when TLS is not requested
def upgrade_to_tls_if_necessary
  tls_settings = @settings[:ssl]
  return unless tls_settings

  tls_settings.is_a?(Hash) ? start_tls(tls_settings) : start_tls
end