Class: Stomp::Client

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/stomp/client.rb,
lib/client/utils.rb

Overview

Typical Stomp client class. Uses a listener thread to receive frames from the server, any thread can send.

All receives happen in a single thread, so avoid doing heavy processing in that thread if your message volume is high.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, autoflush = false) ⇒ Client

A new Client object can be initialized using three forms:

Hash (this is the recommended Client initialization method):

hash = {
  :hosts => [
    {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false},
    {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false}
  ],
  :reliable => true,
  :initial_reconnect_delay => 0.01,
  :max_reconnect_delay => 30.0,
  :use_exponential_back_off => true,
  :back_off_multiplier => 2,
  :max_reconnect_attempts => 0,
  :randomize => false,
  :connect_timeout => 0,
  :connect_headers => {},
  :parse_timeout => 5,
  :logger => nil,
  :dmh => false,
  :closed_check => true,
  :hbser => false,
  :stompconn => false,
  :usecrlf => false,
  :max_hbread_fails => 0,
  :max_hbrlck_fails => 0,
  :fast_hbs_adjust => 0.0,
  :connread_timeout => 0,
  :tcp_nodelay => true,
  :start_timeout => 0,
  :sslctx_newparm => nil,
}

e.g. c = Stomp::Client.new(hash)

Positional parameters:

login     (String,  default : '')
passcode  (String,  default : '')
host      (String,  default : 'localhost')
port      (Integer, default : 61613)
reliable  (Boolean, default : false)
autoflush (Boolean, default : false)

e.g. c = Stomp::Client.new('login', 'passcode', 'localhost', 61613, true)

Stomp URL :

A Stomp URL must begin with 'stomp://' and can be in one of the following forms:

stomp://host:port
stomp://host.domain.tld:port
stomp://login:passcode@host:port
stomp://login:passcode@host.domain.tld:port

e.g. c = Stomp::Client.new(urlstring)


79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/stomp/client.rb', line 79

# Builds a client and starts it.
#
# The first argument is offered to each parameter parser in turn (hash form,
# Stomp URL, failover URL, positional form); the first parser that returns a
# truthy value wins and the remaining ones are skipped.
#
# NOTE(review): the `login` identifier was missing from the extracted listing
# and has been restored from the documented signature.
#
# Raises Stomp::Error::StartTimeoutException when the error handler,
# connection and listeners are not set up within @start_timeout seconds.
def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, autoflush = false)
  parse_hash_params(login) ||
    parse_stomp_url(login) ||
    parse_failover_url(login) ||
    parse_positional_params(login, passcode, host, port, reliable)

  # Store the fallback logger back into the parameters hash as well.
  @logger = @parameters[:logger] ||= Stomp::NullLogger.new
  @start_timeout = @parameters[:start_timeout] || 0
  check_arguments!

  begin
    # Timeout.timeout / Timeout::Error replace the bare Kernel#timeout and
    # TimeoutError names, which no longer exist in current Ruby releases.
    Timeout.timeout(@start_timeout) do
      create_error_handler
      create_connection(autoflush)
      start_listeners
    end
  rescue Timeout::Error
    raise Stomp::Error::StartTimeoutException.new(@start_timeout)
  end
end

Instance Attribute Details

#parameters ⇒ Object (readonly)

Parameters hash



19
20
21
# File 'lib/stomp/client.rb', line 19

# Reader for the @parameters instance variable (the client's parameters hash).
def parameters
  @parameters
end

Class Method Details

.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false) ⇒ Object

open is syntactic sugar for 'Client.new', see 'initialize' for usage.



125
126
127
# File 'lib/stomp/client.rb', line 125

# Convenience constructor: forwards its positional arguments unchanged to
# Client.new and returns the new client.
#
# NOTE(review): the `login` identifier was missing from the extracted listing
# and has been restored from the documented signature.
def self.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)
  Client.new(login, passcode, host, port, reliable)
end

Instance Method Details

#abort(name, headers = {}) ⇒ Object

Abort aborts work in a transaction by name.



141
142
143
144
145
146
147
148
149
150
151
# File 'lib/stomp/client.rb', line 141

# Aborts the transaction +name+ on the connection, then re-dispatches every
# message that was ack'd inside that transaction to its listener.
#
# The replay list is removed from @replay_messages_by_txn before it is
# replayed. That way an aborted transaction does not leave its messages behind
# (they would otherwise be replayed again if the same name is aborted later),
# and a listener that acks a replayed message under the same transaction name
# appends to a fresh list instead of the one being iterated.
#
# Returns the replayed list, or nil when nothing was recorded for +name+.
def abort(name, headers = {})
  @connection.abort(name, headers)

  replay_list = @replay_messages_by_txn.delete(name)
  return unless replay_list

  # find_listener also calls the listener
  replay_list.each { |message| find_listener(message) }
end

#ack(message, headers = {}) ⇒ Object Also known as: acknowledge

Acknowledge a message; used when a subscription has specified client acknowledgement, e.g. connection.subscribe("/queue/a", :ack => 'client'). Accepts a transaction header ( :transaction => 'some_transaction_id' ).



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/stomp/client.rb', line 186

# Acknowledges +message+ on the connection.
#
# When headers carries :transaction, the message is also remembered under that
# transaction id so it can be replayed if the transaction is rolled back.
# When a block is given, it is registered as a receipt listener and the
# resulting id is stored in headers['receipt'].
def ack(message, headers = {})
  txn_id = headers[:transaction]
  (@replay_messages_by_txn[txn_id] ||= []) << message if txn_id

  headers['receipt'] = register_receipt_listener(lambda { |receipt| yield receipt }) if block_given?

  ctx = ack_context_for(message, headers)
  @connection.ack(ctx[:message_id], ctx[:headers])
end

#ack_context_for(message, headers) ⇒ Object



214
215
216
217
218
219
220
221
222
223
224
225
# File 'lib/stomp/client.rb', line 214

# Picks the message header that identifies +message+ for the current protocol
# level and returns { :message_id => ..., :headers => headers }.
#
# For Stomp::SPL_12 the 'ack' header is used; otherwise 'message-id'. For
# Stomp::SPL_11 the message's 'subscription' header is additionally copied
# into +headers+ (the hash passed in is modified).
def ack_context_for(message, headers)
  id_header =
    case protocol
    when Stomp::SPL_12 then 'ack'
    when Stomp::SPL_11
      headers[:subscription] = message.headers['subscription']
      'message-id'
    else 'message-id'
    end

  { :message_id => message.headers[id_header], :headers => headers }
end

#autoflush ⇒ Object

autoflush returns the current connection's autoflush setting.



338
339
340
# File 'lib/stomp/client.rb', line 338

# Returns the autoflush setting reported by the underlying connection.
def autoflush
  @connection.autoflush
end

#autoflush=(af) ⇒ Object

autoflush= sets the current connection's autoflush setting.



333
334
335
# File 'lib/stomp/client.rb', line 333

# Assigns +af+ as the underlying connection's autoflush setting.
def autoflush=(af)
  @connection.autoflush = af
end

#begin(name, headers = {}) ⇒ Object

Begin starts work in a transaction by name.



136
137
138
# File 'lib/stomp/client.rb', line 136

# Starts the transaction +name+ by delegating to the connection's +begin+,
# passing +headers+ through unchanged.
def begin(name, headers = {})
  @connection.begin(name, headers)
end

#close(headers = {}) ⇒ Object

close frees resources in use by this client. The listener thread is terminated, and disconnect on the connection is called.



270
271
272
273
# File 'lib/stomp/client.rb', line 270

# Shuts the client down: the listener thread is told to exit first, then the
# connection is disconnected with +headers+. Returns the result of the
# connection's +disconnect+.
def close(headers = {})
  @listener_thread.exit
  @connection.disconnect(headers)
end

#closed? ⇒ Boolean

closed? tests if this client connection is closed.



259
260
261
# File 'lib/stomp/client.rb', line 259

# Asks the underlying connection whether it is closed.
def closed?
  @connection.closed?
end

#commit(name, headers = {}) ⇒ Object

Commit commits work in a transaction by name.



154
155
156
157
158
# File 'lib/stomp/client.rb', line 154

# Commits the transaction +name+ on the connection.
#
# Messages ack'd inside a transaction are remembered so they can be replayed
# on abort; once the transaction commits they are no longer needed. The list
# is dropped by transaction name (the key abort uses) and also by an explicit
# :transaction header when one is supplied, so the entry is released however
# the caller identified the transaction.
def commit(name, headers = {})
  @replay_messages_by_txn.delete(name)
  txn_id = headers[:transaction]
  @replay_messages_by_txn.delete(txn_id) if txn_id
  @connection.commit(name, headers)
end

#connection_frame ⇒ Object

Return the broker's CONNECTED frame to the client. Misnamed.



244
245
246
# File 'lib/stomp/client.rb', line 244

# Returns the underlying connection's +connection_frame+.
def connection_frame
  @connection.connection_frame
end

#create_error_handler ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/stomp/client.rb', line 101

# Installs @error_listener, a lambda that turns an error frame into an
# exception and raises it in the thread that called this method.
#
# The exception class is chosen by matching the frame body
# (case-insensitively) against known error names, falling back to
# Stomp::Error::BrokerException.
def create_error_handler
  owner = Thread.current

  @error_listener = lambda do |error|
    error_class =
      case error.body
      when /ResourceAllocationException/i then Stomp::Error::ProducerFlowControlException
      when /ProtocolException/i then Stomp::Error::ProtocolException
      else Stomp::Error::BrokerException
      end

    owner.raise error_class.new(error)
  end
end

#disconnect_receipt ⇒ Object

Return any RECEIPT frame received by DISCONNECT.



249
250
251
# File 'lib/stomp/client.rb', line 249

# Returns the underlying connection's +disconnect_receipt+.
def disconnect_receipt
  @connection.disconnect_receipt
end

#hbrecv_count ⇒ Object

hbrecv_count returns the current connection's heartbeat receive count.



322
323
324
# File 'lib/stomp/client.rb', line 322

# Returns the heartbeat receive count reported by the underlying connection.
def hbrecv_count
  @connection.hbrecv_count
end

#hbrecv_interval ⇒ Object

hbrecv_interval returns the connection's heartbeat receive interval.



312
313
314
# File 'lib/stomp/client.rb', line 312

# Returns the heartbeat receive interval reported by the underlying connection.
def hbrecv_interval
  @connection.hbrecv_interval
end

#hbsend_count ⇒ Object

hbsend_count returns the current connection's heartbeat send count.



317
318
319
# File 'lib/stomp/client.rb', line 317

# Returns the heartbeat send count reported by the underlying connection.
def hbsend_count
  @connection.hbsend_count
end

#hbsend_interval ⇒ Object

hbsend_interval returns the connection's heartbeat send interval.



307
308
309
# File 'lib/stomp/client.rb', line 307

# Returns the heartbeat send interval reported by the underlying connection.
def hbsend_interval
  @connection.hbsend_interval
end

#join(limit = nil) ⇒ Object

join the listener thread for this client, generally used to wait for a quit signal.



131
132
133
# File 'lib/stomp/client.rb', line 131

# Joins the listener thread, passing +limit+ straight to the thread's +join+
# (nil by default).
def join(limit = nil)
  @listener_thread.join(limit)
end

#jruby? ⇒ Boolean

jruby? tests if the connection has detected a JRuby environment.



264
265
266
# File 'lib/stomp/client.rb', line 264

# Returns the underlying connection's +jruby+ flag.
def jruby?
  @connection.jruby
end

#nack(message, headers = {}) ⇒ Object

Stomp 1.1+ NACK.



208
209
210
211
# File 'lib/stomp/client.rb', line 208

# Sends a NACK for +message+. The message id and headers to send are taken
# from ack_context_for(message, headers).
def nack(message, headers = {})
  @connection.nack(*ack_context_for(message, headers).values_at(:message_id, :headers))
end

#open? ⇒ Boolean

open? tests if this client connection is open.



254
255
256
# File 'lib/stomp/client.rb', line 254

# Asks the underlying connection whether it is open.
def open?
  @connection.open?
end

#poll ⇒ Object

Poll for asynchronous messages issued by the broker. Returns nil if no message is available, otherwise the message.



328
329
330
# File 'lib/stomp/client.rb', line 328

# Returns whatever the underlying connection's +poll+ returns.
def poll
  @connection.poll
end

#protocol ⇒ Object

protocol returns the current client's protocol level.



287
288
289
# File 'lib/stomp/client.rb', line 287

# Returns the protocol level reported by the underlying connection.
def protocol
  @connection.protocol
end

#publish(destination, message, headers = {}) ⇒ Object

Publishes message to destination. If a block is given a receipt will be requested and passed to the block on receipt. Accepts a transaction header ( :transaction => 'some_transaction_id' ).



236
237
238
239
240
241
# File 'lib/stomp/client.rb', line 236

# Publishes +message+ to +destination+ through the connection.
#
# When a block is given it is registered as a receipt listener and the id
# returned by the registration is stored in headers['receipt'] before sending.
def publish(destination, message, headers = {})
  headers['receipt'] = register_receipt_listener(lambda { |receipt| yield receipt }) if block_given?
  @connection.publish(destination, message, headers)
end

#running ⇒ Object

running checks if the thread was created and is not dead.



276
277
278
# File 'lib/stomp/client.rb', line 276

# Reports whether the listener thread exists and has a truthy status.
# Returns @listener_thread itself when it is nil/false, otherwise true or false.
def running
  @listener_thread && (@listener_thread.status ? true : false)
end

#set_logger(logger) ⇒ Object

set_logger identifies a new callback logger.



281
282
283
284
# File 'lib/stomp/client.rb', line 281

# Replaces the client's logger and hands the same logger to the connection.
# @logger is assigned first, so it is updated even if the connection call raises.
def set_logger(logger)
  @logger = logger
  @connection.set_logger(logger)
end

#sha1(data) ⇒ Object

sha1 returns a SHA1 sum of a given string.



297
298
299
# File 'lib/stomp/client.rb', line 297

# Delegates to the connection's +sha1+ with +data+ and returns its result.
def sha1(data)
  @connection.sha1(data)
end

#subscribe(destination, headers = {}) ⇒ Object

Subscribe to a destination, must be passed a block which will be used as a callback listener. Accepts a transaction header ( :transaction => 'some_transaction_id' ).



163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/stomp/client.rb', line 163

# Subscribes to +destination+; the given block becomes the listener for that
# subscription.
#
# The subscription id in headers[:id] correlates incoming messages with the
# listener (see the SUBSCRIPTION section of the protocol:
# http://stomp.github.com/). set_subscription_id_if_missing is called first so
# that an id is present.
#
# Raises RuntimeError when no block is given or when a listener is already
# registered under the same id.
def subscribe(destination, headers = {})
  raise "No listener given" unless block_given?

  set_subscription_id_if_missing(destination, headers)
  raise "attempting to subscribe to a queue with a previous subscription" if @listeners[headers[:id]]

  @listeners[headers[:id]] = lambda { |msg| yield msg }
  @connection.subscribe(destination, headers)
end

#unreceive(message, options = {}) ⇒ Object

Unreceive a message, sending it back to its queue or to the DLQ.



228
229
230
# File 'lib/stomp/client.rb', line 228

# Delegates to the connection's +unreceive+ with +message+ and +options+,
# returning its result.
def unreceive(message, options = {})
  @connection.unreceive(message, options)
end

#unsubscribe(name, headers = {}) ⇒ Object

Unsubscribe from a subscription by name.



177
178
179
180
181
# File 'lib/stomp/client.rb', line 177

# Unsubscribes from +name+ on the connection and forgets the listener that was
# registered under the subscription id in headers[:id].
#
# The listener entry is deleted rather than overwritten with nil, so
# @listeners does not accumulate one dead key per unsubscribed id.
# Returns nil.
def unsubscribe(name, headers = {})
  set_subscription_id_if_missing(name, headers)
  @connection.unsubscribe(name, headers)
  @listeners.delete(headers[:id])
  nil
end

#uuid ⇒ Object

uuid returns a type 4 UUID.



302
303
304
# File 'lib/stomp/client.rb', line 302

# Returns whatever the underlying connection's +uuid+ returns.
def uuid
  @connection.uuid
end

#valid_utf8?(s) ⇒ Boolean

valid_utf8? validates any given string for UTF8 compliance.



292
293
294
# File 'lib/stomp/client.rb', line 292

# Delegates to the connection's +valid_utf8?+ with +s+ and returns its result.
def valid_utf8?(s)
  @connection.valid_utf8?(s)
end