Class: Stomp::Client

Inherits:
Object
Extended by:
Forwardable
Defined in:
lib/stomp/client.rb,
lib/client/utils.rb

Overview

Typical Stomp client class. It uses a listener thread to receive frames from the server; any thread can send.

All receives happen in that one listener thread, so keep per-message processing there short if message volume is high.
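
One way to keep the listener thread light is to hand each message to worker threads through a queue. The sketch below uses only Ruby's standard library. The method name `process_off_listener`, the worker count, and the `upcase` stand-in for real work are all assumptions for illustration; the lambda plays the role of the block that would be passed to subscribe.

```ruby
# Sketch: keep the receive callback cheap by queueing work for other threads.
# Nothing here is part of the stomp gem; all names are illustrative.
def process_off_listener(bodies, worker_count = 2)
  work_queue = Queue.new
  results = Queue.new

  # Worker threads do the slow processing away from the listener thread.
  workers = Array.new(worker_count) do
    Thread.new do
      while (body = work_queue.pop) != :stop
        results << body.upcase # stand-in for expensive processing
      end
    end
  end

  # Stand-in for a subscribe callback: it only enqueues and returns.
  on_message = lambda { |body| work_queue << body }
  bodies.each { |body| on_message.call(body) }

  # One stop marker per worker, then wait for all of them to finish.
  worker_count.times { work_queue << :stop }
  workers.each(&:join)

  Array.new(results.size) { results.pop }
end
```

With more than one worker the results may come back in a different order than the messages arrived, so this pattern suits work that does not depend on ordering.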


Constructor Details

#initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, autoflush = false) ⇒ Client

A new Client object can be initialized using three forms:

Hash (this is the recommended Client initialization method):

hash = {
  :hosts => [
    {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false},
    {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false}
  ],
  # These are the default parameters and do not need to be set
  :reliable => true,                  # reliable (use failover)
  :initial_reconnect_delay => 0.01,   # initial delay before reconnect (secs)
  :max_reconnect_delay => 30.0,       # max delay before reconnect
  :use_exponential_back_off => true,  # increase delay between reconnect attempts
  :back_off_multiplier => 2,          # next delay multiplier
  :max_reconnect_attempts => 0,       # retry forever, use # for maximum attempts
  :randomize => false,                # do not randomize hosts hash before reconnect
  :connect_timeout => 0,              # Timeout for TCP/TLS connects, use # for max seconds
  :connect_headers => {},             # user supplied CONNECT headers (req'd for Stomp 1.1+)
  :parse_timeout => 5,                # IO::select wait time on socket reads
  :logger => nil,                     # user supplied callback logger instance
  :dmh => false,                      # do not support multihomed IPV4 / IPV6 hosts during failover
  :closed_check => true,              # check first if closed in each protocol method
  :hbser => false,                    # raise on heartbeat send exception
  :stompconn => false,                # Use STOMP instead of CONNECT
  :usecrlf => false,                  # Use CRLF command and header line ends (1.2+)
  :max_hbread_fails => 0,             # Max HB read fails before retry.  0 => never retry
  :max_hbrlck_fails => 0,             # Max HB read lock obtain fails before retry.  0 => never retry
  :fast_hbs_adjust => 0.0,            # Fast heartbeat senders sleep adjustment, seconds, needed ...
                                      # For fast heartbeat senders.  'fast' == YMMV.  If not
                                      # correct for your environment, expect unnecessary fail overs
  :connread_timeout => 0,             # Timeout during CONNECT for read of CONNECTED/ERROR, secs
  :tcp_nodelay => true,               # Turns on the TCP_NODELAY socket option; disables Nagle's algorithm
  :start_timeout => 0,                # Timeout around Stomp::Client initialization
  :sslctx_newparm => nil,             # Param for SSLContext.new
  :ssl_post_conn_check => true,       # Further verify broker identity
}

e.g. c = Stomp::Client.new(hash)
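
To make the reconnect parameters concrete, the following sketch computes the delay schedule they describe: start at `:initial_reconnect_delay`, multiply by `:back_off_multiplier` after each attempt, and cap at `:max_reconnect_delay`. It models the documented parameters only. The method `reconnect_delays` and the constant `RECONNECT_DEFAULTS` are hypothetical, and the gem's internal calculation may differ in detail.

```ruby
# Sketch: the delay schedule implied by the reconnect parameters.
# This is a model of the documented options, not the gem's own code.
RECONNECT_DEFAULTS = {
  :initial_reconnect_delay => 0.01,
  :max_reconnect_delay => 30.0,
  :use_exponential_back_off => true,
  :back_off_multiplier => 2
}

def reconnect_delays(params, attempts)
  delay = params[:initial_reconnect_delay]
  Array.new(attempts) do
    current = delay
    if params[:use_exponential_back_off]
      # Grow the next delay, but never beyond the configured maximum.
      delay = [delay * params[:back_off_multiplier], params[:max_reconnect_delay]].min
    end
    current
  end
end
```

Under these assumptions the default delays double from 0.01 seconds until they reach the 30 second ceiling, after which every further attempt waits 30 seconds.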

Positional parameters:

login     (String,  default : '')
passcode  (String,  default : '')
host      (String,  default : 'localhost')
port      (Integer, default : 61613)
reliable  (Boolean, default : false)

e.g. c = Stomp::Client.new('login', 'passcode', 'localhost', 61613, true)

Stomp URL:

A Stomp URL must begin with 'stomp://' and can be in one of the following forms:

stomp://host:port
stomp://host.domain.tld:port
stomp://login:passcode@host:port
stomp://login:passcode@host.domain.tld:port

e.g. c = Stomp::Client.new(urlstring)
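
The four URL forms above differ only in whether credentials are present. As an illustration, a regular expression can split such a URL into its parts. The pattern `STOMP_URL_PATTERN` and the method `split_stomp_url` are assumptions made for this sketch and are not the parser the gem uses.

```ruby
# Sketch: split a stomp:// URL into login, passcode, host and port.
# The pattern is illustrative and stricter or looser than the gem's may be.
STOMP_URL_PATTERN = %r{\Astomp://(?:([^:@/]+):([^:@/]+)@)?([\w.-]+):(\d+)\z}

def split_stomp_url(url)
  match = STOMP_URL_PATTERN.match(url)
  return nil unless match

  {
    :login => match[1] || '',    # empty when the URL has no credentials
    :passcode => match[2] || '',
    :host => match[3],
    :port => match[4].to_i
  }
end
```

A URL that does not start with 'stomp://' or lacks a port does not match, and the method returns nil.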


# File 'lib/stomp/client.rb', line 83

def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, autoflush = false)
  parse_hash_params(login) ||
    parse_stomp_url(login) ||
    parse_failover_url(login) ||
    parse_positional_params(login, passcode, host, port, reliable)

  @logger = @parameters[:logger] ||= Stomp::NullLogger.new
  @start_timeout = @parameters[:start_timeout] || 0
  check_arguments!()

  # p [ "cldbg01", @parameters ]

  begin
    Timeout::timeout(@start_timeout) {
      create_error_handler
      create_connection(autoflush)
      start_listeners()
    }
  rescue TimeoutError
    # p [ "cldbg02" ]
    ex = Stomp::Error::StartTimeoutException.new(@start_timeout)
    raise ex
  end
end

Instance Attribute Details

#parameters ⇒ Object (readonly)

Parameters hash



# File 'lib/stomp/client.rb', line 19

def parameters
  @parameters
end

Class Method Details

.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false) ⇒ Object

open is syntactic sugar for ‘Client.new’; see ‘initialize’ for usage.



# File 'lib/stomp/client.rb', line 133

def self.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)
  Client.new(login, passcode, host, port, reliable)
end

Instance Method Details

#abort(name, headers = {}) ⇒ Object

Abort aborts work in a transaction by name.



# File 'lib/stomp/client.rb', line 149

def abort(name, headers = {})
  @connection.abort(name, headers)

  # replay any ack'd messages in this transaction
  replay_list = @replay_messages_by_txn[name]
  if replay_list
    replay_list.each do |message|
      find_listener(message) # find_listener also calls the listener
    end
  end
end

#ack(message, headers = {}) ⇒ Object Also known as: acknowledge

Acknowledge a message. Used when a subscription has specified client acknowledgement ( connection.subscribe(“/queue/a”, :ack => ‘client’) ). Accepts a transaction header ( :transaction => ‘some_transaction_id’ ).



# File 'lib/stomp/client.rb', line 194

def ack(message, headers = {})
  txn_id = headers[:transaction]
  if txn_id
    # lets keep around messages ack'd in this transaction in case we rollback
    replay_list = @replay_messages_by_txn[txn_id]
    if replay_list.nil?
      replay_list = []
      @replay_messages_by_txn[txn_id] = replay_list
    end
    replay_list << message
  end
  if block_given?
    headers['receipt'] = register_receipt_listener lambda {|r| yield r}
  end
  context = ack_context_for(message, headers)
  @connection.ack context[:message_id], context[:headers]
end
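
The replay bookkeeping that ack shares with abort and commit can be modeled without a broker. In the sketch below, `ReplayLedger` is a hypothetical class, and the `redelivered` array stands in for calling the listener again. It simplifies the real client, which also sends frames to the broker and looks up the transaction id from the headers.

```ruby
# Sketch: an in-memory model of the per-transaction replay list.
# ReplayLedger is a hypothetical name used only for this illustration.
class ReplayLedger
  attr_reader :redelivered

  def initialize
    @replay_messages_by_txn = {}
    @redelivered = []
  end

  # Remember a message acknowledged inside a transaction.
  def ack(message, txn_id = nil)
    (@replay_messages_by_txn[txn_id] ||= []) << message if txn_id
  end

  # On abort, hand every remembered message back for redelivery.
  def abort(txn_id)
    (@replay_messages_by_txn[txn_id] || []).each { |m| @redelivered << m }
  end

  # On commit, the remembered messages are no longer needed.
  def commit(txn_id)
    @replay_messages_by_txn.delete(txn_id)
  end
end
```

Messages acknowledged outside a transaction are never recorded, so an abort replays only what was acknowledged under that transaction id.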

#ack_context_for(message, headers) ⇒ Object



# File 'lib/stomp/client.rb', line 222

def ack_context_for(message, headers)
  id = case protocol
    when Stomp::SPL_12
     'ack'
    when Stomp::SPL_11
     headers.merge!(:subscription => message.headers['subscription'])
     'message-id'
    else
     'message-id'
  end
  {:message_id => message.headers[id], :headers => headers}
end
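
The header selection in ack_context_for follows the STOMP protocol levels: 1.2 identifies the frame by the message's 'ack' header, while 1.0 and 1.1 use 'message-id', and 1.1 additionally needs the subscription id. The sketch below restates that rule with plain strings for the protocol level, which is an assumption made for illustration. Unlike the listing above, it copies the headers hash instead of modifying the caller's.

```ruby
# Sketch: choose the identifying header for ACK/NACK by protocol level.
# Protocol levels are plain strings here; both method names are hypothetical.
def ack_id_header(protocol)
  protocol == '1.2' ? 'ack' : 'message-id'
end

def ack_context(protocol, message_headers, headers = {})
  headers = headers.dup
  # STOMP 1.1 also requires the subscription id on ACK and NACK.
  headers[:subscription] = message_headers['subscription'] if protocol == '1.1'
  { :message_id => message_headers[ack_id_header(protocol)], :headers => headers }
end
```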

#autoflush ⇒ Object

autoflush returns the current connection’s autoflush setting.



# File 'lib/stomp/client.rb', line 346

def autoflush()
  @connection.autoflush()
end

#autoflush=(af) ⇒ Object

autoflush= sets the current connection’s autoflush setting.



# File 'lib/stomp/client.rb', line 341

def autoflush=(af)
  @connection.autoflush = af
end

#begin(name, headers = {}) ⇒ Object

Begin starts work in a transaction by name.



# File 'lib/stomp/client.rb', line 144

def begin(name, headers = {})
  @connection.begin(name, headers)
end

#close(headers = {}) ⇒ Object

close frees resources in use by this client. The listener thread is terminated, and disconnect on the connection is called.



# File 'lib/stomp/client.rb', line 278

def close(headers={})
  @listener_thread.exit
  @connection.disconnect(headers)
end

#closed? ⇒ Boolean

closed? tests if this client connection is closed.

Returns:
  • (Boolean)



# File 'lib/stomp/client.rb', line 267

def closed?()
  @connection.closed?()
end

#commit(name, headers = {}) ⇒ Object

Commit commits work in a transaction by name.



# File 'lib/stomp/client.rb', line 162

def commit(name, headers = {})
  txn_id = headers[:transaction]
  @replay_messages_by_txn.delete(txn_id)
  @connection.commit(name, headers)
end

#connection_frame ⇒ Object

Return the broker’s CONNECTED frame to the client. Misnamed.



# File 'lib/stomp/client.rb', line 252

def connection_frame()
  @connection.connection_frame
end

#create_error_handler ⇒ Object



# File 'lib/stomp/client.rb', line 108

def create_error_handler
  client_thread = Thread.current

  @error_listener = lambda do |error|
    exception = case error.body
                  when /ResourceAllocationException/i
                    Stomp::Error::ProducerFlowControlException.new(error)
                  when /ProtocolException/i
                    Stomp::Error::ProtocolException.new(error)
                  else
                    Stomp::Error::BrokerException.new(error)
                end

    client_thread.raise exception
  end
end
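
The handler above picks an exception class by matching the ERROR frame body against patterns, falling back to a generic broker exception. The same dispatch can be tried in isolation. The exception classes in this sketch are local stand-ins with hypothetical names, not the gem's Stomp::Error classes.

```ruby
# Sketch: map an ERROR frame body to an exception class by pattern.
# These classes are local stand-ins for illustration only.
class SketchBrokerError < StandardError; end
class SketchFlowControlError < SketchBrokerError; end
class SketchProtocolError < SketchBrokerError; end

def exception_class_for(body)
  case body
  when /ResourceAllocationException/i then SketchFlowControlError
  when /ProtocolException/i then SketchProtocolError
  else SketchBrokerError # anything unrecognized is a generic broker error
  end
end
```

Because the real handler raises in the thread that created the client, code running in that thread may see these exceptions at any point, not only during a client call.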

#disconnect_receipt ⇒ Object

Return any RECEIPT frame received by DISCONNECT.



# File 'lib/stomp/client.rb', line 257

def disconnect_receipt()
  @connection.disconnect_receipt
end

#hbrecv_count ⇒ Object

hbrecv_count returns the current connection’s heartbeat receive count.



# File 'lib/stomp/client.rb', line 330

def hbrecv_count()
  @connection.hbrecv_count()
end

#hbrecv_interval ⇒ Object

hbrecv_interval returns the connection’s heartbeat receive interval.



# File 'lib/stomp/client.rb', line 320

def hbrecv_interval()
  @connection.hbrecv_interval()
end

#hbsend_count ⇒ Object

hbsend_count returns the current connection’s heartbeat send count.



# File 'lib/stomp/client.rb', line 325

def hbsend_count()
  @connection.hbsend_count()
end

#hbsend_interval ⇒ Object

hbsend_interval returns the connection’s heartbeat send interval.



# File 'lib/stomp/client.rb', line 315

def hbsend_interval()
  @connection.hbsend_interval()
end

#join(limit = nil) ⇒ Object

join the listener thread for this client, generally used to wait for a quit signal.



# File 'lib/stomp/client.rb', line 139

def join(limit = nil)
  @listener_thread.join(limit)
end

#jruby? ⇒ Boolean

jruby? tests if the connection has detected a JRuby environment.

Returns:
  • (Boolean)



# File 'lib/stomp/client.rb', line 272

def jruby?()
  @connection.jruby
end

#nack(message, headers = {}) ⇒ Object

Stomp 1.1+ NACK.



# File 'lib/stomp/client.rb', line 216

def nack(message, headers = {})
  context = ack_context_for(message, headers)
  @connection.nack context[:message_id], context[:headers]
end

#open? ⇒ Boolean

open? tests if this client connection is open.

Returns:
  • (Boolean)



# File 'lib/stomp/client.rb', line 262

def open?
  @connection.open?()
end

#poll ⇒ Object

Poll for asynchronous messages issued by the broker. Returns nil if no message is available, otherwise the message.



# File 'lib/stomp/client.rb', line 336

def poll()
  @connection.poll()
end

#protocol ⇒ Object

protocol returns the current client’s protocol level.



# File 'lib/stomp/client.rb', line 295

def protocol()
  @connection.protocol()
end

#publish(destination, message, headers = {}) ⇒ Object

Publishes message to destination. If a block is given, a receipt is requested and passed to the block when it arrives. Accepts a transaction header ( :transaction => ‘some_transaction_id’ ).



# File 'lib/stomp/client.rb', line 244

def publish(destination, message, headers = {})
  if block_given?
    headers['receipt'] = register_receipt_listener lambda {|r| yield r}
  end
  @connection.publish(destination, message, headers)
end

#running ⇒ Object

running checks if the thread was created and is not dead.



# File 'lib/stomp/client.rb', line 284

def running()
  @listener_thread && !!@listener_thread.status
end
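
The check relies on how Thread#status behaves: it is a String such as "run" or "sleep" while the thread is alive, false after a normal exit, and nil after the thread died from an exception. The sketch below shows the same test on an arbitrary thread. The helper name `thread_running?` is hypothetical, and unlike the listing above it always returns true or false, even when no thread exists.

```ruby
# Sketch: the liveness test used by running, applied to any thread.
# thread_running? is a hypothetical helper, not part of the gem.
def thread_running?(thread)
  # status is truthy only while the thread is alive
  !!(thread && thread.status)
end
```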

#set_logger(logger) ⇒ Object

set_logger identifies a new callback logger.



# File 'lib/stomp/client.rb', line 289

def set_logger(logger)
  @logger = logger
  @connection.set_logger(logger)
end

#sha1(data) ⇒ Object

sha1 returns a SHA1 sum of a given string.



# File 'lib/stomp/client.rb', line 305

def sha1(data)
  @connection.sha1(data)
end
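
For comparison, Ruby's standard library can produce a SHA1 sum directly. This sketch assumes the hexadecimal form is the one wanted; whether the connection's sha1 returns exactly this form is an assumption, and `sha1_hex` is a hypothetical name.

```ruby
require 'digest/sha1'

# Sketch: SHA1 hex digest of a string using the standard library.
def sha1_hex(data)
  Digest::SHA1.hexdigest(data)
end
```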

#subscribe(destination, headers = {}) ⇒ Object

Subscribe to a destination. A block must be given; it is used as the callback listener for messages on that subscription. Accepts a transaction header ( :transaction => ‘some_transaction_id’ ).



# File 'lib/stomp/client.rb', line 171

def subscribe(destination, headers = {})
  raise "No listener given" unless block_given?
  # use subscription id to correlate messages to subscription. As described in
  # the SUBSCRIPTION section of the protocol: http://stomp.github.com/.
  # If no subscription id is provided, generate one.
  set_subscription_id_if_missing(destination, headers)
  if @listeners[headers[:id]]
    raise "attempting to subscribe to a queue with a previous subscription"
  end
  @listeners[headers[:id]] = lambda {|msg| yield msg}
  @connection.subscribe(destination, headers)
end
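
The listener table that subscribe maintains, one callback per subscription id with duplicates rejected, can be modeled on its own. `ListenerRegistry` and its `dispatch` method are hypothetical names for this sketch. The real client also generates a missing subscription id and sends the SUBSCRIBE frame, which this model leaves out.

```ruby
# Sketch: a registry that allows one listener per subscription id.
# ListenerRegistry is a hypothetical name used only for this illustration.
class ListenerRegistry
  def initialize
    @listeners = {}
  end

  def subscribe(id, &block)
    raise ArgumentError, 'No listener given' unless block
    raise ArgumentError, "subscription #{id} already has a listener" if @listeners[id]

    @listeners[id] = block
  end

  def unsubscribe(id)
    @listeners.delete(id)
  end

  # Route an incoming message body to the listener for its subscription id.
  def dispatch(id, body)
    listener = @listeners[id]
    listener.call(body) if listener
  end
end
```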

#unreceive(message, options = {}) ⇒ Object

Unreceive a message, sending it back to its queue or to the DLQ.



# File 'lib/stomp/client.rb', line 236

def unreceive(message, options = {})
  @connection.unreceive(message, options)
end

#unsubscribe(name, headers = {}) ⇒ Object

Unsubscribe from a subscription by name.



# File 'lib/stomp/client.rb', line 185

def unsubscribe(name, headers = {})
  set_subscription_id_if_missing(name, headers)
  @connection.unsubscribe(name, headers)
  @listeners[headers[:id]] = nil
end

#uuid ⇒ Object

uuid returns a type 4 UUID.



# File 'lib/stomp/client.rb', line 310

def uuid()
  @connection.uuid()
end
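
A type 4 UUID is a randomly generated one, recognizable by the digit 4 at the start of its third group. The standard library offers the same kind of value, shown here as a point of comparison. `random_uuid` and `UUID_V4_PATTERN` are hypothetical names, and this sketch does not claim to reproduce how the gem generates its UUIDs.

```ruby
require 'securerandom'

# Sketch: a random (version 4) UUID from the standard library.
def random_uuid
  SecureRandom.uuid
end

# Shape of a version 4 UUID: version nibble 4, variant nibble 8, 9, a or b.
UUID_V4_PATTERN = /\A\h{8}-\h{4}-4\h{3}-[89ab]\h{3}-\h{12}\z/
```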

#valid_utf8?(s) ⇒ Boolean

valid_utf8? validates any given string for UTF8 compliance.

Returns:
  • (Boolean)



# File 'lib/stomp/client.rb', line 300

def valid_utf8?(s)
  @connection.valid_utf8?(s)
end