Class: Stomp::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/stomp/connection.rb,
lib/connection/utf8.rb,
lib/connection/utils.rb,
lib/connection/netio.rb,
lib/connection/heartbeats.rb

Overview

Low level connection which maps commands and supports synchronous receives

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) ⇒ Connection

A new Connection object can be initialized using two forms:

Hash (this is the recommended Connection initialization method):

hash = {
  :hosts => [
    {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false},
    {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false}
  ],
  :reliable => true,
  :initial_reconnect_delay => 0.01,
  :max_reconnect_delay => 30.0,
  :use_exponential_back_off => true,
  :back_off_multiplier => 2,
  :max_reconnect_attempts => 0,
  :randomize => false,
  :connect_timeout => 0,
  :connect_headers => {},
  :parse_timeout => 5,
  :logger => nil,
  :dmh => false,
  :closed_check => true,
  :hbser => false,
  :stompconn => false,
  :usecrlf => false,
  :max_hbread_fails => 0,
  :max_hbrlck_fails => 0,
  :fast_hbs_adjust => 0.0,
  :connread_timeout => 0,
  :tcp_nodelay => true,
  :start_timeout => 0,
  :sslctx_newparm => nil,
}

e.g. c = Stomp::Connection.new(hash)

Positional parameters:

login             (String,  default : '')
passcode          (String,  default : '')
host              (String,  default : 'localhost')
port              (Integer, default : 61613)
reliable          (Boolean, default : false)
reconnect_delay   (Integer, default : 5)

e.g. c = Stomp::Connection.new("username", "password", "localhost", 61613, true)


108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/stomp/connection.rb', line 108

def initialize( = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
  @protocol = Stomp::SPL_10 # Assumed at first
  @hb_received = true       # Assumed at first
  @hb_sent = true           # Assumed at first
  @hbs = @hbr = false       # Sending/Receiving heartbeats. Assume no for now.
  @jruby = false            # Assumed at first
  if defined?(RUBY_ENGINE) && RUBY_ENGINE =~ /jruby/
    @jruby = true
  end
  if .is_a?(Hash)
    hashed_initialize()
  else
    @host = host
    @port = port
    @login = 
    @passcode = passcode
    @reliable = reliable
    @reconnect_delay = reconnect_delay
    @connect_headers = connect_headers
    @ssl = false
    @parameters = nil
    @parse_timeout = 5		# To override, use hashed parameters
    @connect_timeout = 0	# To override, use hashed parameters
    @logger = Stomp::NullLogger.new	# To override, use hashed parameters
    @autoflush = false    # To override, use hashed parameters or setter
    @closed_check = true  # Run closed check in each protocol method
    @hbser = false        # Raise if heartbeat send exception
    @stompconn = false    # If true, use STOMP rather than CONNECT
    @usecrlf = false      # If true, use \r\n as line ends (1.2 only)
    @max_hbread_fails = 0 # 0 means never retry for HB read failures
    @max_hbrlck_fails = 0 # 0 means never retry for HB read lock failures
    @fast_hbs_adjust = 0.0 # Fast heartbeat senders sleep adjustment
    @connread_timeout = 0 # Connect read CONNECTED/ERROR timeout
    @tcp_nodelay = true # Disable Nagle
    @start_timeout = 0 # Client only, startup timeout
    @sslctx_newparm = nil # SSLContext.new paramater
    warn "login looks like a URL, do you have the correct parameters?" if @login =~ /:\/\//
  end

  # Use Mutexes:  only one lock per each thread.
  # Reverted to original implementation attempt using Mutex.
  @transmit_semaphore = Mutex.new
  @read_semaphore = Mutex.new
  @socket_semaphore = Mutex.new

  @subscriptions = {}
  @failure = nil
  @connection_attempts = 0

  socket
end

Instance Attribute Details

#autoflushObject

Autoflush forces a flush on each transmit. This may be changed dynamically by calling code.



39
40
41
# File 'lib/stomp/connection.rb', line 39

def autoflush
  @autoflush
end

#connection_frameObject (readonly)

The CONNECTED frame from the broker.



17
18
19
# File 'lib/stomp/connection.rb', line 17

def connection_frame
  @connection_frame
end

#disconnect_receiptObject (readonly)

Any disconnect RECEIPT frame if requested.



20
21
22
# File 'lib/stomp/connection.rb', line 20

def disconnect_receipt
  @disconnect_receipt
end

#hb_receivedObject (readonly)

Heartbeat receive has been on time.



29
30
31
# File 'lib/stomp/connection.rb', line 29

def hb_received
  @hb_received
end

#hb_sentObject (readonly)

Heartbeat send has been successful.



32
33
34
# File 'lib/stomp/connection.rb', line 32

def hb_sent
  @hb_sent
end

#hostObject (readonly)

Currently-connected host and port



42
43
44
# File 'lib/stomp/connection.rb', line 42

def host
  @host
end

#jrubyObject (readonly)

JRuby detected



35
36
37
# File 'lib/stomp/connection.rb', line 35

def jruby
  @jruby
end

#portObject (readonly)

Currently-connected host and port



42
43
44
# File 'lib/stomp/connection.rb', line 42

def port
  @port
end

#protocolObject (readonly)

The Stomp Protocol version.



23
24
25
# File 'lib/stomp/connection.rb', line 23

def protocol
  @protocol
end

#sessionObject (readonly)

A unique session ID, assigned by the broker.



26
27
28
# File 'lib/stomp/connection.rb', line 26

def session
  @session
end

Class Method Details

.default_port(ssl) ⇒ Object

default_port returns the default port used by the gem for TCP or SSL.



45
46
47
# File 'lib/stomp/connection.rb', line 45

def self.default_port(ssl)
  ssl ? 61612 : 61613
end

.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) ⇒ Object

open is syntactic sugar for 'Connection.new', see 'initialize' for usage.



193
194
195
# File 'lib/stomp/connection.rb', line 193

def Connection.open( = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
  Connection.new(, passcode, host, port, reliable, reconnect_delay, connect_headers)
end

.ssl_v2xoptionsObject

SSL Helper



50
51
52
53
54
55
56
57
58
59
# File 'lib/stomp/connection.rb', line 50

def self.ssl_v2xoptions()
    require 'openssl' unless defined?(OpenSSL)
    # Mimic code in later versions of Ruby 2.x (and backported to later
    # versions of 1.9.3).
    opts = OpenSSL::SSL::OP_ALL
    opts &= ~OpenSSL::SSL::OP_DONT_INSERT_EMPTY_FRAGMENTS if defined?(OpenSSL::SSL::OP_DONT_INSERT_EMPTY_FRAGMENTS)
    opts |= OpenSSL::SSL::OP_NO_COMPRESSION if defined?(OpenSSL::SSL::OP_NO_COMPRESSION)
    opts |= OpenSSL::SSL::OP_NO_SSLv2 if defined?(OpenSSL::SSL::OP_NO_SSLv2)
    opts |= OpenSSL::SSL::OP_NO_SSLv3 if defined?(OpenSSL::SSL::OP_NO_SSLv3)
end

Instance Method Details

#abort(name, headers = {}) ⇒ Object

Abort aborts a transaction by name.



290
291
292
293
294
295
296
297
298
299
# File 'lib/stomp/connection.rb', line 290

def abort(name, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("")
  headers = headers.symbolize_keys
  headers[:transaction] = name
  _headerCheck(headers)
  slog(:on_abort, log_params, headers)
  transmit(Stomp::CMD_ABORT, headers)
end

#ack(message_id, headers = {}) ⇒ Object

Acknowledge a message, used when a subscription has specified client acknowledgement i.e. connection.subscribe(“/queue/a”, :ack => 'client'). Accepts an optional transaction header ( :transaction => 'some_transaction_id' ) Behavior is protocol level dependent, see the specifications or comments below.



223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# File 'lib/stomp/connection.rb', line 223

def ack(message_id, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("")
  raise Stomp::Error::MessageIDRequiredError if message_id.nil? || message_id == ""
  headers = headers.symbolize_keys

  case @protocol
    when Stomp::SPL_12
      # The ACK frame MUST include an id header matching the ack header 
      # of the MESSAGE being acknowledged.
      headers[:id] = message_id
    when Stomp::SPL_11
      # ACK has two REQUIRED headers: message-id, which MUST contain a value 
      # matching the message-id for the MESSAGE being acknowledged and 
      # subscription, which MUST be set to match the value of the subscription's 
      # id header.
      headers[:'message-id'] = message_id
      raise Stomp::Error::SubscriptionRequiredError unless headers[:subscription]
    else # Stomp::SPL_10
      # ACK has one required header, message-id, which must contain a value 
      # matching the message-id for the MESSAGE being acknowledged.
      headers[:'message-id'] = message_id
  end
  _headerCheck(headers)
  slog(:on_ack, log_params, headers)
  transmit(Stomp::CMD_ACK, headers)
end

#begin(name, headers = {}) ⇒ Object

Begin starts a transaction, and requires a name for the transaction



208
209
210
211
212
213
214
215
216
217
# File 'lib/stomp/connection.rb', line 208

def begin(name, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("")
  headers = headers.symbolize_keys
  headers[:transaction] = name
  _headerCheck(headers)
  slog(:on_begin, log_params, headers)
  transmit(Stomp::CMD_BEGIN, headers)
end

#client_ack?(message) ⇒ Boolean

client_ack? determines if headers contain :ack => “client”.

Returns:

  • (Boolean)


403
404
405
406
# File 'lib/stomp/connection.rb', line 403

def client_ack?(message)
  headers = @subscriptions[message.headers[:destination]]
  !headers.nil? && headers[:ack] == "client"
end

#closed?Boolean

closed? tests if this connection is closed.

Returns:

  • (Boolean)


203
204
205
# File 'lib/stomp/connection.rb', line 203

def closed?
  @closed
end

#commit(name, headers = {}) ⇒ Object

Commit commits a transaction by name.



278
279
280
281
282
283
284
285
286
287
# File 'lib/stomp/connection.rb', line 278

def commit(name, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("")
  headers = headers.symbolize_keys
  headers[:transaction] = name
  _headerCheck(headers)
  slog(:on_commit, log_params, headers)
  transmit(Stomp::CMD_COMMIT, headers)
end

#disconnect(headers = {}) ⇒ Object

disconnect closes this connection. If requested, a disconnect RECEIPT will be received.



410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
# File 'lib/stomp/connection.rb', line 410

def disconnect(headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("")
  headers = headers.symbolize_keys
  _headerCheck(headers)
  if @protocol >= Stomp::SPL_11
    @st.kill if @st # Kill ticker thread if any
    @rt.kill if @rt # Kill ticker thread if any
  end
  transmit(Stomp::CMD_DISCONNECT, headers)
  @disconnect_receipt = receive if headers[:receipt]
  slog(:on_disconnect, log_params)
  close_socket
end

#hashed_initialize(params) ⇒ Object

hashed_initialize prepares a new connection with a Hash of initialization parameters.



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/stomp/connection.rb', line 162

def hashed_initialize(params)
  lp = _hdup(params)
  @parameters = refine_params(lp)
  @reliable =  @parameters[:reliable]
  @reconnect_delay = @parameters[:initial_reconnect_delay]
  @connect_headers = @parameters[:connect_headers]
  @parse_timeout =  @parameters[:parse_timeout]
  @connect_timeout =  @parameters[:connect_timeout]
  @logger = @parameters[:logger] || Stomp::NullLogger.new
  @autoflush = @parameters[:autoflush]
  @closed_check = @parameters[:closed_check]
  @hbser = @parameters[:hbser]
  @stompconn = @parameters[:stompconn]
  @usecrlf = @parameters[:usecrlf]
  @max_hbread_fails = @parameters[:max_hbread_fails]
  @max_hbrlck_fails = @parameters[:max_hbrlck_fails]
  @fast_hbs_adjust = @parameters[:fast_hbs_adjust]
  @connread_timeout = @parameters[:connread_timeout]
  @sslctx_newparm = @parameters[:sslctx_newparm]
  #
  # Try to support Ruby 1.9.x and 2.x ssl.
  unless defined?(RSpec)
    @parameters[:hosts].each do |ah|
      ah[:ssl] = Stomp::SSLParams.new if ah[:ssl] == true
    end
  end
  #sets the first host to connect
  change_host
end

#hbrecv_countObject

hbrecv_count returns the current connection's heartbeat receive count.



522
523
524
525
# File 'lib/stomp/connection.rb', line 522

def hbrecv_count()
  return 0 unless @hbrecv_count
  @hbrecv_count
end

#hbrecv_intervalObject

hbrecv_interval returns the connection's heartbeat receive interval.



510
511
512
513
# File 'lib/stomp/connection.rb', line 510

def hbrecv_interval()
  return 0 unless @hbrecv_interval
  @hbrecv_interval / 1000.0 # ms
end

#hbsend_countObject

hbsend_count returns the current connection's heartbeat send count.



516
517
518
519
# File 'lib/stomp/connection.rb', line 516

def hbsend_count()
  return 0 unless @hbsend_count
  @hbsend_count
end

#hbsend_intervalObject

hbsend_interval returns the connection's heartbeat send interval.



504
505
506
507
# File 'lib/stomp/connection.rb', line 504

def hbsend_interval()
  return 0 unless @hbsend_interval
  @hbsend_interval / 1000.0 # ms
end

#nack(message_id, headers = {}) ⇒ Object

STOMP 1.1+ NACK.



253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
# File 'lib/stomp/connection.rb', line 253

def nack(message_id, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::UnsupportedProtocolError if @protocol == Stomp::SPL_10
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::MessageIDRequiredError if message_id.nil? || message_id == ""
  headers = headers.symbolize_keys
  case @protocol
    when Stomp::SPL_12
      # The NACK frame MUST include an id header matching the ack header 
      # of the MESSAGE being acknowledged.
      headers[:id] = message_id
    else # Stomp::SPL_11 only
      # NACK has two REQUIRED headers: message-id, which MUST contain a value 
      # matching the message-id for the MESSAGE being acknowledged and 
      # subscription, which MUST be set to match the value of the subscription's 
      # id header.
      headers[:'message-id'] = message_id
      raise Stomp::Error::SubscriptionRequiredError unless headers[:subscription]
  end
  _headerCheck(headers)
  slog(:on_nack, log_params, headers)
  transmit(Stomp::CMD_NACK, headers)
end

#open?Boolean

open? tests if this connection is open.

Returns:

  • (Boolean)


198
199
200
# File 'lib/stomp/connection.rb', line 198

def open?
  !@closed
end

#pollObject

poll returns a pending message if one is available, otherwise returns nil.



428
429
430
431
432
433
434
# File 'lib/stomp/connection.rb', line 428

def poll()
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  # No need for a read lock here.  The receive method eventually fulfills
  # that requirement.
  return nil if @socket.nil? || !@socket.ready?
  receive()
end

#publish(destination, message, headers = {}) ⇒ Object

Publish message to destination. To disable content length header use header ( :suppress_content_length => true ). Accepts a transaction header ( :transaction => 'some_transaction_id' ).



350
351
352
353
354
355
356
357
358
359
# File 'lib/stomp/connection.rb', line 350

def publish(destination, message, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("")
  headers = headers.symbolize_keys
  headers[:destination] = destination
  _headerCheck(headers)
  slog(:on_publish, log_params, message, headers)
  transmit(Stomp::CMD_SEND, headers, message)
end

#receiveObject

receive returns the next Message off of the wire. this can return nil in cases where:

  • the broker has closed the connection

  • the connection is not reliable



440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
# File 'lib/stomp/connection.rb', line 440

def receive()
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  super_result = __old_receive()
  if super_result.nil? && @reliable && !closed?
    errstr = "connection.receive returning EOF as nil - resetting connection.\n"
    slog(:on_miscerr, log_params, "es_recv: " + errstr)
    $stderr.print errstr

    # !!! This initiates a re-connect !!!
    # The call to __old_receive() will in turn call socket().  Before
    # that we should change the target host, otherwise the host that
    # just failed may be attempted first.
    _reconn_prep()
    #
    super_result = __old_receive()
  end
  #
  if super_result.nil? && !@reliable
    @st.kill if @st # Kill ticker thread if any
    @rt.kill if @rt # Kill ticker thread if any
    close_socket()
    @closed = true
    warn 'warning: broker sent EOF, and connection not reliable' unless defined?(Test)
  end
  slog(:on_receive, log_params, super_result)
  return super_result
end

#set_logger(logger) ⇒ Object

set_logger selects a new callback logger instance.



469
470
471
# File 'lib/stomp/connection.rb', line 469

def set_logger(logger)
  @logger = logger
end

#sha1(data) ⇒ Object

sha1 returns a SHA1 digest for arbitrary string data.



485
486
487
# File 'lib/stomp/connection.rb', line 485

def sha1(data)
  Digest::SHA1.hexdigest(data)
end

#slog(name, *parms) ⇒ Object

log call router



528
529
530
531
532
# File 'lib/stomp/connection.rb', line 528

def slog(name, *parms)
  return false unless @logger
  @logger.send(name, *parms) if @logger.respond_to?(:"#{name}")
  @logger.respond_to?(:"#{name}")
end

#subscribe(name, headers = {}, subId = nil) ⇒ Object

Subscribe subscribes to a destination. A subscription name is required. For Stomp 1.1+ a session unique subscription ID is also required.



303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
# File 'lib/stomp/connection.rb', line 303

def subscribe(name, headers = {}, subId = nil)
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("")
  headers = headers.symbolize_keys
  headers[:destination] = name
  if @protocol >= Stomp::SPL_11
    raise Stomp::Error::SubscriptionRequiredError if (headers[:id].nil? && subId.nil?)
    headers[:id] = subId if headers[:id].nil?
  end
  _headerCheck(headers)
  slog(:on_subscribe, log_params, headers)

  # Store the subscription so that we can replay if we reconnect.
  if @reliable
    subId = name if subId.nil?
    raise Stomp::Error::DuplicateSubscription if @subscriptions[subId]
    @subscriptions[subId] = headers
  end

  transmit(Stomp::CMD_SUBSCRIBE, headers)
end

#unreceive(message, options = {}) ⇒ Object

Send a message back to the source or to the dead letter queue. Accepts a dead letter queue option ( :dead_letter_queue => “/queue/DLQ” ). Accepts a limit number of redeliveries option ( :max_redeliveries => 6 ). Accepts a force client acknowledgement option (:force_client_ack => true).



365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
# File 'lib/stomp/connection.rb', line 365

def unreceive(message, options = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  options = { :dead_letter_queue => "/queue/DLQ", :max_redeliveries => 6 }.merge(options)
  # Lets make sure all keys are symbols
  message.headers = message.headers.symbolize_keys
  retry_count = message.headers[:retry_count].to_i || 0
  message.headers[:retry_count] = retry_count + 1
  transaction_id = "transaction-#{message.headers[:'message-id']}-#{retry_count}"
  message_id = message.headers.delete(:'message-id')

  # Prevent duplicate 'subscription' headers on subsequent receives
  message.headers.delete(:subscription) if message.headers[:subscription]

  begin
    self.begin transaction_id

    if client_ack?(message) || options[:force_client_ack]
      self.ack(message_id, :transaction => transaction_id)
    end

    if message.headers[:retry_count] <= options[:max_redeliveries]
      self.publish(message.headers[:destination], message.body, 
        message.headers.merge(:transaction => transaction_id))
    else
      # Poison ack, sending the message to the DLQ
      self.publish(options[:dead_letter_queue], message.body, 
        message.headers.merge(:transaction => transaction_id, 
        :original_destination => message.headers[:destination], 
        :persistent => true))
    end
    self.commit transaction_id
  rescue Exception => exception
    self.abort transaction_id
    raise exception
  end
end

#unsubscribe(dest, headers = {}, subId = nil) ⇒ Object

Unsubscribe from a destination. A subscription name is required. For Stomp 1.1+ a session unique subscription ID is also required.



328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
# File 'lib/stomp/connection.rb', line 328

def unsubscribe(dest, headers = {}, subId = nil)
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("")
  raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("")
  headers = headers.symbolize_keys
  headers[:destination] = dest
  if @protocol >= Stomp::SPL_11
    raise Stomp::Error::SubscriptionRequiredError if (headers[:id].nil? && subId.nil?)
    headers[:id] = subId unless headers[:id]
  end
  _headerCheck(headers)
  slog(:on_unsubscribe, log_params, headers)
  transmit(Stomp::CMD_UNSUBSCRIBE, headers)
  if @reliable
    subId = dest if subId.nil?
    @subscriptions.delete(subId)
  end
end

#uuidObject

uuid returns a type 4 UUID.



490
491
492
493
494
495
496
497
498
499
500
501
# File 'lib/stomp/connection.rb', line 490

def uuid()
  b = []
  0.upto(15) do |i|
    b << rand(255)
  end
  b[6] = (b[6] & 0x0F) | 0x40
  b[8] = (b[8] & 0xbf) | 0x80
  #             0  1  2  3   4   5  6  7   8  9  10 11 12 13 14 15
  rs = sprintf("%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x%02x%02x",
  b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7], b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15])
  rs
end

#valid_utf8?(s) ⇒ Boolean

valid_utf8? returns an indicator if the given string is a valid UTF8 string.

Returns:

  • (Boolean)


474
475
476
477
478
479
480
481
482
# File 'lib/stomp/connection.rb', line 474

def valid_utf8?(s)
  case RUBY_VERSION
  when /1\.8/
    rv = _valid_utf8?(s)
  else
    rv = s.encoding.name != Stomp::UTF8 ? false : s.valid_encoding?
  end
  rv
end