Class: Stomp::Client

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/stomp/client.rb,
lib/client/utils.rb

Overview

Typical Stomp client class. Uses a listener thread to receive frames from the server, any thread can send.

Receives all happen in one thread, so consider not doing much processing in that thread if you have much message volume.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, autoflush = false) ⇒ Client

A new Client object can be initialized using three forms:

Hash (this is the recommended Client initialization method):

hash = {
  :hosts => [
    {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false},
    {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false}
  ],
  # These are the default parameters and do not need to be set
  :reliable => true,                  # reliable (use failover)
  :initial_reconnect_delay => 0.01,   # initial delay before reconnect (secs)
  :max_reconnect_delay => 30.0,       # max delay before reconnect
  :use_exponential_back_off => true,  # increase delay between reconnect attempts
  :back_off_multiplier => 2,          # next delay multiplier
  :max_reconnect_attempts => 0,       # retry forever, use # for maximum attempts
  :randomize => false,                # do not randomize hosts hash before reconnect
  :connect_timeout => 0,              # Timeout for TCP/TLS connects, use # for max seconds
  :connect_headers => {},             # user supplied CONNECT headers (req'd for Stomp 1.1+)
  :parse_timeout => 5,                # IO::select wait time on socket reads
  :logger => nil,                     # user supplied callback logger instance
  :dmh => false,                      # do not support multihomed IPV4 / IPV6 hosts during failover
  :closed_check => true,              # check first if closed in each protocol method
  :hbser => false,                    # raise on heartbeat send exception
  :stompconn => false,                # Use STOMP instead of CONNECT
  :usecrlf => false,                  # Use CRLF command and header line ends (1.2+)
  :max_hbread_fails => 0,             # Max HB read fails before retry.  0 => never retry
  :max_hbrlck_fails => 0,             # Max HB read lock obtain fails before retry.  0 => never retry
  :fast_hbs_adjust => 0.0,            # Fast heartbeat senders sleep adjustment, seconds, needed ...
                                      # For fast heartbeat senders.  'fast' == YMMV.  If not
                                      # correct for your environment, expect unnecessary fail overs
  :connread_timeout => 0,             # Timeout during CONNECT for read of CONNECTED/ERROR, secs
  :tcp_nodelay => true,               # Turns on the TCP_NODELAY socket option; disables Nagle's algorithm
  :start_timeout => 0,                # Timeout around Stomp::Client initialization
  :sslctx_newparm => nil,             # Param for SSLContext.new
  :ssl_post_conn_check => true,       # Further verify broker identity
  :nto_cmd_read => true,              # No timeout on COMMAND read
}

e.g. c = Stomp::Client.new(hash)

Positional parameters:

login     (String,  default : '')
passcode  (String,  default : '')
host      (String,  default : 'localhost')
port      (Integer, default : 61613)
reliable  (Boolean, default : false)

e.g. c = Stomp::Client.new('login', 'passcode', 'localhost', 61613, true)

Stomp URL :

A Stomp URL must begin with 'stomp://' and can be in one of the following forms:

stomp://host:port
stomp://host.domain.tld:port
stomp://login:passcode@host:port
stomp://login:passcode@host.domain.tld:port

e.g. c = Stomp::Client.new(urlstring)

84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/stomp/client.rb', line 84

# Build a client, open the connection and start the listener(s).
#
# The first argument selects the initialization form: a parameters Hash,
# a Stomp URL string, or the login for the positional form. Each parser is
# tried in turn and the first one that returns a truthy value wins.
#
# NOTE(review): the three single-argument parsers are handed +login+ because
# the Hash / URL forms pass their value as the first positional argument;
# confirm against the parser definitions in lib/client/utils.rb.
#
# Raises Stomp::Error::StartTimeoutException when a Timeout::Error escapes
# the startup sequence (error handler, connection, listeners).
def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, autoflush = false)
  parse_hash_params(login) ||
    parse_stomp_url(login) ||
    parse_failover_url(login) ||
    parse_positional_params(login, passcode, host, port, reliable)

  # Fall back to a null logger and record it in the parameters hash as well.
  @logger = @parameters[:logger] ||= Stomp::NullLogger.new
  @start_timeout = @parameters[:start_timeout] || 0
  # Remember the thread that created this client.
  @parameters[:client_main] = Thread::current
  check_arguments!()

  begin
    Timeout::timeout(@start_timeout) {
      create_error_handler
      create_connection(autoflush)
      start_listeners()
    }
  rescue Timeout::Error
    raise Stomp::Error::StartTimeoutException.new(@start_timeout)
  end
end

Instance Attribute Details

#parametersObject (readonly)

Parameters hash


19
20
21
# File 'lib/stomp/client.rb', line 19

# Read-only accessor for the parameters hash held in @parameters.
def parameters
  params = @parameters
  params
end

Class Method Details

.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false) ⇒ Object

open is syntactic sugar for 'Client.new', see 'initialize' for usage.


140
141
142
# File 'lib/stomp/client.rb', line 140

# Syntactic sugar for Client.new using the positional form; every argument
# is forwarded unchanged. See #initialize for the meaning of each parameter.
def self.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)
  Client.new(login, passcode, host, port, reliable)
end

Instance Method Details

#abort(name, headers = {}) ⇒ Object

Abort aborts work in a transaction by name.


156
157
158
159
160
161
162
163
164
165
166
# File 'lib/stomp/client.rb', line 156

# Abort the transaction called +name+ on the connection, then replay every
# message that was ack'd inside that transaction to its listener.
#
# The replay list is removed from @replay_messages_by_txn before replaying,
# so an aborted transaction does not keep its messages around (and does not
# replay them again if the same name is aborted a second time), and so the
# list being iterated is no longer reachable through the map.
def abort(name, headers = {})
  @connection.abort(name, headers)

  # replay any ack'd messages in this transaction
  replay_list = @replay_messages_by_txn.delete(name)
  return unless replay_list

  replay_list.each do |message|
    find_listener(message) # find_listener also calls the listener
  end
end

#ack(message, headers = {}) ⇒ Object Also known as: acknowledge

Acknowledge a message, used when a subscription has specified client acknowledgement ( connection.subscribe("/queue/a", :ack => 'client') ). Accepts a transaction header ( :transaction => 'some_transaction_id' ).


205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/stomp/client.rb', line 205

# Acknowledge +message+ on the connection.
#
# When headers carries :transaction, the message is also remembered under
# that transaction id so it can be replayed if the transaction is rolled
# back. When a block is given, a receipt listener wrapping the block is
# registered and its id is sent as the :receipt header.
def ack(message, headers = {})
  txn_id = headers[:transaction]
  if txn_id
    # lets keep around messages ack'd in this transaction in case we rollback
    pending = @replay_messages_by_txn[txn_id]
    @replay_messages_by_txn[txn_id] = pending = [] if pending.nil?
    pending << message
  end

  if block_given?
    on_receipt = lambda { |r| yield r }
    headers = headers.merge(:receipt => register_receipt_listener(on_receipt))
  end

  ctx = ack_context_for(message, headers)
  @connection.ack(ctx[:message_id], ctx[:headers])
end

#ack_context_for(message, headers) ⇒ Object


233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/stomp/client.rb', line 233

# Work out which message header identifies +message+ for ACK/NACK at the
# current protocol level, and which headers to send with it.
#
# Stomp::SPL_12 uses the message's 'ack' header; every other level uses
# 'message-id'. Stomp::SPL_11 additionally copies the message's
# 'subscription' header into the outgoing headers.
#
# Returns a Hash with :message_id and :headers.
def ack_context_for(message, headers)
  id_header = 'message-id'
  case protocol
  when Stomp::SPL_12
    id_header = 'ack'
  when Stomp::SPL_11
    headers = headers.merge(:subscription => message.headers['subscription'])
  end
  { :message_id => message.headers[id_header], :headers => headers }
end

#autoflushObject

autoflush returns the current connection's autoflush setting.


359
360
361
# File 'lib/stomp/client.rb', line 359

# Return the autoflush setting reported by the current connection.
def autoflush
  @connection.autoflush
end

#autoflush=(af) ⇒ Object

autoflush= sets the current connection's autoflush setting.


354
355
356
# File 'lib/stomp/client.rb', line 354

# Set the autoflush setting on the current connection to +af+.
def autoflush=(af)
  @connection.autoflush = af
end

#begin(name, headers = {}) ⇒ Object

Begin starts work in a transaction by name.


151
152
153
# File 'lib/stomp/client.rb', line 151

# Start a transaction called +name+ by delegating to the connection.
# +headers+ is passed through unchanged.
def begin(name, headers = {})
  txn_headers = headers
  @connection.begin(name, txn_headers)
end

#close(headers = {}) ⇒ Object

close frees resources in use by this client. The listener thread is terminated, and disconnect on the connection is called.


291
292
293
294
# File 'lib/stomp/client.rb', line 291

# Release this client's resources: terminate the listener thread first,
# then disconnect the connection, passing +headers+ to the disconnect.
def close(headers = {})
  listener = @listener_thread
  listener.exit
  @connection.disconnect(headers)
end

#closed?Boolean

closed? tests if this client connection is closed.

Returns:

  • (Boolean)

280
281
282
# File 'lib/stomp/client.rb', line 280

# Report whether the underlying connection says it is closed.
def closed?
  @connection.closed?
end

#commit(name, headers = {}) ⇒ Object

Commit commits work in a transaction by name.


169
170
171
172
173
# File 'lib/stomp/client.rb', line 169

# Commit the transaction called +name+ on the connection.
#
# Messages ack'd inside the transaction no longer need replaying once it is
# committed, so the replay list is dropped. The list is removed under the
# transaction name (the key #abort looks up), and also under any
# :transaction header supplied by the caller, which was the only key
# consulted previously.
def commit(name, headers = {})
  @replay_messages_by_txn.delete(name)
  @replay_messages_by_txn.delete(headers[:transaction])
  @connection.commit(name, headers)
end

#connection_frameObject

Return the broker's CONNECTED frame to the client. Misnamed.


265
266
267
# File 'lib/stomp/client.rb', line 265

# Return the connection's connection_frame (the broker's CONNECTED frame).
def connection_frame
  @connection.connection_frame
end

#create_error_handlerObject


111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/stomp/client.rb', line 111

# Install @error_listener, a lambda that turns a broker ERROR frame into an
# exception and raises it in the thread that called this method.
#
# The exception class is chosen from the frame body:
# ResourceAllocationException => ProducerFlowControlException,
# ProtocolException => ProtocolException, anything else => BrokerException.
# If the frame carries a 'receipt-id' header, the matching receipt listener
# is removed before raising.
def create_error_handler
  owner = Thread.current
  # Not every Ruby has Thread#report_on_exception=, so probe for it first.
  owner.report_on_exception = false if owner.respond_to?(:report_on_exception=)

  @error_listener = lambda do |error|
    exception_class =
      case error.body
      when /ResourceAllocationException/i then Stomp::Error::ProducerFlowControlException
      when /ProtocolException/i then Stomp::Error::ProtocolException
      else Stomp::Error::BrokerException
      end
    failure = exception_class.new(error)

    receipt_id = error.headers['receipt-id']
    @receipt_listeners.delete(receipt_id) if receipt_id
    owner.raise failure
  end
end

#disconnect_receiptObject

Return any RECEIPT frame received by DISCONNECT.


270
271
272
# File 'lib/stomp/client.rb', line 270

# Return the connection's disconnect_receipt (any RECEIPT frame received
# for DISCONNECT).
def disconnect_receipt
  @connection.disconnect_receipt
end

#hbrecv_countObject

hbrecv_count returns the current connection's heartbeat receive count.


343
344
345
# File 'lib/stomp/client.rb', line 343

# Return the heartbeat receive count reported by the current connection.
def hbrecv_count
  @connection.hbrecv_count
end

#hbrecv_intervalObject

hbrecv_interval returns the connection's heartbeat receive interval.


333
334
335
# File 'lib/stomp/client.rb', line 333

# Return the heartbeat receive interval reported by the connection.
def hbrecv_interval
  @connection.hbrecv_interval
end

#hbsend_countObject

hbsend_count returns the current connection's heartbeat send count.


338
339
340
# File 'lib/stomp/client.rb', line 338

# Return the heartbeat send count reported by the current connection.
def hbsend_count
  @connection.hbsend_count
end

#hbsend_intervalObject

hbsend_interval returns the connection's heartbeat send interval.


328
329
330
# File 'lib/stomp/client.rb', line 328

# Return the heartbeat send interval reported by the connection.
def hbsend_interval
  @connection.hbsend_interval
end

#join(limit = nil) ⇒ Object

join the listener thread for this client, generally used to wait for a quit signal.


146
147
148
# File 'lib/stomp/client.rb', line 146

# Join the listener thread, waiting at most +limit+ (nil means no limit is
# passed on). Returns whatever the thread's join returns.
def join(limit = nil)
  listener = @listener_thread
  listener.join(limit)
end

#jruby?Boolean

jruby? tests if the connection has detected a JRuby environment

Returns:

  • (Boolean)

285
286
287
# File 'lib/stomp/client.rb', line 285

# Return the connection's jruby flag.
def jruby?
  @connection.jruby
end

#nack(message, headers = {}) ⇒ Object

Stomp 1.1+ NACK.


227
228
229
230
# File 'lib/stomp/client.rb', line 227

# Stomp 1.1+ NACK: negatively acknowledge +message+ on the connection,
# using the id and headers computed by ack_context_for.
def nack(message, headers = {})
  ctx = ack_context_for(message, headers)
  @connection.nack(ctx[:message_id], ctx[:headers])
end

#open?Boolean

open? tests if this client connection is open.

Returns:

  • (Boolean)

275
276
277
# File 'lib/stomp/client.rb', line 275

# Report whether the underlying connection says it is open.
def open?
  is_open = @connection.open?
  is_open
end

#pollObject

Poll for asynchronous messages issued by broker. Return nil if no message available, else the message


349
350
351
# File 'lib/stomp/client.rb', line 349

# Poll the connection for a message and return whatever it returns.
def poll
  @connection.poll
end

#protocolObject

protocol returns the current client's protocol level.


308
309
310
# File 'lib/stomp/client.rb', line 308

# Return the protocol level reported by the connection.
def protocol
  @connection.protocol
end

#publish(destination, message, headers = {}) ⇒ Object

Publishes message to destination. If a block is given a receipt will be requested and passed to the block on receipt. Accepts a transaction header ( :transaction => 'some_transaction_id' ).


255
256
257
258
259
260
261
262
# File 'lib/stomp/client.rb', line 255

# Publish +message+ to +destination+ through the connection.
#
# Header keys are symbolized first. When a block is given, a receipt
# listener wrapping the block is registered and its id is sent as the
# :receipt header.
#
# Raises Stomp::Error::DestinationRequired when +destination+ is nil/false.
def publish(destination, message, headers = {})
  outgoing = headers.symbolize_keys
  raise Stomp::Error::DestinationRequired unless destination

  if block_given?
    on_receipt = lambda { |r| yield r }
    outgoing = outgoing.merge(:receipt => register_receipt_listener(on_receipt))
  end
  @connection.publish(destination, message, outgoing)
end

#runningObject

running checks if the thread was created and is not dead.


297
298
299
# File 'lib/stomp/client.rb', line 297

# Check that the listener thread was created and is not dead.
#
# Returns nil when there is no listener thread, otherwise true/false
# depending on whether the thread's status is truthy.
def running
  listener = @listener_thread
  listener && (listener.status ? true : false)
end

#set_logger(logger) ⇒ Object

set_logger identifies a new callback logger.


302
303
304
305
# File 'lib/stomp/client.rb', line 302

# Replace the callback logger: remember it in @logger and hand the same
# object to the connection.
def set_logger(logger)
  @connection.set_logger(@logger = logger)
end

#sha1(data) ⇒ Object

sha1 returns a SHA1 sum of a given string.


318
319
320
# File 'lib/stomp/client.rb', line 318

# Return the SHA1 sum of +data+ as computed by the connection.
def sha1(data)
  digest = @connection.sha1(data)
  digest
end

#subscribe(destination, headers = {}) ⇒ Object

Subscribe to a destination, must be passed a block which will be used as a callback listener. Accepts a transaction header ( :transaction => 'some_transaction_id' ).


178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/stomp/client.rb', line 178

# Subscribe to +destination+; the given block becomes the callback listener
# for messages on that subscription.
#
# Raises Stomp::Error::NoListenerGiven without a block,
# Stomp::Error::DestinationRequired when +destination+ is nil/false, and
# Stomp::Error::DuplicateSubscription when a listener is already registered
# under the same subscription id.
def subscribe(destination, headers = {})
  raise Stomp::Error::NoListenerGiven unless block_given?
  sub_headers = headers.symbolize_keys
  raise Stomp::Error::DestinationRequired unless destination

  # use subscription id to correlate messages to subscription. As described in
  # the SUBSCRIPTION section of the protocol: http://stomp.github.com/.
  # If no subscription id is provided, generate one.
  sub_headers = sub_headers.merge(:id => build_subscription_id(destination, sub_headers))
  sub_id = sub_headers[:id]
  raise Stomp::Error::DuplicateSubscription if @listeners[sub_id]

  @listeners[sub_id] = lambda { |msg| yield msg }
  @connection.subscribe(destination, sub_headers)
end

#unreceive(message, options = {}) ⇒ Object

Unreceive a message, sending it back to its queue or to the DLQ.


247
248
249
# File 'lib/stomp/client.rb', line 247

# Unreceive +message+ by delegating to the connection; +options+ is passed
# through unchanged.
def unreceive(message, options = {})
  opts = options
  @connection.unreceive(message, opts)
end

#unsubscribe(destination, headers = {}) ⇒ Object

Unsubscribe from a subscription by name.


194
195
196
197
198
199
200
# File 'lib/stomp/client.rb', line 194

# Unsubscribe from +destination+ on the connection and clear the listener
# registered under the subscription id.
#
# Header keys are symbolized, and the subscription id is computed by
# build_subscription_id. The listener entry is set to nil, not removed.
#
# Raises Stomp::Error::DestinationRequired when +destination+ is nil/false.
def unsubscribe(destination, headers = {})
  sub_headers = headers.symbolize_keys
  raise Stomp::Error::DestinationRequired unless destination

  sub_id = build_subscription_id(destination, sub_headers)
  sub_headers = sub_headers.merge(:id => sub_id)
  @connection.unsubscribe(destination, sub_headers)
  @listeners[sub_headers[:id]] = nil
end

#uuidObject

uuid returns a type 4 UUID.


323
324
325
# File 'lib/stomp/client.rb', line 323

# Return a UUID generated by the connection.
def uuid
  @connection.uuid
end

#valid_utf8?(s) ⇒ Boolean

valid_utf8? validates any given string for UTF8 compliance.

Returns:

  • (Boolean)

313
314
315
# File 'lib/stomp/client.rb', line 313

# Ask the connection whether +s+ is valid UTF-8.
def valid_utf8?(s)
  verdict = @connection.valid_utf8?(s)
  verdict
end