Class: Jabber::Stream
- Inherits:
-
Object
- Object
- Jabber::Stream
- Defined in:
- lib/xmpp4r/stream.rb
Overview
The stream class manages a connection stream (a file descriptor using which XML messages are read and sent)
You may register callbacks for the three Jabber stanzas (message, presence and iq) and use the send and send_with_id methods.
To ensure the order of received stanzas, callback blocks are launched in the parser thread. If further blocking operations are intended in those callbacks, run your own thread there.
Direct Known Subclasses
Defined Under Namespace
Classes: ThreadBlock
Constant Summary collapse
- DISCONNECTED =
1
- CONNECTED =
2
Instance Attribute Summary collapse
-
#fd ⇒ Object
readonly
file descriptor used.
-
#status ⇒ Object
readonly
connection status.
Instance Method Summary collapse
-
#add_iq_callback(priority = 0, ref = nil, &block) ⇒ Object
Adds a callback block to process received Iqs.
-
#add_message_callback(priority = 0, ref = nil, &block) ⇒ Object
Adds a callback block to process received Messages.
-
#add_presence_callback(priority = 0, ref = nil, &block) ⇒ Object
Adds a callback block to process received Presences .
-
#add_stanza_callback(priority = 0, ref = nil, &block) ⇒ Object
Adds a callback block to process received Stanzas.
-
#add_xml_callback(priority = 0, ref = nil, &block) ⇒ Object
Adds a callback block to process received XML messages.
-
#close ⇒ Object
Closes the connection to the Jabber service.
- #close! ⇒ Object
-
#delete_iq_callback(ref) ⇒ Object
Delete an Iq callback.
-
#delete_message_callback(ref) ⇒ Object
Delete an Message callback.
-
#delete_presence_callback(ref) ⇒ Object
Delete a Presence callback.
-
#delete_stanza_callback(ref) ⇒ Object
Delete a Stanza callback.
-
#delete_xml_callback(ref) ⇒ Object
Delete an XML-messages callback.
-
#initialize(threaded = true) ⇒ Stream
constructor
Create a new stream (just initializes).
-
#is_connected? ⇒ Boolean
- Returns if this connection is connected to a Jabber service return
- Boolean
-
Connection status.
-
#is_disconnected? ⇒ Boolean
Returns if this connection is NOT connected to a Jabber service.
-
#on_exception(&block) ⇒ Object
Mounts a block to handle exceptions if they occur during the poll send.
-
#parse_failure(e) ⇒ Object
This method is called by the parser when a failure occurs.
-
#parser_end ⇒ Object
This method is called by the parser upon receiving
</stream:stream>
. -
#poll ⇒ Object
Starts a polling thread to send “keep alive” data to prevent the Jabber connection from closing for inactivity.
-
#process(max = nil) ⇒ Object
Process |max| XML stanzas and call listeners for all of them.
-
#receive(element) ⇒ Object
Processes a received REXML::Element and executes registered thread blocks and filters against it.
-
#send(xml, &block) ⇒ Object
Sends XML data to the socket and (optionally) waits to process received data.
-
#send_with_id(xml, &block) ⇒ Object
Send an XMMP stanza with an Jabber::XMLStanza#id.
-
#start(fd) ⇒ Object
Start the XML parser on the fd.
- #stop ⇒ Object
-
#wait_and_process(time = nil) ⇒ Object
Process an XML stanza and call the listeners for it.
Constructor Details
#initialize(threaded = true) ⇒ Stream
Create a new stream (just initializes)
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/xmpp4r/stream.rb', line 42 def initialize(threaded = true) @fd = nil @status = DISCONNECTED @xmlcbs = CallbackList::new @stanzacbs = CallbackList::new @messagecbs = CallbackList::new @iqcbs = CallbackList::new @presencecbs = CallbackList::new unless threaded $stderr.puts "Non-threaded mode is currently broken, re-enabling threaded" threaded = true end @threaded = threaded @stanzaqueue = [] @stanzaqueue_lock = Mutex::new @exception_block = nil @threadblocks = [] # @pollCounter = 10 @waiting_thread = nil @wakeup_thread = nil @streamid = nil @features_lock = Mutex.new end |
Instance Attribute Details
#fd ⇒ Object (readonly)
file descriptor used
34 35 36 |
# File 'lib/xmpp4r/stream.rb', line 34 def fd @fd end |
#status ⇒ Object (readonly)
connection status
37 38 39 |
# File 'lib/xmpp4r/stream.rb', line 37 def status @status end |
Instance Method Details
#add_iq_callback(priority = 0, ref = nil, &block) ⇒ Object
Adds a callback block to process received Iqs
- priority
- Integer
-
The callback’s priority, the higher, the sooner
- ref
- String
-
The callback’s reference
- &block
- Block
-
The optional block
538 539 540 |
# File 'lib/xmpp4r/stream.rb', line 538 def add_iq_callback(priority = 0, ref = nil, &block) @iqcbs.add(priority, ref, block) end |
#add_message_callback(priority = 0, ref = nil, &block) ⇒ Object
Adds a callback block to process received Messages
- priority
- Integer
-
The callback’s priority, the higher, the sooner
- ref
- String
-
The callback’s reference
- &block
- Block
-
The optional block
484 485 486 |
# File 'lib/xmpp4r/stream.rb', line 484 def (priority = 0, ref = nil, &block) @messagecbs.add(priority, ref, block) end |
#add_presence_callback(priority = 0, ref = nil, &block) ⇒ Object
Adds a callback block to process received Presences
- priority
- Integer
-
The callback’s priority, the higher, the sooner
- ref
- String
-
The callback’s reference
- &block
- Block
-
The optional block
520 521 522 |
# File 'lib/xmpp4r/stream.rb', line 520 def add_presence_callback(priority = 0, ref = nil, &block) @presencecbs.add(priority, ref, block) end |
#add_stanza_callback(priority = 0, ref = nil, &block) ⇒ Object
Adds a callback block to process received Stanzas
- priority
- Integer
-
The callback’s priority, the higher, the sooner
- ref
- String
-
The callback’s reference
- &block
- Block
-
The optional block
502 503 504 |
# File 'lib/xmpp4r/stream.rb', line 502 def add_stanza_callback(priority = 0, ref = nil, &block) @stanzacbs.add(priority, ref, block) end |
#add_xml_callback(priority = 0, ref = nil, &block) ⇒ Object
Adds a callback block to process received XML messages
- priority
- Integer
-
The callback’s priority, the higher, the sooner
- ref
- String
-
The callback’s reference
- &block
- Block
-
The optional block
466 467 468 |
# File 'lib/xmpp4r/stream.rb', line 466 def add_xml_callback(priority = 0, ref = nil, &block) @xmlcbs.add(priority, ref, block) end |
#close ⇒ Object
Closes the connection to the Jabber service
552 553 554 |
# File 'lib/xmpp4r/stream.rb', line 552 def close close! end |
#close! ⇒ Object
556 557 558 559 560 561 |
# File 'lib/xmpp4r/stream.rb', line 556 def close! @parserThread.kill if @parserThread # @pollThread.kill @fd.close if @fd and !@fd.closed? @status = DISCONNECTED end |
#delete_iq_callback(ref) ⇒ Object
Delete an Iq callback
- ref
- String
-
The reference of the callback to delete
547 548 549 |
# File 'lib/xmpp4r/stream.rb', line 547 def delete_iq_callback(ref) @iqcbs.delete(ref) end |
#delete_message_callback(ref) ⇒ Object
Delete an Message callback
- ref
- String
-
The reference of the callback to delete
492 493 494 |
# File 'lib/xmpp4r/stream.rb', line 492 def (ref) @messagecbs.delete(ref) end |
#delete_presence_callback(ref) ⇒ Object
Delete a Presence callback
- ref
- String
-
The reference of the callback to delete
528 529 530 |
# File 'lib/xmpp4r/stream.rb', line 528 def delete_presence_callback(ref) @presencecbs.delete(ref) end |
#delete_stanza_callback(ref) ⇒ Object
Delete a Stanza callback
- ref
- String
-
The reference of the callback to delete
510 511 512 |
# File 'lib/xmpp4r/stream.rb', line 510 def delete_stanza_callback(ref) @stanzacbs.delete(ref) end |
#delete_xml_callback(ref) ⇒ Object
Delete an XML-messages callback
- ref
- String
-
The reference of the callback to delete
474 475 476 |
# File 'lib/xmpp4r/stream.rb', line 474 def delete_xml_callback(ref) @xmlcbs.delete(ref) end |
#is_connected? ⇒ Boolean
Returns if this connection is connected to a Jabber service
- return
- Boolean
-
Connection status
158 159 160 |
# File 'lib/xmpp4r/stream.rb', line 158 def is_connected? return @status == CONNECTED end |
#is_disconnected? ⇒ Boolean
Returns if this connection is NOT connected to a Jabber service
- return
- Boolean
-
Connection status
166 167 168 |
# File 'lib/xmpp4r/stream.rb', line 166 def is_disconnected? return @status == DISCONNECTED end |
#on_exception(&block) ⇒ Object
Mounts a block to handle exceptions if they occur during the poll send. This will likely be the first indication that the socket dropped in a Jabber Session.
The block has to take three arguments:
-
the Exception
-
the Jabber::Stream object (self)
-
a symbol where it happened, namely :start, :parser, :sending and :end
116 117 118 |
# File 'lib/xmpp4r/stream.rb', line 116 def on_exception(&block) @exception_block = block end |
#parse_failure(e) ⇒ Object
This method is called by the parser when a failure occurs
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/xmpp4r/stream.rb', line 122 def parse_failure(e) Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.}\n#{e.backtrace.join("\n")}") # A new thread has to be created because close will cause the thread # to commit suicide(???) if @exception_block # New thread, because close will kill the current thread Thread.new { close @exception_block.call(e, self, :parser) } else puts "Stream#parse_failure was called by XML parser. Dumping " + "backtrace...\n" + e.exception + "\n" puts e.backtrace close raise end end |
#parser_end ⇒ Object
This method is called by the parser upon receiving </stream:stream>
144 145 146 147 148 149 150 151 152 153 |
# File 'lib/xmpp4r/stream.rb', line 144 def parser_end if @exception_block Thread.new { close @exception_block.call(nil, self, :close) } else close end end |
#poll ⇒ Object
Starts a polling thread to send “keep alive” data to prevent the Jabber connection from closing for inactivity.
Currently not working!
444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 |
# File 'lib/xmpp4r/stream.rb', line 444 def poll sleep 10 while true sleep 2 # @pollCounter = @pollCounter - 1 # if @pollCounter < 0 # begin # send(" \t ") # rescue # Thread.new {@exception_block.call if @exception_block} # break # end # end end end |
#process(max = nil) ⇒ Object
Process |max| XML stanzas and call listeners for all of them.
- max
- Integer
-
the number of stanzas to process (nil means process
all available)
289 290 291 292 293 294 295 296 297 298 299 300 301 |
# File 'lib/xmpp4r/stream.rb', line 289 def process(max = nil) n = 0 @stanzaqueue_lock.lock while @stanzaqueue.size > 0 and (max == nil or n < max) e = @stanzaqueue.shift @stanzaqueue_lock.unlock process_one(e) n += 1 @stanzaqueue_lock.lock end @stanzaqueue_lock.unlock n end |
#receive(element) ⇒ Object
Processes a received REXML::Element and executes registered thread blocks and filters against it.
If in threaded mode, a new thread will be spawned for the call to receive_nonthreaded.
- element
- REXML::Element
-
The received element
177 178 179 180 181 182 183 184 185 |
# File 'lib/xmpp4r/stream.rb', line 177 def receive(element) if @threaded # Don't spawn a new thread here. An implicit feature # of XMPP is constant order of stanzas. receive_nonthreaded(element) else receive_nonthreaded(element) end end |
#send(xml, &block) ⇒ Object
Sends XML data to the socket and (optionally) waits to process received data.
Do not invoke this in a callback but in a seperate thread because we may not suspend the parser-thread (in whose context callbacks are executed).
- xml
- String
-
The xml data to send
- &block
- Block
-
The optional block
369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 |
# File 'lib/xmpp4r/stream.rb', line 369 def send(xml, &block) Jabber::debuglog("SENDING:\n#{xml}") @threadblocks.unshift(ThreadBlock.new(block)) if block Thread.critical = true # we don't want to be interupted before we stop! begin @fd << xml.to_s @fd.flush rescue Exception => e Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.}\n#{e.backtrace.join("\n")}") if @exception_block Thread.new { close!; @exception_block.call(e, self, :sending) } else puts "Exception caught while sending!" close! raise end end Thread.critical = false # The parser thread might be running this (think of a callback running send()) # If this is the case, we mustn't stop (or we would cause a deadlock) Thread.stop if block and Thread.current != @parserThread @pollCounter = 10 end |
#send_with_id(xml, &block) ⇒ Object
Send an XMMP stanza with an Jabber::XMLStanza#id. The id will be generated by Jabber::IdGenerator if not already set.
The block will be called once: when receiving a stanza with the same Jabber::XMPPStanza#id. There is no need to return true to complete this! Instead the return value of the block will be returned.
Be aware that if a stanza with type='error'
is received the function does not yield but raises an ErrorException with the corresponding error element.
Please see Stream#send for some implementational details.
Please read the note about nesting at Stream#send
- xml
- XMLStanza
411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 |
# File 'lib/xmpp4r/stream.rb', line 411 def send_with_id(xml, &block) if xml.id.nil? xml.id = Jabber::IdGenerator.instance.generate_id end res = nil error = nil send(xml) do |received| if received.kind_of? XMLStanza and received.id == xml.id if received.type == :error error = (received.error ? received.error : Error.new) true else res = yield(received) true end else false end end unless error.nil? raise ErrorException.new(error) end res end |
#start(fd) ⇒ Object
Start the XML parser on the fd
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/xmpp4r/stream.rb', line 68 def start(fd) @stream_mechanisms = [] @stream_features = {} @fd = fd @parser = StreamParser.new(@fd, self) @parserThread = Thread.new do begin @parser.parse rescue Exception => e Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.}\n#{e.backtrace.join("\n")}") if @exception_block Thread.new { close; @exception_block.call(e, self, :start) } else puts "Exception caught in Parser thread!" close raise end end end # @pollThread = Thread.new do # begin # poll # rescue # puts "Exception caught in Poll thread, dumping backtrace and" + # " exiting...\n" + $!.exception + "\n" # puts $!.backtrace # exit # end # end @status = CONNECTED end |
#stop ⇒ Object
102 103 104 105 |
# File 'lib/xmpp4r/stream.rb', line 102 def stop @parserThread.kill @parser = nil end |
#wait_and_process(time = nil) ⇒ Object
Process an XML stanza and call the listeners for it. If no stanza is currently available, wait for max |time| seconds before returning.
- time
- Integer
-
time to wait in seconds. If nil, wait infinitely.
all available)
309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 |
# File 'lib/xmpp4r/stream.rb', line 309 def wait_and_process(time = nil) if time == 0 return process(1) end @stanzaqueue_lock.lock if @stanzaqueue.size > 0 e = @stanzaqueue.shift @stanzaqueue_lock.unlock process_one(e) return 1 end @waiting_thread = Thread.current @wakeup_thread = Thread.new { sleep time ; @waiting_thread.wakeup if @waiting_thread } @waiting_thread.stop @wakeup_thread.kill if @wakeup_thread @wakeup_thread = nil @waiting_thread = nil @stanzaqueue_lock.lock if @stanzaqueue.size > 0 e = @stanzaqueue.shift @stanzaqueue_lock.unlock process_one(e) return 1 end return 0 end |