Class: IoUnblock::Stream
- Inherits:
-
Object
- Object
- IoUnblock::Stream
- Defined in:
- lib/io_unblock/stream.rb
Constant Summary collapse
- MAX_BYTES_PER_WRITE =
1024 * 8
- MAX_BYTES_PER_READ =
1024 * 4
Instance Attribute Summary collapse
-
#callbacks ⇒ Object
readonly
Returns the value of attribute callbacks.
-
#connected ⇒ Object
(also: #connected?)
readonly
Returns the value of attribute connected.
-
#io ⇒ Object
readonly
Returns the value of attribute io.
-
#io_selector ⇒ Object
readonly
Returns the value of attribute io_selector.
-
#running ⇒ Object
(also: #running?)
readonly
Returns the value of attribute running.
-
#select_delay ⇒ Object
Returns the value of attribute select_delay.
Instance Method Summary collapse
- #alive? ⇒ Boolean
-
#initialize(io, callbacks = nil) {|_self| ... } ⇒ Stream
constructor
The given IO object,
io, is assumed to be opened/connected.
- #start(&cb) ⇒ Object
- #stop ⇒ Object
-
#write(bytes, *cb_args, &cb) ⇒ Object
The callback triggered here will be invoked only when all bytes have been written.
Constructor Details
#initialize(io, callbacks = nil) {|_self| ... } ⇒ Stream
The given IO object, io, is assumed to be opened/connected. Global callbacks:
-
failed: called when IO access throws an exception that cannot
be recovered from (opening the IO fails, TCP connection reset, unexpected EOF, etc.)
-
read: called when any data is read from the underlying IO
object
-
wrote: called when any data is written to the underlying IO
object
-
closed: called when the underlying IO object is closed (even
if it is closed as a result of a failure)
-
started: called when the IO processing has started
-
stopped: called when the IO processing has stopped
-
looped: called each time the IO processing loops
-
callback_failed: called when an exception is raised within a callback
28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/io_unblock/stream.rb', line 28 def initialize io, callbacks=nil @io = io @io_selector = [@io] @processor = nil @s_mutex = Mutex.new @w_buff = IoUnblock::Buffer.new @running = false @connected = true @callbacks = callbacks || {} @select_delay = 0.1 Delegation.define_io_methods self yield self if block_given? end |
Instance Attribute Details
#callbacks ⇒ Object (readonly)
Returns the value of attribute callbacks.
8 9 10 |
# File 'lib/io_unblock/stream.rb', line 8 def callbacks @callbacks end |
#connected ⇒ Object (readonly) Also known as: connected?
Returns the value of attribute connected.
8 9 10 |
# File 'lib/io_unblock/stream.rb', line 8 def connected @connected end |
#io ⇒ Object (readonly)
Returns the value of attribute io.
8 9 10 |
# File 'lib/io_unblock/stream.rb', line 8 def io @io end |
#io_selector ⇒ Object (readonly)
Returns the value of attribute io_selector.
8 9 10 |
# File 'lib/io_unblock/stream.rb', line 8 def io_selector @io_selector end |
#running ⇒ Object (readonly) Also known as: running?
Returns the value of attribute running.
8 9 10 |
# File 'lib/io_unblock/stream.rb', line 8 def running @running end |
#select_delay ⇒ Object
Returns the value of attribute select_delay.
9 10 11 |
# File 'lib/io_unblock/stream.rb', line 9 def select_delay @select_delay end |
Instance Method Details
#alive? ⇒ Boolean
72 73 74 |
# File 'lib/io_unblock/stream.rb', line 72 def alive? @processor && @processor.alive? end |
#start(&cb) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/io_unblock/stream.rb', line 42 def start &cb @s_mutex.synchronize do raise StreamError, "already started" if @running @running = true @processor = Thread.new do trigger_callbacks :started, :start, &cb read_and_write while running? && connected? flush_and_close trigger_callbacks :stopped, :stop, &cb end end self end |
#stop ⇒ Object
56 57 58 59 60 61 62 63 |
# File 'lib/io_unblock/stream.rb', line 56 def stop if @processor == Thread.current stop_inside else stop_outside end self end |
#write(bytes, *cb_args, &cb) ⇒ Object
The callback triggered here will be invoked only when all bytes have been written.
67 68 69 70 |
# File 'lib/io_unblock/stream.rb', line 67 def write bytes, *cb_args, &cb @w_buff.push bytes, cb, cb_args self end |