Class: IO

Inherits:
Object
  • Object
show all
Defined in:
lib/all/io.rb

Overview

Concurrently adds a few methods to IO which make them available for every IO instance.

Since:

  • 1.0.0

Instance Method Summary collapse

Instance Method Details

#await_readable(opts = {}) ⇒ true

Suspends the current evaluation until IO is readable. It can be used inside and outside of concurrent procs.

While waiting, the code jumps to the event loop and executes other concurrent procs that are ready to run in the meantime.

Examples:

Waiting inside a concurrent proc

# Control flow is indicated by (N)

r,w = IO.pipe

# (1)
wait_proc = concurrent_proc do
   # (4)
   r.await_readable
   # (6)
   r.read
end

# (2)
concurrently do
  # (5)
  w.write 'Hey from the other proc!'
  w.close
end

# (3)
wait_proc.call # => 'Hey from the other proc!'
# (7)

r.close

Waiting outside a concurrent proc

# Control flow is indicated by (N)

r,w = IO.pipe

# (1)
concurrently do
  # (3)
  puts "I'm running while the outside is waiting!"
  w.write "Continue!"
  w.close
end

# (2)
r.await_readable
# (4)
r.read # => "Continue!"

r.close

Waiting with a timeout

r,w = IO.pipe
r.await_readable(within: 1)
# => raises a TimeoutError after 1 second

Waiting with a timeout and a timeout result

r,w = IO.pipe
r.await_readable(within: 0.1, timeout_result: false)
# => returns false after 0.1 second

Parameters:

  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • :within (Numeric)

    maximum time to wait (defaults to: Float::INFINITY)

  • :timeout_result (Object)

    result to return in case of an exceeded waiting time (defaults to raising Concurrently::Evaluation::TimeoutError)

Returns:

  • (true)

Raises:

Since:

  • 1.0.0



77
78
79
80
81
82
83
# File 'lib/all/io.rb', line 77

def await_readable(opts = {})
  io_selector = Concurrently::EventLoop.current.io_selector
  io_selector.await_reader(self, Concurrently::Evaluation.current)
  await_resume! opts
ensure
  io_selector.cancel_reader(self)
end

#await_writable(opts = {}) ⇒ true

Suspends the current evaluation until IO is writable. It can be used inside and outside of concurrent procs.

While waiting, the code jumps to the event loop and executes other concurrent procs that are ready to run in the meantime.

Examples:

Waiting inside a concurrent proc

# Control flow is indicated by (N)

r,w = IO.pipe

# jam the pipe with x's, assuming the pipe's max capacity is 2^16 bytes
w.write 'x'*65536

# (1)
wait_proc = concurrent_proc do
   # (4)
   w.await_writable
   # (6)
   w.write 'I can write again!'
   :written
end

# (2)
concurrently do
  # (5)
  r.read 65536 # clear the pipe
end

# (3)
wait_proc.call # => :written
# (7)

r.close; w.close

Waiting outside a concurrent proc

# Control flow is indicated by (N)

r,w = IO.pipe

# jam the pipe with x's, assuming the pipe's max capacity is 2^16 bytes
w.write 'x'*65536

# (1)
concurrently do
  # (3)
  puts "I'm running while the outside is waiting!"
  r.read 65536 # clear the pipe
end

# (2)
w.await_writable
# (4)

r.close; w.close

Waiting with a timeout

r,w = IO.pipe
# jam the pipe with x's, assuming the pipe's max capacity is 2^16 bytes
w.write 'x'*65536

w.await_writable(within: 1)
# => raises a TimeoutError after 1 second

Waiting with a timeout and a timeout result

r,w = IO.pipe
# jam the pipe with x's, assuming the pipe's max capacity is 2^16 bytes
w.write 'x'*65536

w.await_writable(within: 0.1, timeout_result: false)
# => returns false after 0.1 second

Parameters:

  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • :within (Numeric)

    maximum time to wait (defaults to: Float::INFINITY)

  • :timeout_result (Object)

    result to return in case of an exceeded waiting time (defaults to raising Concurrently::Evaluation::TimeoutError)

Returns:

  • (true)

Raises:

Since:

  • 1.0.0



208
209
210
211
212
213
214
# File 'lib/all/io.rb', line 208

def await_writable(opts = {})
  io_selector = Concurrently::EventLoop.current.io_selector
  io_selector.await_writer(self, Concurrently::Evaluation.current)
  await_resume! opts
ensure
  io_selector.cancel_writer(self)
end

#concurrently_read(maxlen) ⇒ String #concurrently_read(maxlen, outbuf) ⇒ outbuf

Reads from IO concurrently.

If IO is not readable right now it blocks the current concurrent evaluation and tries again after it became readable.

This method is a shortcut for:

begin
  read_nonblock(maxlen, buf)
rescue IO::WaitReadable
  await_readable
  retry
end

Examples:

r,w = IO.pipe
w.concurrently_write "Hello!"
r.concurrently_read 1024 # => "Hello!"

Overloads:

  • #concurrently_read(maxlen) ⇒ String

    Reads maxlen bytes from IO and returns it as new string

    Parameters:

    • maxlen (Integer)

    Returns:

    • (String)

      read string

  • #concurrently_read(maxlen, outbuf) ⇒ outbuf

    Reads maxlen bytes from IO and fills the given buffer with them.

    Parameters:

    • maxlen (Integer)
    • outbuf (String)

    Returns:

    • (outbuf)

      outbuf filled with read string

See Also:

Since:

  • 1.0.0



121
122
123
124
125
126
# File 'lib/all/io.rb', line 121

def concurrently_read(maxlen, outbuf = nil)
  read_nonblock(maxlen, outbuf)
rescue IO::WaitReadable
  await_readable
  retry
end

#concurrently_write(string) ⇒ Integer

Writes to IO concurrently.

If IO is not writable right now it blocks the current concurrent proc and tries again after it became writable.

This methods is a shortcut for:

begin
  write_nonblock(string)
rescue IO::WaitWritable
  await_writable
  retry
end

Examples:

r,w = IO.pipe
w.concurrently_write "Hello!"
r.concurrently_read 1024 # => "Hello!"

Parameters:

  • string (String)

    to write

Returns:

  • (Integer)

    bytes written

See Also:

Since:

  • 1.0.0



242
243
244
245
246
247
# File 'lib/all/io.rb', line 242

def concurrently_write(string)
  write_nonblock(string)
rescue IO::WaitWritable
  await_writable
  retry
end