Class: Jabber::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/xmpp4r/stream.rb

Overview

The stream class manages a connection stream (a file descriptor using which XML messages are read and sent)

You may register callbacks for the three Jabber stanzas (message, presence and iq) and use the send and send_with_id methods.

To ensure the order of received stanzas, callback blocks are launched in the parser thread. If further blocking operations are intended in those callbacks, run your own thread there.

Direct Known Subclasses

Connection

Defined Under Namespace

Classes: ThreadBlock

Constant Summary collapse

DISCONNECTED =
1
CONNECTED =
2

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(threaded = true) ⇒ Stream

Create a new stream (just initializes)



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/xmpp4r/stream.rb', line 42

def initialize(threaded = true)
  @fd = nil
  @status = DISCONNECTED
  @xmlcbs = CallbackList::new
  @stanzacbs = CallbackList::new
  @messagecbs = CallbackList::new
  @iqcbs = CallbackList::new
  @presencecbs = CallbackList::new
  unless threaded
    $stderr.puts "Non-threaded mode is currently broken, re-enabling threaded"
    threaded = true
  end
  @threaded = threaded
  @stanzaqueue = []
  @stanzaqueue_lock = Mutex::new
  @exception_block = nil
  @threadblocks = []
#      @pollCounter = 10
  @waiting_thread = nil
  @wakeup_thread = nil
  @streamid = nil
  @features_lock = Mutex.new
end

Instance Attribute Details

#fdObject (readonly)

file descriptor used



34
35
36
# File 'lib/xmpp4r/stream.rb', line 34

def fd
  @fd
end

#statusObject (readonly)

connection status



37
38
39
# File 'lib/xmpp4r/stream.rb', line 37

def status
  @status
end

Instance Method Details

#add_iq_callback(priority = 0, ref = nil, &block) ⇒ Object

Adds a callback block to process received Iqs

priority
Integer

The callback’s priority, the higher, the sooner

ref
String

The callback’s reference

&block
Block

The optional block



538
539
540
# File 'lib/xmpp4r/stream.rb', line 538

def add_iq_callback(priority = 0, ref = nil, &block)
  @iqcbs.add(priority, ref, block)
end

#add_message_callback(priority = 0, ref = nil, &block) ⇒ Object

Adds a callback block to process received Messages

priority
Integer

The callback’s priority, the higher, the sooner

ref
String

The callback’s reference

&block
Block

The optional block



484
485
486
# File 'lib/xmpp4r/stream.rb', line 484

def add_message_callback(priority = 0, ref = nil, &block)
  @messagecbs.add(priority, ref, block)
end

#add_presence_callback(priority = 0, ref = nil, &block) ⇒ Object

Adds a callback block to process received Presences

priority
Integer

The callback’s priority, the higher, the sooner

ref
String

The callback’s reference

&block
Block

The optional block



520
521
522
# File 'lib/xmpp4r/stream.rb', line 520

def add_presence_callback(priority = 0, ref = nil, &block)
  @presencecbs.add(priority, ref, block)
end

#add_stanza_callback(priority = 0, ref = nil, &block) ⇒ Object

Adds a callback block to process received Stanzas

priority
Integer

The callback’s priority, the higher, the sooner

ref
String

The callback’s reference

&block
Block

The optional block



502
503
504
# File 'lib/xmpp4r/stream.rb', line 502

def add_stanza_callback(priority = 0, ref = nil, &block)
  @stanzacbs.add(priority, ref, block)
end

#add_xml_callback(priority = 0, ref = nil, &block) ⇒ Object

Adds a callback block to process received XML messages

priority
Integer

The callback’s priority, the higher, the sooner

ref
String

The callback’s reference

&block
Block

The optional block



466
467
468
# File 'lib/xmpp4r/stream.rb', line 466

def add_xml_callback(priority = 0, ref = nil, &block)
  @xmlcbs.add(priority, ref, block)
end

#closeObject

Closes the connection to the Jabber service



552
553
554
# File 'lib/xmpp4r/stream.rb', line 552

def close
  close!
end

#close!Object



556
557
558
559
560
561
# File 'lib/xmpp4r/stream.rb', line 556

def close!
  @parserThread.kill if @parserThread
#      @pollThread.kill
  @fd.close if @fd and !@fd.closed?
  @status = DISCONNECTED
end

#delete_iq_callback(ref) ⇒ Object

Delete an Iq callback

ref
String

The reference of the callback to delete



547
548
549
# File 'lib/xmpp4r/stream.rb', line 547

def delete_iq_callback(ref)
  @iqcbs.delete(ref)
end

#delete_message_callback(ref) ⇒ Object

Delete an Message callback

ref
String

The reference of the callback to delete



492
493
494
# File 'lib/xmpp4r/stream.rb', line 492

def delete_message_callback(ref)
  @messagecbs.delete(ref)
end

#delete_presence_callback(ref) ⇒ Object

Delete a Presence callback

ref
String

The reference of the callback to delete



528
529
530
# File 'lib/xmpp4r/stream.rb', line 528

def delete_presence_callback(ref)
  @presencecbs.delete(ref)
end

#delete_stanza_callback(ref) ⇒ Object

Delete a Stanza callback

ref
String

The reference of the callback to delete



510
511
512
# File 'lib/xmpp4r/stream.rb', line 510

def delete_stanza_callback(ref)
  @stanzacbs.delete(ref)
end

#delete_xml_callback(ref) ⇒ Object

Delete an XML-messages callback

ref
String

The reference of the callback to delete



474
475
476
# File 'lib/xmpp4r/stream.rb', line 474

def delete_xml_callback(ref)
  @xmlcbs.delete(ref)
end

#is_connected?Boolean

Returns if this connection is connected to a Jabber service

return
Boolean

Connection status

Returns:

  • (Boolean)


158
159
160
# File 'lib/xmpp4r/stream.rb', line 158

def is_connected?
  return @status == CONNECTED
end

#is_disconnected?Boolean

Returns if this connection is NOT connected to a Jabber service

return
Boolean

Connection status

Returns:

  • (Boolean)


166
167
168
# File 'lib/xmpp4r/stream.rb', line 166

def is_disconnected?
  return @status == DISCONNECTED
end

#on_exception(&block) ⇒ Object

Mounts a block to handle exceptions if they occur during the poll send. This will likely be the first indication that the socket dropped in a Jabber Session.

The block has to take three arguments:

  • the Exception

  • the Jabber::Stream object (self)

  • a symbol where it happened, namely :start, :parser, :sending and :end



116
117
118
# File 'lib/xmpp4r/stream.rb', line 116

def on_exception(&block)
  @exception_block = block
end

#parse_failure(e) ⇒ Object

This method is called by the parser when a failure occurs



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/xmpp4r/stream.rb', line 122

def parse_failure(e)
  Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

  # A new thread has to be created because close will cause the thread
  # to commit suicide(???)
  if @exception_block
    # New thread, because close will kill the current thread
    Thread.new {
      close
      @exception_block.call(e, self, :parser)
    }
  else
    puts "Stream#parse_failure was called by XML parser. Dumping " +
    "backtrace...\n" + e.exception + "\n"
    puts e.backtrace
    close
    raise
  end
end

#parser_endObject

This method is called by the parser upon receiving </stream:stream>



144
145
146
147
148
149
150
151
152
153
# File 'lib/xmpp4r/stream.rb', line 144

def parser_end
  if @exception_block
    Thread.new {
      close
      @exception_block.call(nil, self, :close)
    }
  else
    close
  end
end

#pollObject

Starts a polling thread to send “keep alive” data to prevent the Jabber connection from closing for inactivity.

Currently not working!



444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
# File 'lib/xmpp4r/stream.rb', line 444

def poll
  sleep 10
  while true
    sleep 2
#        @pollCounter = @pollCounter - 1
#        if @pollCounter < 0
#          begin
#            send("  \t  ")
#          rescue
#            Thread.new {@exception_block.call if @exception_block}
#            break
#          end
#        end
  end
end

#process(max = nil) ⇒ Object

Process |max| XML stanzas and call listeners for all of them.

max
Integer

the number of stanzas to process (nil means process

all available)



289
290
291
292
293
294
295
296
297
298
299
300
301
# File 'lib/xmpp4r/stream.rb', line 289

def process(max = nil)
  n = 0
  @stanzaqueue_lock.lock
  while @stanzaqueue.size > 0 and (max == nil or n < max)
    e = @stanzaqueue.shift
    @stanzaqueue_lock.unlock
    process_one(e)
    n += 1
    @stanzaqueue_lock.lock
  end
  @stanzaqueue_lock.unlock
  n
end

#receive(element) ⇒ Object

Processes a received REXML::Element and executes registered thread blocks and filters against it.

If in threaded mode, a new thread will be spawned for the call to receive_nonthreaded.

element
REXML::Element

The received element



177
178
179
180
181
182
183
184
185
# File 'lib/xmpp4r/stream.rb', line 177

def receive(element)
  if @threaded
    # Don't spawn a new thread here. An implicit feature
    # of XMPP is constant order of stanzas.
    receive_nonthreaded(element)
  else
    receive_nonthreaded(element)
  end
end

#send(xml, &block) ⇒ Object

Sends XML data to the socket and (optionally) waits to process received data.

Do not invoke this in a callback but in a seperate thread because we may not suspend the parser-thread (in whose context callbacks are executed).

xml
String

The xml data to send

&block
Block

The optional block



369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
# File 'lib/xmpp4r/stream.rb', line 369

def send(xml, &block)
  Jabber::debuglog("SENDING:\n#{xml}")
  @threadblocks.unshift(ThreadBlock.new(block)) if block
  Thread.critical = true # we don't want to be interupted before we stop!
  begin
    @fd << xml.to_s
    @fd.flush
  rescue Exception => e
    Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

    if @exception_block 
      Thread.new { close!; @exception_block.call(e, self, :sending) }
    else
      puts "Exception caught while sending!"
      close!
      raise
    end
  end
  Thread.critical = false
  # The parser thread might be running this (think of a callback running send())
  # If this is the case, we mustn't stop (or we would cause a deadlock)
  Thread.stop if block and Thread.current != @parserThread
  @pollCounter = 10
end

#send_with_id(xml, &block) ⇒ Object

Send an XMMP stanza with an Jabber::XMLStanza#id. The id will be generated by Jabber::IdGenerator if not already set.

The block will be called once: when receiving a stanza with the same Jabber::XMPPStanza#id. There is no need to return true to complete this! Instead the return value of the block will be returned.

Be aware that if a stanza with type='error' is received the function does not yield but raises an ErrorException with the corresponding error element.

Please see Stream#send for some implementational details.

Please read the note about nesting at Stream#send

xml
XMLStanza


411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
# File 'lib/xmpp4r/stream.rb', line 411

def send_with_id(xml, &block)
  if xml.id.nil?
    xml.id = Jabber::IdGenerator.instance.generate_id
  end

  res = nil
  error = nil
  send(xml) do |received|
    if received.kind_of? XMLStanza and received.id == xml.id
      if received.type == :error
        error = (received.error ? received.error : Error.new)
        true
      else
        res = yield(received)
        true
      end
    else
      false
    end
  end

  unless error.nil?
    raise ErrorException.new(error)
  end

  res
end

#start(fd) ⇒ Object

Start the XML parser on the fd



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/xmpp4r/stream.rb', line 68

def start(fd)
  @stream_mechanisms = []
  @stream_features = {}

  @fd = fd
  @parser = StreamParser.new(@fd, self)
  @parserThread = Thread.new do
    begin
      @parser.parse
    rescue Exception => e
      Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

      if @exception_block
        Thread.new { close; @exception_block.call(e, self, :start) }
      else
        puts "Exception caught in Parser thread!"
        close
        raise
      end
    end
  end
#      @pollThread = Thread.new do
#        begin
#        poll
#        rescue
#          puts "Exception caught in Poll thread, dumping backtrace and" +
#            " exiting...\n" + $!.exception + "\n"
#          puts $!.backtrace
#          exit
#        end
#      end
  @status = CONNECTED
end

#stopObject



102
103
104
105
# File 'lib/xmpp4r/stream.rb', line 102

def stop
  @parserThread.kill
  @parser = nil
end

#wait_and_process(time = nil) ⇒ Object

Process an XML stanza and call the listeners for it. If no stanza is currently available, wait for max |time| seconds before returning.

time
Integer

time to wait in seconds. If nil, wait infinitely.

all available)



309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
# File 'lib/xmpp4r/stream.rb', line 309

def wait_and_process(time = nil)
  if time == 0 
    return process(1)
  end
  @stanzaqueue_lock.lock
  if @stanzaqueue.size > 0
    e = @stanzaqueue.shift
    @stanzaqueue_lock.unlock
    process_one(e)
    return 1
  end

  @waiting_thread = Thread.current
  @wakeup_thread = Thread.new { sleep time ; @waiting_thread.wakeup if @waiting_thread }
  @waiting_thread.stop
  @wakeup_thread.kill if @wakeup_thread
  @wakeup_thread = nil
  @waiting_thread = nil

  @stanzaqueue_lock.lock
  if @stanzaqueue.size > 0
    e = @stanzaqueue.shift
    @stanzaqueue_lock.unlock
    process_one(e)
    return 1
  end
  return 0
end