Class: Stomp::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/stomp/connection.rb,
lib/connection/utf8.rb,
lib/connection/netio.rb,
lib/connection/utils.rb,
lib/connection/heartbeats.rb

Overview

Low level connection which maps commands and supports synchronous receives

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) ⇒ Connection

A new Connection object can be initialized using two forms:

Hash (this is the recommended Connection initialization method):

hash = {
  :hosts => [
    {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false},
    {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false}
  ],
  # These are the default parameters and do not need to be set
  :reliable => true,                  # reliable (use failover)
  :initial_reconnect_delay => 0.01,   # initial delay before reconnect (secs)
  :max_reconnect_delay => 30.0,       # max delay before reconnect
  :use_exponential_back_off => true,  # increase delay between reconnect attpempts
  :back_off_multiplier => 2,          # next delay multiplier
  :max_reconnect_attempts => 0,       # retry forever, use # for maximum attempts
  :randomize => false,                # do not radomize hosts hash before reconnect
  :connect_timeout => 0,              # Timeout for TCP/TLS connects, use # for max seconds
  :connect_headers => {},             # user supplied CONNECT headers (req'd for Stomp 1.1+)
  :parse_timeout => 5,                # IO::select wait time on socket reads
  :logger => nil,                     # user suplied callback logger instance
  :dmh => false,                      # do not support multihomed IPV4 / IPV6 hosts during failover
  :closed_check => true,              # check first if closed in each protocol method
  :hbser => false,                    # raise on heartbeat send exception
  :stompconn => false,                # Use STOMP instead of CONNECT
  :usecrlf => false,                  # Use CRLF command and header line ends (1.2+)
  :max_hbread_fails => 0,             # Max HB read fails before retry.  0 => never retry
  :max_hbrlck_fails => 0,             # Max HB read lock obtain fails before retry.  0 => never retry
  :fast_hbs_adjust => 0.0,            # Fast heartbeat senders sleep adjustment, seconds, needed ...
                                      # For fast heartbeat senders.  'fast' == YMMV.  If not
                                      # correct for your environment, expect unnecessary fail overs
  :connread_timeout => 0,             # Timeout during CONNECT for read of CONNECTED/ERROR, secs
  :tcp_nodelay => true,               # Turns on the TCP_NODELAY socket option; disables Nagle's algorithm
  :start_timeout => 0,                # Timeout around Stomp::Client initialization
  :sslctx_newparm => nil,             # Param for SSLContext.new
  :ssl_post_conn_check => true,       # Further verify broker identity
  :nto_cmd_read => true,              # No timeout on COMMAND read
}

e.g. c = Stomp::Connection.new(hash)

Positional parameters:

login             (String,  default : '')
passcode          (String,  default : '')
host              (String,  default : 'localhost')
port              (Integer, default : 61613)
reliable          (Boolean, default : false)
reconnect_delay   (Integer, default : 5)

e.g. c = Stomp::Connection.new("username", "password", "localhost", 61613, true)

113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/stomp/connection.rb', line 113

def initialize( = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
  @protocol = Stomp::SPL_10 # Assumed at first
  @hb_received = true       # Assumed at first
  @hb_sent = true           # Assumed at first
  @hbs = @hbr = false       # Sending/Receiving heartbeats. Assume no for now.
  @jruby = false            # Assumed at first
  # Initialize some variables
  @closed, @socket, @hhas10, @rt, @st = true, nil, false, nil, nil
  if defined?(RUBY_ENGINE) && RUBY_ENGINE =~ /jruby/
    @jruby = true
  end

  ct = Thread.current
  if ct.respond_to?(:report_on_exception=)
    ct.report_on_exception=false
  end

  if .is_a?(Hash)
    hashed_initialize()
  else
    @host = host
    @port = port
    @login = 
    @passcode = passcode
    @reliable = reliable
    @reconnect_delay = reconnect_delay
    @connect_headers = connect_headers
    @ssl = false
    @parameters = nil
    @parse_timeout = 5		# To override, use hashed parameters
    @connect_timeout = 0	# To override, use hashed parameters
    @logger = Stomp::NullLogger.new	# To override, use hashed parameters
    @autoflush = false    # To override, use hashed parameters or setter
    @closed_check = true  # Run closed check in each protocol method
    @hbser = false        # Raise if heartbeat send exception
    @stompconn = false    # If true, use STOMP rather than CONNECT
    @usecrlf = false      # If true, use \r\n as line ends (1.2 only)
    @max_hbread_fails = 0 # 0 means never retry for HB read failures
    @max_hbrlck_fails = 0 # 0 means never retry for HB read lock failures
    @fast_hbs_adjust = 0.0 # Fast heartbeat senders sleep adjustment
    @connread_timeout = 0 # Connect read CONNECTED/ERROR timeout
    @tcp_nodelay = true # Disable Nagle
    @start_timeout = 0 # Client only, startup timeout
    @sslctx_newparm = nil # SSLContext.new paramater
    @ssl_post_conn_check = true # Additional broker verification
    @nto_cmd_read = true # No timeout on COMMAND read
    warn "login looks like a URL, do you have the correct parameters?" if @login =~ /:\/\//
  end

  # Use Mutexes:  only one lock per each thread.
  # Reverted to original implementation attempt using Mutex.
  @transmit_semaphore = Mutex.new
  @read_semaphore = Mutex.new
  @socket_semaphore = Mutex.new
  @gets_semaphore = Mutex.new

  @subscriptions = {}
  @failure = nil
  @connection_attempts = 0

  socket
end

Instance Attribute Details

#autoflushObject

Autoflush forces a flush on each transmit. This may be changed dynamically by calling code.


39
40
41
# File 'lib/stomp/connection.rb', line 39

def autoflush
  @autoflush
end

#connection_frameObject (readonly)

The CONNECTED frame from the broker.


17
18
19
# File 'lib/stomp/connection.rb', line 17

def connection_frame
  @connection_frame
end

#disconnect_receiptObject (readonly)

Any disconnect RECEIPT frame if requested.


20
21
22
# File 'lib/stomp/connection.rb', line 20

def disconnect_receipt
  @disconnect_receipt
end

#hb_receivedObject (readonly)

Heartbeat receive has been on time.


29
30
31
# File 'lib/stomp/connection.rb', line 29

def hb_received
  @hb_received
end

#hb_sentObject (readonly)

Heartbeat send has been successful.


32
33
34
# File 'lib/stomp/connection.rb', line 32

def hb_sent
  @hb_sent
end

#hostObject (readonly)

Currently-connected host and port


42
43
44
# File 'lib/stomp/connection.rb', line 42

def host
  @host
end

#jrubyObject (readonly)

JRuby detected


35
36
37
# File 'lib/stomp/connection.rb', line 35

def jruby
  @jruby
end

#portObject (readonly)

Currently-connected host and port


42
43
44
# File 'lib/stomp/connection.rb', line 42

def port
  @port
end

#protocolObject (readonly)

The Stomp Protocol version.


23
24
25
# File 'lib/stomp/connection.rb', line 23

def protocol
  @protocol
end

#sessionObject (readonly)

A unique session ID, assigned by the broker.


26
27
28
# File 'lib/stomp/connection.rb', line 26

def session
  @session
end

Class Method Details

.default_port(ssl) ⇒ Object

default_port returns the default port used by the gem for TCP or SSL.


45
46
47
# File 'lib/stomp/connection.rb', line 45

def self.default_port(ssl)
  ssl ? 61612 : 61613
end

.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) ⇒ Object

open is syntactic sugar for 'Connection.new', see 'initialize' for usage.


211
212
213
# File 'lib/stomp/connection.rb', line 211

def Connection.open( = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
  Connection.new(, passcode, host, port, reliable, reconnect_delay, connect_headers)
end

.ssl_v2xoptionsObject

SSL Helper


50
51
52
53
54
55
56
57
58
59
# File 'lib/stomp/connection.rb', line 50

def self.ssl_v2xoptions()
    require 'openssl' unless defined?(OpenSSL)
    # Mimic code in later versions of Ruby 2.x (and backported to later
    # versions of 1.9.3).
    opts = OpenSSL::SSL::OP_ALL
    opts &= ~OpenSSL::SSL::OP_DONT_INSERT_EMPTY_FRAGMENTS if defined?(OpenSSL::SSL::OP_DONT_INSERT_EMPTY_FRAGMENTS)
    opts |= OpenSSL::SSL::OP_NO_COMPRESSION if defined?(OpenSSL::SSL::OP_NO_COMPRESSION)
    opts |= OpenSSL::SSL::OP_NO_SSLv2 if defined?(OpenSSL::SSL::OP_NO_SSLv2)
    opts |= OpenSSL::SSL::OP_NO_SSLv3 if defined?(OpenSSL::SSL::OP_NO_SSLv3)
end

Instance Method Details

#_interruptible_gets(read_socket) ⇒ Object


12
13
14
15
16
17
18
19
20
21
# File 'lib/connection/netio.rb', line 12

def _interruptible_gets(read_socket)
  # The gets thread may be interrupted by the heartbeat thread. Ensure that
  # if so interrupted, a new gets cannot start until after the heartbeat
  # thread finishes its work. This is PURELY to avoid a segfault bug
  # involving OpenSSL::Buffer.
  @gets_semaphore.synchronize { @getst = Thread.current }
  read_socket.gets
ensure
  @gets_semaphore.synchronize { @getst = nil }
end

#abort(name, headers = {}) ⇒ Object

Abort aborts a transaction by name.


335
336
337
338
339
340
341
342
343
344
# File 'lib/stomp/connection.rb', line 335

def abort(name, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("")
  headers = headers.symbolize_keys
  headers[:transaction] = name
  _headerCheck(headers)
  slog(:on_abort, log_params, headers)
  transmit(Stomp::CMD_ABORT, headers)
end

#ack(message_or_ack_id, headers = {}) ⇒ Object

Acknowledge a message, used when a subscription has specified client acknowledgement e.g.:

connection.subscribe("/queue/a", :ack => 'client')

connection.subscribe("/queue/a", :ack => 'client-individual')

as appropriate for the protocol level.

Accepts an optional transaction header ( :transaction => 'some_transaction_id' ).

When the connection protocol level is 1.0 or 1.1 the message_or_ack_id parameter should match the message-id header of the MESSAGE being acknowledged e.g.:

connection.ack(message.headers['message-id'])

When the connection protocol level is 1.2 the message_or_ack_id parameter should match the ack header of the MESSAGE being acknowledged e.g.:

connection.ack(message.headers['ack'])

In summary, the behavior is protocol level dependent, see the specifications and comments in the code.


260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/stomp/connection.rb', line 260

def ack(message_or_ack_id, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("")
  raise Stomp::Error::MessageIDRequiredError if message_or_ack_id.nil? || message_or_ack_id == ""
  headers = headers.symbolize_keys

  case @protocol
    when Stomp::SPL_12
      # The ACK frame MUST include an "id" header matching the "ack" header
      # of the MESSAGE being acknowledged.
      headers[:id] = message_or_ack_id
    when Stomp::SPL_11
      # ACK has two REQUIRED headers: "message-id", which MUST contain a value
      # matching the message-id header of the MESSAGE being acknowledged and
      # "subscription", which MUST be set to match the value of SUBSCRIBE's
      # id header.
      headers[:'message-id'] = message_or_ack_id
      raise Stomp::Error::SubscriptionRequiredError unless headers[:subscription]
    else # Stomp::SPL_10
      # ACK has one required header, "message-id", which must contain a value
      # matching the message-id for the MESSAGE being acknowledged.
      headers[:'message-id'] = message_or_ack_id
  end
  _headerCheck(headers)
  slog(:on_ack, log_params, headers)
  transmit(Stomp::CMD_ACK, headers)
end

#begin(name, headers = {}) ⇒ Object

Begin starts a transaction, and requires a name for the transaction


226
227
228
229
230
231
232
233
234
235
# File 'lib/stomp/connection.rb', line 226

def begin(name, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("")
  headers = headers.symbolize_keys
  headers[:transaction] = name
  _headerCheck(headers)
  slog(:on_begin, log_params, headers)
  transmit(Stomp::CMD_BEGIN, headers)
end

#client_ack?(message) ⇒ Boolean

client_ack? determines if headers contain :ack => “client”.

Returns:

  • (Boolean)

453
454
455
456
# File 'lib/stomp/connection.rb', line 453

def client_ack?(message)
  headers = @subscriptions[message.headers[:destination]]
  !headers.nil? && headers[:ack] == "client"
end

#closed?Boolean

closed? tests if this connection is closed.

Returns:

  • (Boolean)

221
222
223
# File 'lib/stomp/connection.rb', line 221

def closed?
  @closed
end

#commit(name, headers = {}) ⇒ Object

Commit commits a transaction by name.


323
324
325
326
327
328
329
330
331
332
# File 'lib/stomp/connection.rb', line 323

def commit(name, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("")
  headers = headers.symbolize_keys
  headers[:transaction] = name
  _headerCheck(headers)
  slog(:on_commit, log_params, headers)
  transmit(Stomp::CMD_COMMIT, headers)
end

#disconnect(headers = {}) ⇒ Object

disconnect closes this connection. If requested, a disconnect RECEIPT will be received.


460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
# File 'lib/stomp/connection.rb', line 460

def disconnect(headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("")
  headers = headers.symbolize_keys
  _headerCheck(headers)
  if @protocol >= Stomp::SPL_11
    @st.kill if @st # Kill ticker thread if any
    @rt.kill if @rt # Kill ticker thread if any
  end
  transmit(Stomp::CMD_DISCONNECT, headers)
  @disconnect_receipt = receive if headers[:receipt]
  slog(:on_disconnect, log_params)
  close_socket
end

#hashed_initialize(params) ⇒ Object

hashed_initialize prepares a new connection with a Hash of initialization parameters.


178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/stomp/connection.rb', line 178

def hashed_initialize(params)
  lp = _hdup(params)
  @parameters = refine_params(lp)
  @reliable =  @parameters[:reliable]
  @reconnect_delay = @parameters[:initial_reconnect_delay]
  @connect_headers = @parameters[:connect_headers]
  @parse_timeout =  @parameters[:parse_timeout]
  @connect_timeout =  @parameters[:connect_timeout]
  @logger = @parameters[:logger] || Stomp::NullLogger.new
  @autoflush = @parameters[:autoflush]
  @closed_check = @parameters[:closed_check]
  @hbser = @parameters[:hbser]
  @stompconn = @parameters[:stompconn]
  @usecrlf = @parameters[:usecrlf]
  @max_hbread_fails = @parameters[:max_hbread_fails]
  @max_hbrlck_fails = @parameters[:max_hbrlck_fails]
  @fast_hbs_adjust = @parameters[:fast_hbs_adjust]
  @connread_timeout = @parameters[:connread_timeout]
  @sslctx_newparm = @parameters[:sslctx_newparm]
  @ssl_post_conn_check = @parameters[:ssl_post_conn_check]
  @nto_cmd_read = @parameters[:nto_cmd_read]
  #
  # Try to support Ruby 1.9.x and 2.x ssl.
  unless defined?(RSpec)
    @parameters[:hosts].each do |ah|
      ah[:ssl] = Stomp::SSLParams.new if ah[:ssl] == true
    end
  end
  #sets the first host to connect
  change_host
end

#hbrecv_countObject

hbrecv_count returns the current connection's heartbeat receive count.


573
574
575
576
# File 'lib/stomp/connection.rb', line 573

def hbrecv_count()
  return 0 unless @hbrecv_count
  @hbrecv_count
end

#hbrecv_intervalObject

hbrecv_interval returns the connection's heartbeat receive interval.


561
562
563
564
# File 'lib/stomp/connection.rb', line 561

def hbrecv_interval()
  return 0 unless @hbrecv_interval
  @hbrecv_interval / 1000.0 # ms
end

#hbsend_countObject

hbsend_count returns the current connection's heartbeat send count.


567
568
569
570
# File 'lib/stomp/connection.rb', line 567

def hbsend_count()
  return 0 unless @hbsend_count
  @hbsend_count
end

#hbsend_intervalObject

hbsend_interval returns the connection's heartbeat send interval.


555
556
557
558
# File 'lib/stomp/connection.rb', line 555

def hbsend_interval()
  return 0 unless @hbsend_interval
  @hbsend_interval / 1000.0 # ms
end

#nack(message_or_ack_id, headers = {}) ⇒ Object

STOMP 1.1+ NACK.

When the connection protocol level is 1.1 the message_or_ack_id parameter should match the message-id header of the MESSAGE being acknowledged.

When the connection protocol level is 1.2 the message_or_ack_id parameter should match the ack header of the MESSAGE being acknowledged.

Behavior is protocol level dependent, see the specifications and comments below.


298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
# File 'lib/stomp/connection.rb', line 298

def nack(message_or_ack_id, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::UnsupportedProtocolError if @protocol == Stomp::SPL_10
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::MessageIDRequiredError if message_or_ack_id.nil? || message_or_ack_id == ""
  headers = headers.symbolize_keys
  case @protocol
    when Stomp::SPL_12
      # The NACK frame MUST include an id header matching the ack header
      # of the MESSAGE being acknowledged.
      headers[:id] = message_or_ack_id
    else # Stomp::SPL_11 only
      # NACK has two REQUIRED headers: message-id, which MUST contain a value
      # matching the message-id for the MESSAGE being acknowledged and
      # subscription, which MUST be set to match the value of the subscription's
      # id header.
      headers[:'message-id'] = message_or_ack_id
      raise Stomp::Error::SubscriptionRequiredError unless headers[:subscription]
  end
  _headerCheck(headers)
  slog(:on_nack, log_params, headers)
  transmit(Stomp::CMD_NACK, headers)
end

#open?Boolean

open? tests if this connection is open.

Returns:

  • (Boolean)

216
217
218
# File 'lib/stomp/connection.rb', line 216

def open?
  !@closed
end

#pollObject

poll returns a pending message if one is available, otherwise returns nil.


478
479
480
481
482
483
484
# File 'lib/stomp/connection.rb', line 478

def poll()
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  # No need for a read lock here.  The receive method eventually fulfills
  # that requirement.
  return nil if @socket.nil? || !@socket.ready?
  receive()
end

#publish(destination, message, headers = {}) ⇒ Object

Publish message to destination. To disable content length header use header ( :suppress_content_length => true ). Accepts a transaction header ( :transaction => 'some_transaction_id' ).


399
400
401
402
403
404
405
406
407
408
409
# File 'lib/stomp/connection.rb', line 399

def publish(destination, message, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("")
  headers = headers.symbolize_keys
  raise Stomp::Error::DestinationRequired unless destination
  headers[:destination] = destination
  _headerCheck(headers)
  slog(:on_publish, log_params, message, headers)
  transmit(Stomp::CMD_SEND, headers, message)
end

#receiveObject

receive returns the next Message off of the wire. this can return nil in cases where:

  • the broker has closed the connection

  • the connection is not reliable


490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
# File 'lib/stomp/connection.rb', line 490

def receive()
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  super_result = __old_receive()
  if super_result.nil? && @reliable && !closed?
    errstr = "connection.receive returning EOF as nil - resetting connection.\n"
    unless slog(:on_miscerr, log_params, "es_recv: " + errstr)
      $stderr.print errstr
    end

    # !!! This initiates a re-connect !!!
    # The call to __old_receive() will in turn call socket().  Before
    # that we should change the target host, otherwise the host that
    # just failed may be attempted first.
    _reconn_prep()
    #
    super_result = __old_receive()
  end
  #
  if super_result.nil? && !@reliable
    @st.kill if @st # Kill ticker thread if any
    @rt.kill if @rt # Kill ticker thread if any
    close_socket()
    @closed = true
    warn 'warning: broker sent EOF, and connection not reliable' unless defined?(Test)
  end
  slog(:on_receive, log_params, super_result)
  return super_result
end

#set_logger(logger) ⇒ Object

set_logger selects a new callback logger instance.


520
521
522
# File 'lib/stomp/connection.rb', line 520

def set_logger(logger)
  @logger = logger
end

#sha1(data) ⇒ Object

sha1 returns a SHA1 digest for arbitrary string data.


536
537
538
# File 'lib/stomp/connection.rb', line 536

def sha1(data)
  Digest::SHA1.hexdigest(data)
end

#slog(name, *parms) ⇒ Object

log call router


579
580
581
582
583
# File 'lib/stomp/connection.rb', line 579

def slog(name, *parms)
  return false unless @logger
  @logger.send(name, *parms) if @logger.respond_to?(:"#{name}")
  @logger.respond_to?(:"#{name}")
end

#subscribe(destination, headers = {}, subId = nil) ⇒ Object

Subscribe subscribes to a destination. A subscription name is required. For Stomp 1.1+ a session unique subscription ID is also required.


348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
# File 'lib/stomp/connection.rb', line 348

def subscribe(destination, headers = {}, subId = nil)
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("")
  headers = headers.symbolize_keys
  raise Stomp::Error::DestinationRequired unless destination
  headers[:destination] = destination
  if @protocol >= Stomp::SPL_11
    raise Stomp::Error::SubscriptionRequiredError if (headers[:id].nil? && subId.nil?)
    headers[:id] = subId if headers[:id].nil?
  end
  _headerCheck(headers)
  slog(:on_subscribe, log_params, headers)

  ## p [ "subId", subId ]
  ## p [ "subscriptions", @subscriptions ]
  # Store the subscription so that we can replay if we reconnect.
  if @reliable
    subId = destination if subId.nil?
    raise Stomp::Error::DuplicateSubscription if @subscriptions[subId]
    @subscriptions[subId] = headers
  end

  transmit(Stomp::CMD_SUBSCRIBE, headers)
end

#unreceive(message, options = {}) ⇒ Object

Send a message back to the source or to the dead letter queue. Accepts a dead letter queue option ( :dead_letter_queue => “/queue/DLQ” ). Accepts a limit number of redeliveries option ( :max_redeliveries => 6 ). Accepts a force client acknowledgement option (:force_client_ack => true).


415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
# File 'lib/stomp/connection.rb', line 415

def unreceive(message, options = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  options = { :dead_letter_queue => "/queue/DLQ", :max_redeliveries => 6 }.merge(options)
  # Lets make sure all keys are symbols
  message.headers = message.headers.symbolize_keys
  retry_count = message.headers[:retry_count].to_i || 0
  message.headers[:retry_count] = retry_count + 1
  transaction_id = "transaction-#{message.headers[:'message-id']}-#{retry_count}"
  message_id = message.headers.delete(:'message-id')

  # Prevent duplicate 'subscription' headers on subsequent receives
  message.headers.delete(:subscription) if message.headers[:subscription]

  begin
    self.begin transaction_id

    if client_ack?(message) || options[:force_client_ack]
      self.ack(message_id, :transaction => transaction_id)
    end

    if message.headers[:retry_count] <= options[:max_redeliveries]
      self.publish(message.headers[:destination], message.body,
        message.headers.merge(:transaction => transaction_id))
    else
      # Poison ack, sending the message to the DLQ
      self.publish(options[:dead_letter_queue], message.body,
        message.headers.merge(:transaction => transaction_id,
        :original_destination => message.headers[:destination],
        :persistent => true))
    end
    self.commit transaction_id
  rescue Exception => exception
    self.abort transaction_id
    raise exception
  end
end

#unsubscribe(destination, headers = {}, subId = nil) ⇒ Object

Unsubscribe from a destination. A subscription name is required. For Stomp 1.1+ a session unique subscription ID is also required.


376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
# File 'lib/stomp/connection.rb', line 376

def unsubscribe(destination, headers = {}, subId = nil)
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("")
  headers = headers.symbolize_keys
  raise Stomp::Error::DestinationRequired unless destination
  headers[:destination] = destination
  if @protocol >= Stomp::SPL_11
    raise Stomp::Error::SubscriptionRequiredError if (headers[:id].nil? && subId.nil?)
    headers[:id] = subId unless headers[:id]
  end
  _headerCheck(headers)
  slog(:on_unsubscribe, log_params, headers)
  transmit(Stomp::CMD_UNSUBSCRIBE, headers)
  if @reliable
    subId = destination if subId.nil?
    @subscriptions.delete(subId)
  end
end

#uuidObject

uuid returns a type 4 UUID.


541
542
543
544
545
546
547
548
549
550
551
552
# File 'lib/stomp/connection.rb', line 541

def uuid()
  b = []
  0.upto(15) do |i|
    b << rand(255)
  end
  b[6] = (b[6] & 0x0F) | 0x40
  b[8] = (b[8] & 0xbf) | 0x80
  #             0  1  2  3   4   5  6  7   8  9  10 11 12 13 14 15
  rs = sprintf("%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x%02x%02x",
  b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7], b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15])
  rs
end

#valid_utf8?(s) ⇒ Boolean

valid_utf8? returns an indicator if the given string is a valid UTF8 string.

Returns:

  • (Boolean)

525
526
527
528
529
530
531
532
533
# File 'lib/stomp/connection.rb', line 525

def valid_utf8?(s)
  case RUBY_VERSION
  when /1\.8/
    rv = _valid_utf8?(s)
  else
    rv = s.encoding.name != Stomp::UTF8 ? false : s.valid_encoding?
  end
  rv
end