Class: IO

Inherits:
Object
  • Object
show all
Defined in:
lib/all/io.rb

Overview

Concurrently adds a few methods to IO which make them available for every IO instance.

Instance Method Summary collapse

Instance Method Details

#await_read(maxlen) ⇒ String #await_read(maxlen, outbuf) ⇒ outbuf

Waits until successfully read from IO with blocking other evaluations.

If IO is not readable right now it blocks the current concurrent evaluation and tries again after it became readable.

This method is a shortcut for:

begin
  io.read_nonblock(maxlen, outbuf)
rescue IO::WaitReadable
  io.await_readable
  retry
end

Examples:

r,w = IO.pipe
w.write "Hello!"
r.await_read 1024 # => "Hello!"

Overloads:

  • #await_read(maxlen) ⇒ String

    Reads maxlen bytes from IO and returns it as new string

    Parameters:

    • maxlen (Integer)

    Returns:

    • (String)

      read string

  • #await_read(maxlen, outbuf) ⇒ outbuf

    Reads maxlen bytes from IO and fills the given buffer with them.

    Parameters:

    • maxlen (Integer)
    • outbuf (String)

    Returns:

    • (outbuf)

      outbuf filled with read string

See Also:

Since:

  • 1.1.0



123
124
125
126
127
128
# File 'lib/all/io.rb', line 123

def await_read(maxlen, outbuf = nil)
  read_nonblock(maxlen, outbuf)
rescue IO::WaitReadable
  await_readable
  retry
end

#await_readable(opts = {}) ⇒ true

Suspends the current evaluation until IO is readable.

While waiting, the code jumps to the event loop and executes other evaluations that are ready to run in the meantime.

Examples:

Waiting inside a concurrent evaluation

# Control flow is indicated by (N)

r,w = IO.pipe

# (1)
reader = concurrently do
  # (4)
  r.await_readable
  # (6)
  r.read
end

# (2)
concurrently do
  # (5)
  w.write 'Hey from the other proc!'
  w.close
end

# (3)
reader.await_result # => 'Hey from the other proc!'
# (7)

r.close

Waiting outside a concurrent evaluation

# Control flow is indicated by (N)

r,w = IO.pipe

# (1)
concurrently do
  # (3)
  puts "I'm running while the outside is waiting!"
  w.write "Continue!"
  w.close
end

# (2)
r.await_readable
# (4)
r.read # => "Continue!"

r.close

Waiting with a timeout

r,w = IO.pipe
r.await_readable(within: 1)
# => raises a TimeoutError after 1 second

Waiting with a timeout and a timeout result

r,w = IO.pipe
r.await_readable(within: 0.1, timeout_result: false)
# => returns false after 0.1 seconds

Parameters:

  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • :within (Numeric)

    maximum time to wait (defaults to: Float::INFINITY)

  • :timeout_result (Object)

    result to return in case of an exceeded waiting time (defaults to raising Concurrently::Evaluation::TimeoutError)

Returns:

  • (true)

Raises:

Since:

  • 1.0.0



77
78
79
80
81
82
83
# File 'lib/all/io.rb', line 77

def await_readable(opts = {})
  io_selector = Concurrently::EventLoop.current.io_selector
  io_selector.await_reader(self, Concurrently::Evaluation.current)
  await_resume! opts
ensure
  io_selector.cancel_reader(self)
end

#await_writable(opts = {}) ⇒ true

Suspends the current evaluation until IO is writable.

While waiting, the code jumps to the event loop and executes other evaluations that are ready to run in the meantime.

Examples:

Waiting inside a evaluation

# Control flow is indicated by (N)

r,w = IO.pipe

# jam the pipe with x's, assuming the pipe's max capacity is 2^16 bytes
w.write 'x'*65536

# (1)
writer = concurrently do
  # (4)
  w.await_writable
  # (6)
  w.write 'I can write again!'
  :written
end

# (2)
concurrently do
  # (5)
  r.read 65536 # clear the pipe
end

# (3)
writer.await_result # => :written
# (7)

r.close; w.close

Waiting outside a evaluation

# Control flow is indicated by (N)

r,w = IO.pipe

# jam the pipe with x's, assuming the pipe's max capacity is 2^16 bytes
w.write 'x'*65536

# (1)
concurrently do
  # (3)
  puts "I'm running while the outside is waiting!"
  r.read 65536 # clear the pipe
end

# (2)
w.await_writable
# (4)

r.close; w.close

Waiting with a timeout

r,w = IO.pipe
# jam the pipe with x's, assuming the pipe's max capacity is 2^16 bytes
w.write 'x'*65536

w.await_writable(within: 1)
# => raises a TimeoutError after 1 second

Waiting with a timeout and a timeout result

r,w = IO.pipe
# jam the pipe with x's, assuming the pipe's max capacity is 2^16 bytes
w.write 'x'*65536

w.await_writable(within: 0.1, timeout_result: false)
# => returns false after 0.1 seconds

Parameters:

  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • :within (Numeric)

    maximum time to wait (defaults to: Float::INFINITY)

  • :timeout_result (Object)

    result to return in case of an exceeded waiting time (defaults to raising Concurrently::Evaluation::TimeoutError)

Returns:

  • (true)

Raises:

Since:

  • 1.0.0



252
253
254
255
256
257
258
# File 'lib/all/io.rb', line 252

def await_writable(opts = {})
  io_selector = Concurrently::EventLoop.current.io_selector
  io_selector.await_writer(self, Concurrently::Evaluation.current)
  await_resume! opts
ensure
  io_selector.cancel_writer(self)
end

#await_written(string) ⇒ Integer

Waits until successfully written to IO with blocking other evaluations.

If IO is not writable right now it blocks the current evaluation and tries again after it became writable.

This methods is a shortcut for:

begin
  io.write_nonblock(string)
rescue IO::WaitWritable
  io.await_writable
  retry
end

Examples:

r,w = IO.pipe
w.await_written "Hello!"
r.read 1024 # => "Hello!"

Parameters:

  • string (String)

    to write

Returns:

  • (Integer)

    bytes written

See Also:

Since:

  • 1.1.0



288
289
290
291
292
293
# File 'lib/all/io.rb', line 288

def await_written(string)
  write_nonblock(string)
rescue IO::WaitWritable
  await_writable
  retry
end

#concurrently_read(maxlen) ⇒ String #concurrently_read(maxlen, outbuf) ⇒ outbuf

Reads from IO concurrently.

Reading is done in a concurrent evaluation in the background.

This method is a shortcut for:

concurrently{ io.await_read(maxlen, outbuf) }

Examples:

r,w = IO.pipe
w.write "Hello!"
r.concurrently_read 1024 # => "Hello!"

Overloads:

  • #concurrently_read(maxlen) ⇒ String

    Reads maxlen bytes from IO and returns it as new string

    Parameters:

    • maxlen (Integer)

    Returns:

    • (String)

      read string

  • #concurrently_read(maxlen, outbuf) ⇒ outbuf

    Reads maxlen bytes from IO and fills the given buffer with them.

    Parameters:

    • maxlen (Integer)
    • outbuf (String)

    Returns:

    • (outbuf)

      outbuf filled with read string

See Also:

Since:

  • 1.0.0



162
163
164
# File 'lib/all/io.rb', line 162

def concurrently_read(maxlen, outbuf = nil)
  READ_PROC.call_detached(self, maxlen, outbuf)
end

#concurrently_write(string) ⇒ Integer

Writes to IO concurrently.

Writing is done in a concurrent evaluation in the background.

This method is a shortcut for:

concurrently{ io.await_written(string) }

Examples:

r,w = IO.pipe
w.concurrently_write "Hello!"
r.read 1024 # => "Hello!"

Parameters:

  • string (String)

    to write

Returns:

  • (Integer)

    bytes written

See Also:

Since:

  • 1.0.0



317
318
319
# File 'lib/all/io.rb', line 317

def concurrently_write(string)
  WRITE_PROC.call_detached(self, string)
end