Class: EventMachine::StreamObject

Inherits:
Selectable show all
Defined in:
lib/pr_eventmachine.rb

Direct Known Subclasses

EvmaTCPClient

Instance Attribute Summary

Attributes inherited from Selectable

#io, #uuid

Instance Method Summary collapse

Methods inherited from Selectable

#close_scheduled?

Constructor Details

#initialize(io) ⇒ StreamObject

Returns a new instance of StreamObject.



368
369
370
371
# File 'lib/pr_eventmachine.rb', line 368

def initialize io
  super io
  @outbound_q = []
end

Instance Method Details

#eventable_readObject

Proper nonblocking I/O was added to Ruby 1.8.4 in May 2006. If we have it, then we can read multiple times safely to improve performance. TODO, coalesce multiple reads into a single event. TODO, do the function check somewhere else and cache it.



400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
# File 'lib/pr_eventmachine.rb', line 400

def eventable_read
  begin
    if io.respond_to?(:read_nonblock)
      10.times {
        data = io.read_nonblock(4096)
        EventMachine::event_callback uuid, ConnectionData, data
      }
    else
      data = io.sysread(4096)
      EventMachine::event_callback uuid, ConnectionData, data
    end
  rescue Errno::EAGAIN
    # no-op
  rescue Errno::ECONNRESET, EOFError
    @close_scheduled = true
    EventMachine::event_callback uuid, ConnectionUnbound, nil
  end

end

#eventable_writeObject

Provisional implementation. Will be re-implemented in subclasses. TODO: Complete this implementation. As it stands, this only writes a single packet per cycle. Highly inefficient, but required unless we’re running on a Ruby with proper nonblocking I/O (Ruby 1.8.4 built from sources from May 25, 2006 or newer). We need to improve the loop so it writes multiple times, however not more than a certain number of bytes per cycle, otherwise one busy connection could hog output buffers and slow down other connections. Also we should coalesce small writes. URGENT TODO: Coalesce small writes. They are a performance killer.



430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
# File 'lib/pr_eventmachine.rb', line 430

def eventable_write
  # coalesce the outbound array here, perhaps
  while data = @outbound_q.shift do
    begin
      data = data.to_s
      w = if io.respond_to?(:write_nonblock)
        io.write_nonblock data
      else
        io.syswrite data
      end

      if w < data.length
        $outbound_q.unshift data[w..-1]
        break
      end
    rescue Errno::EAGAIN
      @outbound_q.unshift data
    rescue EOFError, Errno::ECONNRESET
      @close_scheduled = true
      @outbound_q.clear
    end
  end

end

#get_peernameObject

#get_peername This is defined in the normal way on connected stream objects. Return an object that is suitable for passing to Socket#unpack_sockaddr_in or variants. We could also use a convenience method that did the unpacking automatically.



477
478
479
# File 'lib/pr_eventmachine.rb', line 477

def get_peername
  io.getpeername
end

#schedule_close(after_writing) ⇒ Object

#schedule_close The application wants to close the connection.



465
466
467
468
469
470
471
# File 'lib/pr_eventmachine.rb', line 465

def schedule_close after_writing
  if after_writing
    @close_requested = true
  else
    @close_scheduled = true
  end
end

#select_for_reading?Boolean

If we have to close, or a close-after-writing has been requested, then don’t read any more data.

Returns:

  • (Boolean)


375
376
377
# File 'lib/pr_eventmachine.rb', line 375

def select_for_reading?
  true unless (@close_scheduled || @close_requested)
end

#select_for_writing?Boolean

If we have to close, don’t select for writing. Otherwise, see if the protocol is ready to close. If not, see if he has data to send. If a close-after-writing has been requested and the outbound queue is empty, convert the status to close_scheduled.

Returns:

  • (Boolean)


384
385
386
387
388
389
390
391
392
393
# File 'lib/pr_eventmachine.rb', line 384

def select_for_writing?
  unless @close_scheduled
    if @outbound_q.empty?
      @close_scheduled = true if @close_requested
      false
    else
      true
    end
  end
end

#send_data(data) ⇒ Object

#send_data



456
457
458
459
460
461
# File 'lib/pr_eventmachine.rb', line 456

def send_data data
  # TODO, coalesce here perhaps by being smarter about appending to @outbound_q.last?
  unless @close_scheduled or @close_requested or !data or data.length <= 0
    @outbound_q << data.to_s
  end
end