Class: Stomp::Client
- Inherits: Object
- Extended by: Forwardable
- Defined in: lib/stomp/client.rb, lib/client/utils.rb
Overview
Typical Stomp client class. A listener thread receives frames from the server; any thread can send.
All receives happen in that one listener thread, so avoid heavy processing there if message volume is high.
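The advice above can be sketched without a broker. The following is a minimal illustration, assuming a hypothetical `on_message` lambda that stands in for a subscription callback: the callback only enqueues, and a separate worker thread does the slow work. The method name and the `:stop` sentinel are inventions of this sketch, not part of the library.

```ruby
require 'thread'

# Sketch only: simulates a listener thread handing work to a worker thread.
# process_off_listener_thread and the :stop sentinel are hypothetical names.
def process_off_listener_thread(bodies)
  work_queue = Queue.new
  results = []

  # The worker does the expensive processing, away from the receive thread.
  worker = Thread.new do
    loop do
      body = work_queue.pop
      break if body == :stop
      results << body.upcase # placeholder for real, slow processing
    end
  end

  # Stand-in for the block passed to subscribe: it returns almost immediately.
  on_message = lambda { |body| work_queue << body }

  # Simulated listener thread delivering messages in order.
  listener = Thread.new do
    bodies.each { |body| on_message.call(body) }
    work_queue << :stop
  end

  listener.join
  worker.join
  results
end
```

With a single worker the queue preserves delivery order; adding more workers trades ordering for throughput.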
Instance Attribute Summary
-
#parameters ⇒ Object
readonly
Parameters hash.
Class Method Summary
-
.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false) ⇒ Object
open is syntactic sugar for 'Client.new'; see 'initialize' for usage.
Instance Method Summary
-
#abort(name, headers = {}) ⇒ Object
Abort aborts work in a transaction by name.
-
#ack(message, headers = {}) ⇒ Object
(also: #acknowledge)
Acknowledge a message. Used when a subscription has specified client acknowledgement (connection.subscribe("/queue/a", :ack => 'client')). Accepts a transaction header (:transaction => 'some_transaction_id').
- #ack_context_for(message, headers) ⇒ Object
-
#autoflush ⇒ Object
autoflush returns the current connection’s autoflush setting.
-
#autoflush=(af) ⇒ Object
autoflush= sets the current connection’s autoflush setting.
-
#begin(name, headers = {}) ⇒ Object
Begin starts work in a transaction by name.
-
#close(headers = {}) ⇒ Object
close frees resources in use by this client.
-
#closed? ⇒ Boolean
closed? tests if this client connection is closed.
-
#commit(name, headers = {}) ⇒ Object
Commit commits work in a transaction by name.
-
#connection_frame ⇒ Object
Return the broker’s CONNECTED frame to the client.
- #create_error_handler ⇒ Object
-
#disconnect_receipt ⇒ Object
Return any RECEIPT frame received by DISCONNECT.
-
#hbrecv_count ⇒ Object
hbrecv_count returns the current connection’s heartbeat receive count.
-
#hbrecv_interval ⇒ Object
hbrecv_interval returns the connection’s heartbeat receive interval.
-
#hbsend_count ⇒ Object
hbsend_count returns the current connection’s heartbeat send count.
-
#hbsend_interval ⇒ Object
hbsend_interval returns the connection’s heartbeat send interval.
-
#initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, autoflush = false) ⇒ Client
constructor
A new Client object can be initialized using three forms (hash, positional parameters, or Stomp URL).
-
#join(limit = nil) ⇒ Object
join the listener thread for this client, generally used to wait for a quit signal.
-
#jruby? ⇒ Boolean
jruby? tests if the connection has detected a JRuby environment.
-
#nack(message, headers = {}) ⇒ Object
Stomp 1.1+ NACK.
-
#open? ⇒ Boolean
open? tests if this client connection is open.
-
#poll ⇒ Object
Poll for asynchronous messages issued by broker.
-
#protocol ⇒ Object
protocol returns the current client’s protocol level.
-
#publish(destination, message, headers = {}) ⇒ Object
Publishes message to destination.
-
#running ⇒ Object
running checks if the thread was created and is not dead.
-
#set_logger(logger) ⇒ Object
set_logger identifies a new callback logger.
-
#sha1(data) ⇒ Object
sha1 returns a SHA1 sum of a given string.
-
#subscribe(destination, headers = {}) ⇒ Object
Subscribe to a destination, must be passed a block which will be used as a callback listener.
-
#unreceive(message, options = {}) ⇒ Object
Unreceive a message, sending it back to its queue or to the DLQ.
-
#unsubscribe(name, headers = {}) ⇒ Object
Unsubscribe from a subscription by name.
-
#uuid ⇒ Object
uuid returns a type 4 UUID.
-
#valid_utf8?(s) ⇒ Boolean
valid_utf8? validates any given string for UTF8 compliance.
Constructor Details
#initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, autoflush = false) ⇒ Client
A new Client object can be initialized using three forms:
Hash (this is the recommended Client initialization method):
hash = {
:hosts => [
{:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false},
{:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false}
],
# These are the default parameters and do not need to be set
:reliable => true, # reliable (use failover)
:initial_reconnect_delay => 0.01, # initial delay before reconnect (secs)
:max_reconnect_delay => 30.0, # max delay before reconnect
:use_exponential_back_off => true, # increase delay between reconnect attempts
:back_off_multiplier => 2, # next delay multiplier
:max_reconnect_attempts => 0, # retry forever, use # for maximum attempts
:randomize => false, # do not randomize hosts hash before reconnect
:connect_timeout => 0, # Timeout for TCP/TLS connects, use # for max seconds
:connect_headers => {}, # user supplied CONNECT headers (req'd for Stomp 1.1+)
:parse_timeout => 5, # IO::select wait time on socket reads
:logger => nil, # user supplied callback logger instance
:dmh => false, # do not support multihomed IPV4 / IPV6 hosts during failover
:closed_check => true, # check first if closed in each protocol method
:hbser => false, # raise on heartbeat send exception
:stompconn => false, # Use STOMP instead of CONNECT
:usecrlf => false, # Use CRLF command and header line ends (1.2+)
:max_hbread_fails => 0, # Max HB read fails before retry. 0 => never retry
:max_hbrlck_fails => 0, # Max HB read lock obtain fails before retry. 0 => never retry
:fast_hbs_adjust => 0.0, # Fast heartbeat senders sleep adjustment, seconds, needed ...
# For fast heartbeat senders. 'fast' == YMMV. If not
# correct for your environment, expect unnecessary fail overs
:connread_timeout => 0, # Timeout during CONNECT for read of CONNECTED/ERROR, secs
:tcp_nodelay => true, # Turns on the TCP_NODELAY socket option; disables Nagle's algorithm
:start_timeout => 0, # Timeout around Stomp::Client initialization
:sslctx_newparm => nil, # Param for SSLContext.new
:ssl_post_conn_check => true, # Further verify broker identity
}
e.g. c = Stomp::Client.new(hash)
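To make the reconnect settings concrete, here is a small sketch of one plausible way :initial_reconnect_delay, :use_exponential_back_off, :back_off_multiplier and :max_reconnect_delay could combine. It is an assumption about their interaction based only on the comments above, not the library's actual implementation; `reconnect_delays` is a hypothetical helper.

```ruby
# Sketch only: computes the delay (in seconds) before each reconnect attempt.
# The capping rule is an assumption drawn from the parameter comments.
def reconnect_delays(params, attempts)
  delay = params[:initial_reconnect_delay]
  Array.new(attempts) do
    current = delay
    if params[:use_exponential_back_off]
      # Grow the next delay, but never beyond the configured maximum.
      delay = [delay * params[:back_off_multiplier], params[:max_reconnect_delay]].min
    end
    current
  end
end

defaults = {
  :initial_reconnect_delay => 0.01,
  :max_reconnect_delay => 30.0,
  :use_exponential_back_off => true,
  :back_off_multiplier => 2
}
first_five = reconnect_delays(defaults, 5) # delays double from 0.01 s upward
```

Under these assumptions the delay doubles on each attempt until it reaches the cap, after which it stays constant.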
Positional parameters:
login (String, default : '')
passcode (String, default : '')
host (String, default : 'localhost')
port (Integer, default : 61613)
reliable (Boolean, default : false)
e.g. c = Stomp::Client.new('login', 'passcode', 'localhost', 61613, true)
Stomp URL:
A Stomp URL must begin with 'stomp://' and can be in one of the following forms:
stomp://host:port
stomp://host.domain.tld:port
stomp://login:passcode@host:port
stomp://login:passcode@host.domain.tld:port
e.g. c = Stomp::Client.new(urlstring)
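The four URL forms listed above can be recognised with a single pattern. The following is an illustrative sketch only: `STOMP_URL_SKETCH` and `parse_stomp_url_sketch` are hypothetical names, and the pattern is not taken from the library's own parser.

```ruby
# Sketch only: matches stomp://[login:passcode@]host:port
STOMP_URL_SKETCH = %r{\Astomp://(?:([^:@/]+):([^:@/]+)@)?([\w.-]+):(\d+)\z}

def parse_stomp_url_sketch(url)
  m = STOMP_URL_SKETCH.match(url)
  return nil unless m # not one of the documented forms
  {
    :login => m[1] || '',    # credentials are optional
    :passcode => m[2] || '',
    :host => m[3],
    :port => m[4].to_i
  }
end
```

A string that does not begin with 'stomp://' yields nil here, which mirrors the requirement stated above.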
# File 'lib/stomp/client.rb', line 83
def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, autoflush = false)
  parse_hash_params(login) ||
    parse_stomp_url(login) ||
    parse_failover_url(login) ||
    parse_positional_params(login, passcode, host, port, reliable)

  @logger = @parameters[:logger] ||= Stomp::NullLogger.new
  @start_timeout = @parameters[:start_timeout] || 0
  check_arguments!()

  # p [ "cldbg01", @parameters ]

  begin
    Timeout::timeout(@start_timeout) {
      create_error_handler
      create_connection(autoflush)
      start_listeners()
    }
  rescue TimeoutError
    # p [ "cldbg02" ]
    ex = Stomp::Error::StartTimeoutException.new(@start_timeout)
    raise ex
  end
end
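The constructor wraps startup in a timeout and converts expiry into a dedicated exception. A standalone sketch of that pattern, using only Ruby's standard `timeout` library, might look like this; `StartTimeoutSketch` and `start_with_timeout` are hypothetical names standing in for the library's own exception and startup code.

```ruby
require 'timeout'

# Hypothetical stand-in for a startup-timeout exception.
class StartTimeoutSketch < StandardError; end

# Sketch only: runs the block under a time limit. A limit of 0 means
# "no timeout", which is how Timeout.timeout treats a zero value.
def start_with_timeout(seconds)
  Timeout.timeout(seconds) { yield }
rescue Timeout::Error
  raise StartTimeoutSketch, "startup exceeded #{seconds} seconds"
end
```

This matches the documented default of :start_timeout => 0, under which startup is allowed to take as long as it needs.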
Instance Attribute Details
#parameters ⇒ Object (readonly)
Parameters hash
# File 'lib/stomp/client.rb', line 19
def parameters
  @parameters
end
Class Method Details
.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false) ⇒ Object
open is syntactic sugar for 'Client.new'; see 'initialize' for usage.
# File 'lib/stomp/client.rb', line 133
def self.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)
  Client.new(login, passcode, host, port, reliable)
end
Instance Method Details
#abort(name, headers = {}) ⇒ Object
Abort aborts work in a transaction by name.
# File 'lib/stomp/client.rb', line 149
def abort(name, headers = {})
  @connection.abort(name, headers)

  # replay any ack'd messages in this transaction
  replay_list = @replay_messages_by_txn[name]
  if replay_list
    replay_list.each do |message|
      find_listener(message) # find_listener also calls the listener
    end
  end
end
#ack(message, headers = {}) ⇒ Object Also known as: acknowledge
Acknowledge a message. Used when a subscription has specified client acknowledgement (connection.subscribe("/queue/a", :ack => 'client')). Accepts a transaction header (:transaction => 'some_transaction_id').
# File 'lib/stomp/client.rb', line 194
def ack(message, headers = {})
  txn_id = headers[:transaction]
  if txn_id
    # lets keep around messages ack'd in this transaction in case we rollback
    replay_list = @replay_messages_by_txn[txn_id]
    if replay_list.nil?
      replay_list = []
      @replay_messages_by_txn[txn_id] = replay_list
    end
    replay_list << message
  end
  if block_given?
    headers['receipt'] = register_receipt_listener lambda {|r| yield r}
  end
  context = ack_context_for(message, headers)
  @connection.ack context[:message_id], context[:headers]
end
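The interplay between ack, abort and commit inside a transaction can be shown with a small standalone model. This is a sketch under simplifying assumptions: `ReplayBookSketch` is a hypothetical class, and "redelivery" is modelled by appending to an array rather than calling a subscription listener.

```ruby
# Sketch only: models per-transaction bookkeeping of acknowledged messages.
class ReplayBookSketch
  attr_reader :redelivered

  def initialize
    @replay = {}      # transaction id => messages ack'd in that transaction
    @redelivered = [] # stand-in for handing messages back to a listener
  end

  # Remember the message only when the ack belongs to a transaction.
  def ack(message, txn_id = nil)
    (@replay[txn_id] ||= []) << message if txn_id
  end

  # On abort, replay everything ack'd under that transaction.
  def abort(txn_id)
    (@replay[txn_id] || []).each { |message| @redelivered << message }
  end

  # On commit, the remembered messages are no longer needed.
  def commit(txn_id)
    @replay.delete(txn_id)
  end
end
```

The point of the design is that an aborted transaction leaves the application as if the acks had never happened, while a commit simply discards the saved list.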
#ack_context_for(message, headers) ⇒ Object
# File 'lib/stomp/client.rb', line 222
def ack_context_for(message, headers)
  id = case protocol
    when Stomp::SPL_12
      'ack'
    when Stomp::SPL_11
      headers.merge!(:subscription => message.headers['subscription'])
      'message-id'
    else
      'message-id'
  end
  {:message_id => message.headers[id], :headers => headers}
end
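The protocol-dependent choice of identifier can be exercised in isolation. In the sketch below, the strings '1.0', '1.1' and '1.2' are assumed stand-ins for the library's protocol constants, and `MessageSketch` and `ack_context_sketch` are hypothetical names; unlike the listing above, this version does not mutate the caller's headers.

```ruby
# Hypothetical minimal message: just a headers hash.
MessageSketch = Struct.new(:headers)

# Sketch only: picks which message header identifies the frame to ACK/NACK.
def ack_context_sketch(protocol, message, headers = {})
  case protocol
  when '1.2'
    id_header = 'ack' # 1.2 acknowledges by the MESSAGE frame's ack header
  when '1.1'
    # 1.1 needs the subscription header alongside the message id
    headers = headers.merge(:subscription => message.headers['subscription'])
    id_header = 'message-id'
  else
    id_header = 'message-id'
  end
  {:message_id => message.headers[id_header], :headers => headers}
end
```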
#autoflush ⇒ Object
autoflush returns the current connection’s autoflush setting.
# File 'lib/stomp/client.rb', line 346
def autoflush()
  @connection.autoflush()
end
#autoflush=(af) ⇒ Object
autoflush= sets the current connection’s autoflush setting.
# File 'lib/stomp/client.rb', line 341
def autoflush=(af)
  @connection.autoflush = af
end
#begin(name, headers = {}) ⇒ Object
Begin starts work in a transaction by name.
# File 'lib/stomp/client.rb', line 144
def begin(name, headers = {})
  @connection.begin(name, headers)
end
#close(headers = {}) ⇒ Object
close frees resources in use by this client. The listener thread is terminated, and disconnect on the connection is called.
# File 'lib/stomp/client.rb', line 278
def close(headers={})
  @listener_thread.exit
  @connection.disconnect(headers)
end
#closed? ⇒ Boolean
closed? tests if this client connection is closed.
# File 'lib/stomp/client.rb', line 267
def closed?()
  @connection.closed?()
end
#commit(name, headers = {}) ⇒ Object
Commit commits work in a transaction by name.
# File 'lib/stomp/client.rb', line 162
def commit(name, headers = {})
  txn_id = headers[:transaction]
  @replay_messages_by_txn.delete(txn_id)
  @connection.commit(name, headers)
end
#connection_frame ⇒ Object
Return the broker’s CONNECTED frame to the client. Misnamed.
# File 'lib/stomp/client.rb', line 252
def connection_frame()
  @connection.connection_frame
end
#create_error_handler ⇒ Object
# File 'lib/stomp/client.rb', line 108
def create_error_handler
  client_thread = Thread.current

  @error_listener = lambda do |error|
    exception = case error.body
      when /ResourceAllocationException/i
        Stomp::Error::ProducerFlowControlException.new(error)
      when /ProtocolException/i
        Stomp::Error::ProtocolException.new(error)
      else
        Stomp::Error::BrokerException.new(error)
    end

    client_thread.raise exception
  end
end
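The handler's classification step, which chooses an exception type from the text of a broker ERROR frame, can be tried on its own. The classes and the `classify_error_body` helper below are hypothetical stand-ins for the library's exception hierarchy; only the two patterns come from the listing above.

```ruby
# Hypothetical stand-ins for the broker-related exception classes.
class BrokerErrorSketch < StandardError; end
class FlowControlErrorSketch < BrokerErrorSketch; end
class ProtocolErrorSketch < BrokerErrorSketch; end

# Sketch only: maps an ERROR frame body to an exception class.
def classify_error_body(body)
  case body
  when /ResourceAllocationException/i then FlowControlErrorSketch
  when /ProtocolException/i then ProtocolErrorSketch
  else BrokerErrorSketch # anything unrecognised is a generic broker error
  end
end
```

Because the listing raises the resulting exception in the thread that created the client, a rescue around client usage in that thread is where such errors would surface.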
#disconnect_receipt ⇒ Object
Return any RECEIPT frame received by DISCONNECT.
# File 'lib/stomp/client.rb', line 257
def disconnect_receipt()
  @connection.disconnect_receipt
end
#hbrecv_count ⇒ Object
hbrecv_count returns the current connection’s heartbeat receive count.
# File 'lib/stomp/client.rb', line 330
def hbrecv_count()
  @connection.hbrecv_count()
end
#hbrecv_interval ⇒ Object
hbrecv_interval returns the connection’s heartbeat receive interval.
# File 'lib/stomp/client.rb', line 320
def hbrecv_interval()
  @connection.hbrecv_interval()
end
#hbsend_count ⇒ Object
hbsend_count returns the current connection’s heartbeat send count.
# File 'lib/stomp/client.rb', line 325
def hbsend_count()
  @connection.hbsend_count()
end
#hbsend_interval ⇒ Object
hbsend_interval returns the connection’s heartbeat send interval.
# File 'lib/stomp/client.rb', line 315
def hbsend_interval()
  @connection.hbsend_interval()
end
#join(limit = nil) ⇒ Object
join the listener thread for this client, generally used to wait for a quit signal.
# File 'lib/stomp/client.rb', line 139
def join(limit = nil)
  @listener_thread.join(limit)
end
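Since join delegates to Thread#join, the meaning of the limit argument follows Ruby's thread semantics: with a limit, the call returns nil if the thread is still running when the limit expires. A small sketch with plain threads (the `wait_for` helper is hypothetical):

```ruby
# Sketch only: Thread#join(limit) returns the thread once it has finished,
# or nil if the limit (in seconds) expires first.
def wait_for(thread, limit = nil)
  thread.join(limit) ? :finished : :still_running
end
```

Calling join with no limit blocks until the listener thread ends, which is why it is commonly the last statement of a long-running consumer script.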
#jruby? ⇒ Boolean
jruby? tests if the connection has detected a JRuby environment.
# File 'lib/stomp/client.rb', line 272
def jruby?()
  @connection.jruby
end
#nack(message, headers = {}) ⇒ Object
Stomp 1.1+ NACK.
# File 'lib/stomp/client.rb', line 216
def nack(message, headers = {})
  context = ack_context_for(message, headers)
  @connection.nack context[:message_id], context[:headers]
end
#open? ⇒ Boolean
open? tests if this client connection is open.
# File 'lib/stomp/client.rb', line 262
def open?
  @connection.open?()
end
#poll ⇒ Object
Poll for asynchronous messages issued by the broker. Returns nil if no message is available, otherwise the message.
# File 'lib/stomp/client.rb', line 336
def poll()
  @connection.poll()
end
#protocol ⇒ Object
protocol returns the current client’s protocol level.
# File 'lib/stomp/client.rb', line 295
def protocol()
  @connection.protocol()
end
#publish(destination, message, headers = {}) ⇒ Object
Publishes message to destination. If a block is given a receipt will be requested and passed to the block on receipt. Accepts a transaction header ( :transaction => ‘some_transaction_id’ ).
# File 'lib/stomp/client.rb', line 244
def publish(destination, message, headers = {})
  if block_given?
    headers['receipt'] = register_receipt_listener lambda {|r| yield r}
  end
  @connection.publish(destination, message, headers)
end
#running ⇒ Object
running checks if the thread was created and is not dead.
# File 'lib/stomp/client.rb', line 284
def running()
  @listener_thread && !!@listener_thread.status
end
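The expression in running relies on Thread#status, which is a string such as "run" or "sleep" for a live thread, false for a thread that terminated normally or was killed, and nil for one that died with an exception. A sketch with a plain thread (the `running_sketch` helper is hypothetical):

```ruby
# Sketch only: same expression as above, applied to any thread or nil.
# Returns nil when no thread exists, true while it is alive, false once dead.
def running_sketch(thread)
  thread && !!thread.status
end
```

Note that the result is nil rather than false when the thread was never created, so callers should test truthiness rather than compare against false.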
#set_logger(logger) ⇒ Object
set_logger identifies a new callback logger.
# File 'lib/stomp/client.rb', line 289
def set_logger(logger)
  @logger = logger
  @connection.set_logger(logger)
end
#sha1(data) ⇒ Object
sha1 returns a SHA1 sum of a given string.
# File 'lib/stomp/client.rb', line 305
def sha1(data)
  @connection.sha1(data)
end
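For comparison, a SHA1 sum of a string can be computed directly with Ruby's standard library. The sketch below assumes a hex-encoded digest is wanted; whether the connection's sha1 returns exactly this form is not stated here, and `sha1_sketch` is a hypothetical name.

```ruby
require 'digest/sha1'

# Sketch only: hex-encoded SHA1 of a string via the standard library.
def sha1_sketch(data)
  Digest::SHA1.hexdigest(data)
end
```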
#subscribe(destination, headers = {}) ⇒ Object
Subscribe to a destination, must be passed a block which will be used as a callback listener. Accepts a transaction header ( :transaction => ‘some_transaction_id’ ).
# File 'lib/stomp/client.rb', line 171
def subscribe(destination, headers = {})
  raise "No listener given" unless block_given?
  # use subscription id to correlate messages to subscription. As described in
  # the SUBSCRIPTION section of the protocol: http://stomp.github.com/.
  # If no subscription id is provided, generate one.
  set_subscription_id_if_missing(destination, headers)
  if @listeners[headers[:id]]
    raise "attempting to subscribe to a queue with a previous subscription"
  end
  @listeners[headers[:id]] = lambda {|msg| yield msg}
  @connection.subscribe(destination, headers)
end
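The listener bookkeeping in subscribe and unsubscribe can be modelled without a connection. In this sketch, `ListenerRegistrySketch` and its `dispatch` method are hypothetical, and deriving a missing subscription id from a SHA1 of the destination is an assumption made for illustration.

```ruby
require 'digest/sha1'

# Sketch only: one listener block per subscription id.
class ListenerRegistrySketch
  def initialize
    @listeners = {}
  end

  def subscribe(destination, headers = {}, &listener)
    raise "No listener given" unless listener
    # Assumption: generate an id from the destination when none is supplied.
    headers[:id] ||= Digest::SHA1.hexdigest(destination)
    if @listeners[headers[:id]]
      raise "attempting to subscribe to a queue with a previous subscription"
    end
    @listeners[headers[:id]] = listener
    headers[:id]
  end

  def unsubscribe(destination, headers = {})
    headers[:id] ||= Digest::SHA1.hexdigest(destination)
    @listeners[headers[:id]] = nil # frees the id for a later subscribe
  end

  # Stand-in for the listener thread routing a frame by subscription id.
  def dispatch(id, msg)
    listener = @listeners[id]
    listener.call(msg) if listener
  end
end
```

Subscribing twice to the same destination therefore requires distinct :id headers, otherwise the second call raises.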
#unreceive(message, options = {}) ⇒ Object
Unreceive a message, sending it back to its queue or to the DLQ.
# File 'lib/stomp/client.rb', line 236
def unreceive(message, options = {})
  @connection.unreceive(message, options)
end
#unsubscribe(name, headers = {}) ⇒ Object
Unsubscribe from a subscription by name.
# File 'lib/stomp/client.rb', line 185
def unsubscribe(name, headers = {})
  set_subscription_id_if_missing(name, headers)
  @connection.unsubscribe(name, headers)
  @listeners[headers[:id]] = nil
end
#uuid ⇒ Object
uuid returns a type 4 UUID.
# File 'lib/stomp/client.rb', line 310
def uuid()
  @connection.uuid()
end
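A type 4 (random) UUID has a fixed shape: the version digit is 4 and the variant digit is one of 8, 9, a or b. The sketch below uses the standard library's SecureRandom rather than the connection's own generator, and `type4_uuid?` is a hypothetical helper for checking the shape.

```ruby
require 'securerandom'

# Sketch only: true when the string has the layout of a type 4 UUID.
def type4_uuid?(s)
  !!(s =~ /\A\h{8}-\h{4}-4\h{3}-[89ab]\h{3}-\h{12}\z/i)
end

sample = SecureRandom.uuid # standard-library type 4 UUID
```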
#valid_utf8?(s) ⇒ Boolean
valid_utf8? validates any given string for UTF8 compliance.
# File 'lib/stomp/client.rb', line 300
def valid_utf8?(s)
  @connection.valid_utf8?(s)
end
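A comparable check is available in core Ruby through String#valid_encoding?. The sketch below is a different technique from whatever the connection implements internally; it is shown only to illustrate what "UTF8 compliance" means for a byte string, and `utf8_ok?` is a hypothetical name.

```ruby
# Sketch only: reinterpret the bytes as UTF-8 and ask Ruby whether they
# form a valid sequence. dup avoids changing the caller's string.
def utf8_ok?(s)
  s.dup.force_encoding(Encoding::UTF_8).valid_encoding?
end
```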