Class: Stomper::Connection

Inherits:
Object
Includes:
Extensions::Common, Extensions::Events, Extensions::Heartbeat, Extensions::Scoping
Defined in:
lib/stomper/connection.rb

Overview

This class encapsulates a client connection to a message broker through the Stomp protocol. It is also aliased as Stomper::Client.

Constant Summary

PROTOCOL_VERSIONS =

The list of supported protocol versions

Returns:

  • (Array<String>)
['1.0', '1.1']
DEFAULT_CONFIG =

The default configuration for connections. These settings have been deliberately left unfrozen to allow users to change defaults for all connections in one fell swoop.

Returns:

  • ({Symbol => Object})
{
  :versions => ['1.0', '1.1'],
  :heartbeats => [0, 0],
  :host => nil,
  :login => nil,
  :passcode => nil,
  :receiver_class => ::Stomper::Receivers::Threaded
}
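
Because the defaults hash is left unfrozen, changing an entry affects every connection created afterwards, while per-connection options still win. The sketch below illustrates that pattern with stand-in names (`SKETCH_DEFAULTS` and `SketchClient` are hypothetical and not part of Stomper); it is a minimal model of the idea, not the library's implementation.

```ruby
# Hypothetical stand-in for an unfrozen defaults hash like DEFAULT_CONFIG.
SKETCH_DEFAULTS = { :versions => ['1.0', '1.1'], :heartbeats => [0, 0] }

# Hypothetical client that reads the shared defaults at construction time.
class SketchClient
  attr_reader :versions, :heartbeats

  def initialize(options = {})
    # Explicit options override the shared defaults.
    config = SKETCH_DEFAULTS.merge(options)
    @versions = config[:versions]
    @heartbeats = config[:heartbeats]
  end
end

# Change the default once; every later instance picks it up.
SKETCH_DEFAULTS[:heartbeats] = [1000, 1000]

with_new_default = SketchClient.new
with_override = SketchClient.new(:heartbeats => [0, 0])
```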

Constants included from Extensions::Heartbeat

Extensions::Heartbeat::EXTEND_BY_VERSION

Constants included from Extensions::Events

Extensions::Events::ALIASED_EVENTS

Constants included from Extensions::Common

Extensions::Common::EXTEND_BY_VERSION

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Methods included from Extensions::Heartbeat

#alive?, #beat, #dead?, extend_by_protocol_version

Methods included from Extensions::Events

#after_receiving, #after_transmitting, #before_abort, #before_ack, #before_begin, #before_client_beat, #before_commit, #before_connect, #before_disconnect, #before_nack, #before_receiving, #before_send, #before_subscribe, #before_transmitting, #before_unsubscribe, #bind_callback, #on_abort, #on_ack, #on_begin, #on_broker_beat, #on_client_beat, #on_commit, #on_connect, #on_connected, #on_connection_closed, #on_connection_died, #on_connection_established, #on_connection_terminated, #on_disconnect, #on_error, #on_message, #on_nack, #on_receipt, #on_send, #on_subscribe, #on_unsubscribe

Methods included from Extensions::Scoping

#with_headers, #with_receipt, #with_transaction

Methods included from Extensions::Common

#abort, #ack, #begin, #commit, #disconnect, extend_by_protocol_version, #nack, #send, #subscribe, #unsubscribe

Constructor Details

#initialize(uri, options = {}) ⇒ Connection

Creates a connection to a broker specified by the supplied uri. The given uri will be resolved to a URI instance through URI.parse. The final URI object must provide a create_socket method, or an error will be raised. Both URI::STOMP and URI::STOMP_SSL provide this method, so string URIs with a scheme of either “stomp” or “stomp+ssl” will work automatically. Most connection options can be supplied through query parameters specified in the URI or through an optional Hash parameter. If the same option is configured in both the URI’s parameters and the options hash, the options hash takes precedence. Certain options, such as those pertaining to SSL settings, must be configured through the options hash.

Examples:

Connecting to a broker on ‘host.domain.tld’ and a port of 12345

con = Stomper::Connection.new('stomp://host.domain.tld:12345')

Connecting with login credentials

con = Stomper::Connection.new('stomp://username:passcode@host.domain.tld')

Connecting using Stomp protocol 1.1, sending client beats once per second, and no interest in server beats.

con = Stomper::Connection.new('stomp://host/?versions=1.1&heartbeats=1000&heartbeats=0')
con = Stomper::Connection.new('stomp://host', :versions => '1.1', :heartbeats => [1000, 0])
# both result in:
con.heartbeats #=> [1000, 0]
con.versions #=> ['1.1']

Repeated options in URI and options hash

con = Stomper::Connection.new('stomp://host?versions=1.1&versions=1.0', :versions => '1.1')
con.versions #=> ['1.1']
# In this case, the versions query parameter value ['1.1', '1.0'] is
# overridden by the options hash setting '1.1'
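
The precedence rule can be sketched without the library. The helper below is hypothetical (`merge_connection_options` is not a Stomper method), and it uses `URI.decode_www_form` from the standard library in place of the `CGI.parse` call shown in the constructor source. It collects repeated query parameters into arrays and lets the options hash win on conflicts; it does not reproduce the type coercion that the real attribute writers may perform.

```ruby
require 'uri'

# Hypothetical helper: gather query parameters (repeated names become
# arrays), then merge the explicit options on top of them.
def merge_connection_options(uri_string, options = {})
  uri = URI.parse(uri_string)
  from_query = {}
  URI.decode_www_form(uri.query || '').each do |name, value|
    (from_query[name.to_sym] ||= []) << value
  end
  # Hash#merge keeps the right-hand value when a key exists on both sides.
  from_query.merge(options)
end

merged = merge_connection_options(
  'stomp://host?versions=1.1&versions=1.0&heartbeats=1000&heartbeats=0',
  :versions => '1.1'
)
p merged[:versions]   # value from the options hash replaces the query values
p merged[:heartbeats] # only present in the query string, so it is kept
```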

Parameters:

  • uri (String)

    a string representing the URI to the message broker’s Stomp interface.

  • options ({Symbol => Object}) (defaults to: {})

    additional options for the connection.

Options Hash (options):

  • :versions (Array<String>) — default: ['1.0', '1.1']

    protocol versions this connection should allow.

  • :heartbeats (Array<Fixnum>) — default: [0, 0]

    heartbeat timings for this connection in milliseconds (a zero indicates that heartbeating is not desired from the client or the broker)

  • :ssl ({Symbol => Object}) — default: {}

    SSL specific options to pass on when creating an SSL connection.

  • :host (String) — default: nil

    Host name to pass as host header on CONNECT frames (will use actual connection hostname if not set)

  • :login (String) — default: nil

    Username to send as login header for credential authenticated connections.

  • :passcode (String) — default: nil

    Password to send as passcode header for credential authenticated connections.



# File 'lib/stomper/connection.rb', line 151

def initialize(uri, options={})
  @ssl = options.delete(:ssl) || {}
  @uri = uri.is_a?(::URI) ? uri : ::URI.parse(uri)
  config = ::Stomper::Support.keys_to_sym(::CGI.parse(@uri.query || '')).
    merge(::Stomper::Support.keys_to_sym(options))
  DEFAULT_CONFIG.each do |attr_name, def_val|
    if config.key? attr_name
      __send__ :"#{attr_name}=", config[attr_name]
    elsif def_val
      __send__ :"#{attr_name}=", def_val
    end
  end
  @host ||= (@uri.host||'localhost')
  @login ||= (@uri.user || '')
  @passcode ||= (@uri.password || '')
  @connected = false
  @heartbeating = [0,0]
  @last_transmitted_at = @last_received_at = nil
  @subscription_manager = ::Stomper::SubscriptionManager.new(self)
  @receipt_manager = ::Stomper::ReceiptManager.new(self)
  @connecting = false
  @disconnecting = false
  @disconnected = false
  @close_mutex = ::Mutex.new
  
  on_connected do |cf, con|
    unless @connected
      @version = (cf[:version].nil?||cf[:version].empty?) ? '1.0' : cf[:version]
      unless @versions.include?(@version)
        close
        raise ::Stomper::Errors::UnsupportedProtocolVersionError,
          "broker requested '#{@version}', client allows: #{@versions.inspect}"
      end
      c_x, c_y = @heartbeats
      s_x, s_y = (cf[:'heart-beat'] || '0,0').split(',').map do |v|
        vi = v.to_i
        vi > 0 ? vi : 0
      end
      @heartbeating = [ (c_x == 0||s_y == 0 ? 0 : [c_x,s_y].max), 
        (c_y == 0||s_x == 0 ? 0 : [c_y,s_x].max) ]

      extend_for_protocol
    end
  end

  before_disconnect do |df, con|
    @disconnecting = true
  end
  on_disconnect do |df, con|
    @disconnected = true
    close unless df[:receipt]
  end
end

Instance Attribute Details

#connected_frame ⇒ Stomper::Frame? (readonly)

The CONNECTED frame sent by the broker during the connection handshake.

Returns:

  • (Stomper::Frame, nil)

# File 'lib/stomper/connection.rb', line 34

def connected_frame
  @connected_frame
end

#heartbeating ⇒ Array<Fixnum> (readonly)

The negotiated heartbeat strategy. The first element is the maximum number of milliseconds that the client can go without transmitting data or a heartbeat (a zero indicates that the client does not need to send heartbeats). The second element is the maximum number of milliseconds the server will go without transmitting data or a heartbeat (a zero indicates that the server need not send any heartbeats).

Returns:

  • (Array<Fixnum>)


# File 'lib/stomper/connection.rb', line 56

def heartbeating
  @heartbeating
end
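
The negotiation itself happens in the on_connected callback registered by the constructor. The following standalone sketch mirrors that arithmetic; `negotiate_heartbeats` is a hypothetical name introduced here for illustration. Each direction is disabled if either side asks for zero, and otherwise the larger (slower) of the two intervals is used.

```ruby
# client: [c_x, c_y] as held in #heartbeats.
# header: the broker's heart-beat header value, e.g. "3000,500", or nil.
def negotiate_heartbeats(client, header)
  c_x, c_y = client
  # Negative or non-numeric broker values are treated as 0.
  s_x, s_y = (header || '0,0').split(',').map { |v| [v.to_i, 0].max }
  [
    (c_x == 0 || s_y == 0) ? 0 : [c_x, s_y].max, # client -> broker interval
    (c_y == 0 || s_x == 0) ? 0 : [c_y, s_x].max  # broker -> client interval
  ]
end

p negotiate_heartbeats([1000, 2000], '3000,500')
p negotiate_heartbeats([1000, 0], '0,500')
```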

#heartbeats ⇒ Array<Fixnum>

The client-side heartbeat settings to allow for this connection

Returns:

  • (Array<Fixnum>)


# File 'lib/stomper/connection.rb', line 47

def heartbeats
  @heartbeats
end

#host ⇒ String

The host header value to send to the broker when connecting. This allows the client to inform the server which host it wishes to connect with when multiple brokers may share an IP address through virtual hosting.

Returns:

  • (String)


# File 'lib/stomper/connection.rb', line 66

def host
  @host
end

#last_received_at ⇒ Time? (readonly)

A timestamp set to the last time a frame was received. Returns nil if no frames have been received yet

Returns:

  • (Time, nil)


# File 'lib/stomper/connection.rb', line 89

def last_received_at
  @last_received_at
end

#last_transmitted_at ⇒ Time? (readonly)

A timestamp set to the last time a frame was transmitted. Returns nil if no frames have been transmitted yet

Returns:

  • (Time, nil)


# File 'lib/stomper/connection.rb', line 84

def last_transmitted_at
  @last_transmitted_at
end

#login ⇒ String

The login header value to send to the broker when connecting.

Returns:

  • (String)


# File 'lib/stomper/connection.rb', line 70

def login
  @login
end

#passcode ⇒ String

The passcode header value to send to the broker when connecting.

Returns:

  • (String)


# File 'lib/stomper/connection.rb', line 74

def passcode
  @passcode
end

#receipt_manager ⇒ Stomper::ReceiptManager (readonly)

The receipt manager. Maintains the list of receipt IDs and the callbacks associated with them that will be invoked when any frame with a matching receipt-id header is received.



# File 'lib/stomper/connection.rb', line 101

def receipt_manager
  @receipt_manager
end

#receiver_class ⇒ Class

The class to use when instantiating a new receiver for the connection. Defaults to Receivers::Threaded.

Returns:

  • (Class)


# File 'lib/stomper/connection.rb', line 79

def receiver_class
  @receiver_class
end

#ssl ⇒ {Symbol => Object}? (readonly)

The SSL options to use if this connection is secure

Returns:

  • ({Symbol => Object}, nil)


# File 'lib/stomper/connection.rb', line 60

def ssl
  @ssl
end

#subscription_manager ⇒ Stomper::SubscriptionManager (readonly)

The subscription manager. Maintains the list of destinations subscribed to as well as the callbacks to invoke when a MESSAGE frame is received on one of them.



# File 'lib/stomper/connection.rb', line 95

def subscription_manager
  @subscription_manager
end

#uri ⇒ URI (readonly)

The URI representation of the broker this connection is associated with

Returns:

  • (URI)

# File 'lib/stomper/connection.rb', line 30

def uri
  @uri
end

#version ⇒ String? (readonly)

The protocol version negotiated between the client and broker. Will be nil until the connection has been established.

Returns:

  • (String, nil)


# File 'lib/stomper/connection.rb', line 43

def version
  @version
end
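
How the negotiated version is chosen can be read from the on_connected callback in the constructor: a missing or empty version header on the CONNECTED frame means 1.0, and a version outside the allowed list is rejected. The sketch below restates that logic on a plain hash; `negotiated_version` and `UnsupportedVersion` are hypothetical names standing in for the library's own callback and error class.

```ruby
# Hypothetical stand-in for Stomper::Errors::UnsupportedProtocolVersionError.
class UnsupportedVersion < StandardError; end

# connected_headers: headers of the broker's CONNECTED frame (plain hash here).
# allowed_versions: the list held in #versions.
def negotiated_version(connected_headers, allowed_versions)
  version = connected_headers[:version]
  # Stomp 1.0 brokers do not send a version header.
  version = '1.0' if version.nil? || version.empty?
  unless allowed_versions.include?(version)
    raise UnsupportedVersion,
      "broker requested '#{version}', client allows: #{allowed_versions.inspect}"
  end
  version
end

p negotiated_version({}, ['1.0', '1.1'])
p negotiated_version({ :version => '1.1' }, ['1.0', '1.1'])
```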

#versions ⇒ Array<String>

The protocol versions to allow for this connection

Returns:

  • (Array<String>)


# File 'lib/stomper/connection.rb', line 38

def versions
  @versions
end

Class Method Details

.connect(uri, options = {}) ⇒ Object Also known as: open

Creates a new connection and immediately connects it to the broker.

# File 'lib/stomper/connection.rb', line 307

def connect(uri, options={})
  conx = new(uri, options)
  conx.connect
  conx
end

Instance Method Details

#close ⇒ Object

Disconnects from the broker immediately. This is not a polite disconnect: no DISCONNECT frame is transmitted to the broker, and the socket is shut down and closed immediately. Calls to Extensions::Common#disconnect invoke this method internally after the DISCONNECT frame has been transmitted. If the connection is open, this method triggers the on_connection_closed event; on_connection_terminated is triggered first unless a DISCONNECT frame has already been sent.



# File 'lib/stomper/connection.rb', line 373

def close
  @close_mutex.synchronize do
    if connected?
      begin
        trigger_event(:on_connection_terminated, self) unless @disconnected
      ensure
        unless @socket.closed?
          @socket.shutdown(2) rescue nil
          @socket.close rescue nil
        end
        @connecting = @connected = false
      end
      trigger_event(:on_connection_closed, self)
      subscription_manager.clear
      receipt_manager.clear
    end
  end
end
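
The event ordering and the effect of the mutex can be modelled in isolation. In the sketch below, `SketchConnection` and `polite_disconnect!` are hypothetical, and events are recorded in an array rather than dispatched to callbacks. It shows that a second call to close is a no-op and that on_connection_terminated is skipped after a polite disconnect.

```ruby
class SketchConnection
  attr_reader :events

  def initialize
    @close_mutex = Mutex.new
    @connected = true
    @disconnected = false # becomes true once a DISCONNECT has been sent
    @events = []
  end

  # Stands in for the on_disconnect callback setting @disconnected.
  def polite_disconnect!
    @disconnected = true
  end

  def close
    # The mutex keeps concurrent callers from closing twice.
    @close_mutex.synchronize do
      if @connected
        @events << :on_connection_terminated unless @disconnected
        @connected = false
        @events << :on_connection_closed
      end
    end
  end
end

abrupt = SketchConnection.new
abrupt.close
abrupt.close # already closed, so nothing more is recorded

polite = SketchConnection.new
polite.polite_disconnect!
polite.close
```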

#connect(headers = {}) ⇒ Object

Establishes a connection to the broker. After the socket connection is established, a CONNECT/STOMP frame will be sent to the broker and a frame will be read from the TCP stream. If the frame is a CONNECTED frame, the connection has been established and you’re ready to go, otherwise the socket will be closed and an error will be raised.



# File 'lib/stomper/connection.rb', line 274

def connect(headers={})
  unless @connected
    @socket = @uri.create_socket(@ssl)
    @serializer = ::Stomper::FrameSerializer.new(@socket)
    m_headers = {
      :'accept-version' => @versions.join(','),
      :host => @host,
      :'heart-beat' => @heartbeats.join(','),
      :login => @login,
      :passcode => @passcode
    }
    @disconnecting = false
    @disconnected = false
    @connecting = true
    transmit create_frame('CONNECT', headers, m_headers)
    receive.tap do |f|
      if f.command == 'CONNECTED'
        @connected_frame = f
        @connected = true
        @connecting = false
        trigger_event(:on_connection_established, self)
      else
        close
        raise ::Stomper::Errors::ConnectFailedError, 'broker did not send CONNECTED frame'
      end
    end
  end
end
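
As a small illustration of the CONNECT headers assembled above, the sketch below builds the same hash from plain values. `connect_headers` is a hypothetical helper; how the library combines these with caller-supplied headers inside create_frame is not shown on this page, so the sketch leaves that step out.

```ruby
# Builds the protocol headers for a CONNECT frame from connection settings.
def connect_headers(versions, host, heartbeats, login, passcode)
  {
    :'accept-version' => versions.join(','), # e.g. "1.0,1.1"
    :host => host,
    :'heart-beat' => heartbeats.join(','),   # e.g. "1000,0"
    :login => login,
    :passcode => passcode
  }
end

headers = connect_headers(['1.0', '1.1'], 'host.domain.tld', [1000, 0], '', '')
p headers
```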

#connected? ⇒ true, false

True if a connection with the broker has been established or is in the process of being established, false otherwise.

Returns:

  • (true, false)


# File 'lib/stomper/connection.rb', line 318

def connected?
  (@connecting || @connected) && !@socket.closed?
end

#duration_since_received ⇒ Fixnum?

Duration in milliseconds since a frame was last received from the broker. Returns nil if no frame has been received yet.

Returns:

  • (Fixnum, nil)


# File 'lib/stomper/connection.rb', line 454

def duration_since_received
  @last_received_at && ((Time.now - @last_received_at)*1000).to_i
end

#duration_since_transmitted ⇒ Fixnum?

Duration in milliseconds since a frame was last transmitted to the broker. Returns nil if no frame has been transmitted yet.

Returns:

  • (Fixnum, nil)


# File 'lib/stomper/connection.rb', line 448

def duration_since_transmitted
  @last_transmitted_at && ((Time.now - @last_transmitted_at)*1000).to_i
end
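
Both duration methods use the same arithmetic: subtracting two Time values yields seconds as a Float, which is scaled to whole milliseconds, and the leading `&&` short-circuits to nil when no timestamp exists. A standalone sketch follows, with a hypothetical `elapsed_ms` helper that accepts the current time as a parameter so the result is reproducible.

```ruby
# last_at: a Time or nil; now: injectable for deterministic results.
def elapsed_ms(last_at, now = Time.now)
  last_at && ((now - last_at) * 1000).to_i
end

p elapsed_ms(nil)                         # no timestamp recorded yet
p elapsed_ms(Time.at(100), Time.at(101.5)) # 1.5 seconds apart
```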

#open(headers = {}) ⇒ Object

Alias for #connect. Establishes a connection to the broker; see #connect for the details of the handshake and the method source.

#receive ⇒ Stomper::Frame

Receives a frame from the broker.

Returns:

  • (Stomper::Frame)

# File 'lib/stomper/connection.rb', line 413

def receive
  if alive?
    trigger_event(:before_receiving, nil, self)
    begin
      @serializer.read_frame.tap do |f|
        if f.nil?
          close
        else
          @last_received_at = Time.now
          trigger_event(:after_receiving, f, self)
          trigger_received_frame(f, self)
        end
      end
    rescue ::IOError, ::SystemCallError
      close
      raise
    end
  else
    trigger_event(:on_connection_died, self)
    nil
  end
end

#receive_nonblock ⇒ Stomper::Frame?

Note:

While this method will not block if there is no data ready for reading, if any data is available it will block until a complete frame has been read.

Receives a frame from the broker if there is data to be read from the underlying socket. If there is no data available for reading from the socket, nil is returned.

Returns:

  • (Stomper::Frame, nil)

# File 'lib/stomper/connection.rb', line 442

def receive_nonblock
  receive if @socket.ready?
end
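
The behaviour described in the note, a non-blocking readiness check followed by a blocking read, can be reproduced with plain IO objects. The sketch below is an analogy rather than the library's code: it uses `IO.select` with a zero timeout on a pipe instead of the socket's `ready?`, and reads a line instead of a Stomp frame. `read_line_nonblock` is a hypothetical helper.

```ruby
# Returns nil at once when nothing is readable; otherwise reads a full line.
def read_line_nonblock(io)
  ready, = IO.select([io], nil, nil, 0) # timeout 0: poll without waiting
  # Once data is present, gets blocks until the whole line has arrived.
  ready ? io.gets : nil
end

reader, writer = IO.pipe
first = read_line_nonblock(reader)  # nothing has been written yet
writer.puts 'MESSAGE'
second = read_line_nonblock(reader) # data is waiting, so it is read
p first
p second
```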

#running? ⇒ Boolean

Returns true if the receiver exists and is running.

Returns:

  • (Boolean)


# File 'lib/stomper/connection.rb', line 355

def running?
  @receiver && @receiver.running?
end

#start(headers = {}) ⇒ self

Creates an instance of the class given by #receiver_class and starts it. A call to #connect will be made if the connection has not been established. The class to instantiate can be overridden on a per-connection basis, or for all connections by changing DEFAULT_CONFIG.

Parameters:

  • headers ({Object => String}) (defaults to: {})

    optional headers to pass to #connect if the connection has not yet been established.

Returns:

  • (self)

# File 'lib/stomper/connection.rb', line 331

def start(headers={})
  connect(headers) unless @connected
  @receiver ||= receiver_class.new(self)
  @receiver.start
  self
end

#stop(headers = {}) ⇒ self

Stops the instantiated receiver and calls Extensions::Common#disconnect if a connection has been established.

Parameters:

  • headers ({Object => String}) (defaults to: {})

    optional headers to pass to Extensions::Common#disconnect if the connection has been established.

Returns:

  • (self)

Raises:

  • (Exception)

    if invoking stop on the receiver raises an exception

# File 'lib/stomper/connection.rb', line 348

def stop(headers={})
  disconnect(headers) unless @disconnecting
  @receiver && @receiver.stop
  self
end

#transmit(frame) ⇒ Object

Transmits a frame to the broker. This is a low-level method used internally by the more user-friendly interface.

Parameters:



# File 'lib/stomper/connection.rb', line 395

def transmit(frame)
  trigger_event(:on_connection_died, self) if dead?
  trigger_event(:before_transmitting, frame, self)
  trigger_before_transmitted_frame(frame, self)
  begin
    @serializer.write_frame(frame).tap do
      @last_transmitted_at = Time.now
      trigger_event(:after_transmitting, frame, self)
      trigger_transmitted_frame(frame, self)
    end
  rescue ::IOError, ::SystemCallError
    close
    raise
  end
end