Class: IoUnblock::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/io_unblock/stream.rb

Constant Summary collapse

MAX_BYTES_PER_WRITE =
1024 * 8
MAX_BYTES_PER_READ =
1024 * 4

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(io, callbacks = nil) {|_self| ... } ⇒ Stream

The given IO object, io, is assumed to be opened/connected. Global callbacks:

  • failed: called when IO access throws an exception that cannot be recovered from (opening the IO fails, TCP connection reset, unexpected EOF, etc.)
    
  • read: called when any data is read from the underlying IO object
    
  • wrote: called when any data is written to the underlying IO object
    
  • closed: called when the underlying IO object is closed (even if it is closed as a result of a failure)
    
  • started: called when the IO processing has started

  • stopped: called when the IO processing has stopped

  • looped: called each time the IO processing loops

  • callback_failed: called when an exception is raised within a callback

Yields:

  • (_self)

Yield Parameters:



28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/io_unblock/stream.rb', line 28

# Builds a stream around +io+, which is assumed to be already
# opened/connected.
#
# @param io [Object] the IO object to wrap; also stored as the sole element
#   of the io_selector array
# @param callbacks [Hash, nil] global callbacks; an empty Hash is used when
#   this is nil (or false)
# @yield [_self] the stream under construction, when a block is given
# @yieldparam _self [Stream] the newly initialized stream
def initialize(io, callbacks = nil)
  # Caller-supplied state.
  @callbacks = callbacks || {}
  @io = io
  @io_selector = [@io]

  # Internal machinery: write buffer, start guard, and the thread slot that
  # #start fills in.
  @w_buff = IoUnblock::Buffer.new
  @s_mutex = Mutex.new
  @processor = nil

  # Initial flags and tuning.
  @connected = true
  @running = false
  @select_delay = 0.1

  Delegation.define_io_methods(self)
  yield(self) if block_given?
end

Instance Attribute Details

#callbacks ⇒ Object (readonly)

Returns the value of attribute callbacks.



8
9
10
# File 'lib/io_unblock/stream.rb', line 8

# Reader for the callbacks held by this stream.
#
# @return [Object] the value of @callbacks, which the constructor sets to its
#   +callbacks+ argument, or to an empty Hash when that argument is nil
def callbacks
  @callbacks
end

#connected ⇒ Object (readonly) Also known as: connected?

Returns the value of attribute connected.



8
9
10
# File 'lib/io_unblock/stream.rb', line 8

# Reader for the connected flag (also exposed as +connected?+).
#
# The constructor initializes the flag to true, and the processing loop in
# #start keeps running only while it is truthy.
# NOTE(review): presumably cleared when the IO is closed or fails — confirm.
#
# @return [Object] the value of @connected
def connected
  @connected
end

#io ⇒ Object (readonly)

Returns the value of attribute io.



8
9
10
# File 'lib/io_unblock/stream.rb', line 8

# Reader for the wrapped IO object.
#
# @return [Object] the value of @io, i.e. the +io+ argument given to the
#   constructor
def io
  @io
end

#io_selector ⇒ Object (readonly)

Returns the value of attribute io_selector.



8
9
10
# File 'lib/io_unblock/stream.rb', line 8

# Reader for the selector array.
#
# The constructor builds it as a one-element Array containing the wrapped IO.
# NOTE(review): presumably the read/write set handed to IO.select — confirm.
#
# @return [Object] the value of @io_selector
def io_selector
  @io_selector
end

#running ⇒ Object (readonly) Also known as: running?

Returns the value of attribute running.



8
9
10
# File 'lib/io_unblock/stream.rb', line 8

# Reader for the running flag (also exposed as +running?+).
#
# The constructor initializes the flag to false and #start sets it to true;
# the processing loop in #start keeps running only while it is truthy.
#
# @return [Object] the value of @running
def running
  @running
end

#select_delay ⇒ Object

Returns the value of attribute select_delay.



9
10
11
# File 'lib/io_unblock/stream.rb', line 9

# Reader for the select delay.
#
# The constructor initializes it to 0.1.
# NOTE(review): presumably a timeout in seconds for the select call made by
# the processing loop — confirm.
#
# @return [Object] the value of @select_delay
def select_delay
  @select_delay
end

Instance Method Details

#alive? ⇒ Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/io_unblock/stream.rb', line 72

# Reports whether the thread held in @processor exists and is still alive.
#
# @return [Boolean] true while the thread assigned by #start is alive; false
#   once it has finished, or when no thread has been assigned yet
def alive?
  # Answer with a strict false (rather than nil) when no thread has been
  # assigned, so the result always matches the documented Boolean type.
  @processor ? @processor.alive? : false
end

#start(&cb) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/io_unblock/stream.rb', line 42

# Launches the processing thread for this stream.
#
# While holding @s_mutex, marks the stream as running and spawns the thread
# stored in @processor. That thread fires the :started callbacks, calls
# read_and_write repeatedly until the stream is no longer running or no
# longer connected, then calls flush_and_close and fires the :stopped
# callbacks.
#
# @param cb [Proc, nil] optional block forwarded to trigger_callbacks for
#   both the started and the stopped events
# @return [self]
# @raise [StreamError] if the stream is already marked as running
def start(&cb)
  @s_mutex.synchronize do
    raise StreamError, "already started" if @running

    @running = true
    @processor = Thread.new do
      trigger_callbacks(:started, :start, &cb)
      # Same short-circuit order as `while running? && connected?`:
      # connected? is only consulted when running? is truthy.
      read_and_write until !running? || !connected?
      flush_and_close
      trigger_callbacks(:stopped, :stop, &cb)
    end
  end
  self
end

#stop ⇒ Object



56
57
58
59
60
61
62
63
# File 'lib/io_unblock/stream.rb', line 56

# Dispatches to the appropriate stop routine for the calling thread.
#
# Calls stop_inside when invoked from the thread stored in @processor, and
# stop_outside when invoked from any other thread.
#
# @return [self]
def stop
  @processor == Thread.current ? stop_inside : stop_outside
  self
end

#write(bytes, *cb_args, &cb) ⇒ Object

The callback triggered here will be invoked only when all bytes have been written.



67
68
69
70
# File 'lib/io_unblock/stream.rb', line 67

# Pushes +bytes+ onto the write buffer, together with an optional callback
# and the arguments recorded for it.
#
# Per the existing documentation for this method, the callback is invoked
# only once all of the bytes have been written.
#
# @param bytes [Object] the data to hand to the write buffer
# @param cb_args [Array] extra arguments stored alongside the callback
# @param cb [Proc, nil] optional completion callback
# @return [self]
def write(bytes, *cb_args, &cb)
  tap { @w_buff.push(bytes, cb, cb_args) }
end