Class: UringMachine::IO

Inherits:
Object
  • Object
show all
Defined in:
ext/um/um_io_class.c

Defined Under Namespace

Classes: RESPError

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#UM::Stream.new(machine, fd, mode = nil) ⇒ void #connection(fd, mode = nil) ⇒ void #connection(fd, mode = nil) {|conn| ... } ⇒ void

Initializes a new connection with the given UringMachine instance, target and optional mode. The target maybe a file descriptor, or an instance of OpenSSL::SSL::SSLSocket. In case of an SSL socket, the mode should be :ssl.

Overloads:

  • #connection(fd, mode = nil) {|conn| ... } ⇒ void

    Yields:

    • (conn)

Parameters:

  • machine (UringMachine)

    UringMachine instance

  • target (integer, OpenSSL::SSL::SSLSocket)

    connection target: file descriptor or SSL socket

  • mode (Symbol)

    optional connection mode: :fd, :socket, :ssl



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'ext/um/um_io_class.c', line 134

VALUE IO_initialize(int argc, VALUE *argv, VALUE self) {
  VALUE machine;
  VALUE target;
  VALUE mode;
  rb_scan_args(argc, argv, "21", &machine, &target, &mode);

  struct um_io *conn = um_get_io(self);
  memset(conn, 0, sizeof(struct um_io));

  RB_OBJ_WRITE(self, &conn->self, self);
  conn->machine = um_get_machine(machine);
  io_setup(conn, target, mode);

  return self;
}

Class Method Details

.resp_encode(obj) ⇒ String

Encodes an object into a RESP (Redis protocol) message.

Returns:

  • (String)

Parameters:

  • str (String)

    string buffer

  • obj (any)

    object to be encoded

Returns:

  • (String)

    str



307
308
309
310
311
312
313
314
# File 'ext/um/um_io_class.c', line 307

VALUE IO_resp_encode(VALUE self, VALUE str, VALUE obj) {
  struct um_write_buffer buf;
  write_buffer_init(&buf, str);
  rb_str_modify(str);
  resp_encode(&buf, obj);
  write_buffer_update_len(&buf);
  return str;
}

Instance Method Details

#clearUM::Stream

Clears all available bytes and stops any ongoing read operation.

Returns:

  • (UM::Stream)

    self



359
360
361
362
363
# File 'ext/um/um_io_class.c', line 359

VALUE IO_clear(VALUE self) {
  struct um_io *conn = um_get_io(self);
  io_clear(conn);
  return self;
}

#consumedInteger

Returns the total number of bytes consumed from the conn.

Returns:

  • (Integer)

Returns:

  • (Integer)

    total bytes consumed



335
336
337
338
# File 'ext/um/um_io_class.c', line 335

VALUE IO_consumed(VALUE self) {
  struct um_io *conn = um_get_io(self);
  return LONG2NUM(conn->consumed_bytes);
}

#eof?Boolean

Returns true if connection has reached EOF.

Returns:

  • (Boolean)

Returns:

  • (bool)

    EOF reached



323
324
325
326
# File 'ext/um/um_io_class.c', line 323

VALUE IO_eof_p(VALUE self) {
  struct um_io *conn = um_get_io(self);
  return conn->eof ? Qtrue : Qfalse;
}

#modeSymbol

Returns the connection mode.

Returns:

  • (Symbol)

    connection mode



157
158
159
160
161
162
163
164
165
166
# File 'ext/um/um_io_class.c', line 157

VALUE IO_mode(VALUE self) {
  struct um_io *conn = um_get_io(self);
  switch (conn->mode) {
    case IO_FD:  return SYM_fd;
    case IO_SOCKET:  return SYM_socket;
    case IO_SSL:      return SYM_ssl;
    default:              return Qnil;
  }
  return Qnil;
}

#pendingInteger

Returns the number of bytes available for reading.

Returns:

  • (Integer)

Returns:

  • (Integer)

    bytes available



347
348
349
350
# File 'ext/um/um_io_class.c', line 347

VALUE IO_pending(VALUE self) {
  struct um_io *conn = um_get_io(self);
  return LONG2NUM(conn->pending_bytes);
}

#read(len) ⇒ String

Reads len bytes from the conn. If len is 0, reads all available bytes. If len is negative, reads up to -len available bytes. If len is positive and eof is encountered before len bytes are read, returns nil.

Returns:

  • (String)

Parameters:

  • len (integer)

    number of bytes to read

Returns:

  • (String, nil)

    read data or nil



193
194
195
196
# File 'ext/um/um_io_class.c', line 193

VALUE IO_read(VALUE self, VALUE len) {
  struct um_io *conn = um_get_io(self);
  return io_read(conn, Qnil, NUM2LONG(len), 0, false);
}

#read_each {|data| ... } ⇒ UringMachine::IO

Reads from the target, passing each chunk to the given block.

Yields:

  • (data)

Returns:



239
240
241
242
243
# File 'ext/um/um_io_class.c', line 239

VALUE IO_read_each(VALUE self) {
  struct um_io *conn = um_get_io(self);
  io_read_each(conn);
  return self;
}

#read_line(limit) ⇒ String

Reads from the string until a newline character is encountered. Returns the line without the newline delimiter. If limit is 0, the line length is not limited. If no newline delimiter is found before EOF, returns nil.

Returns:

  • (String)

Parameters:

  • limit (integer)

    maximum line length (0 means no limit)

Returns:

  • (String, nil)

    read data or nil



178
179
180
181
# File 'ext/um/um_io_class.c', line 178

VALUE IO_read_line(VALUE self, VALUE limit) {
  struct um_io *conn = um_get_io(self);
  return io_read_line(conn, Qnil, NUM2ULONG(limit));
}

#read_to_delim(delim, limit) ⇒ String

Reads from the string until a the given delimiter is encountered. Returns the line without the delimiter. If limit is 0, the length is not limited. If a delimiter is not found before EOF and limit is 0 or greater, returns nil.

If no delimiter is found before EOF and limit is negative, returns the buffered data up to EOF or until the absolute-value length limit is reached.

The delim parameter must be a single byte string.

line length (0 means no limit) @return [String, nil] read data or nil

Returns:

  • (String)

Parameters:

  • delim (String)

    delimiter (single byte) @param limit [integer] maximum



213
214
215
216
# File 'ext/um/um_io_class.c', line 213

VALUE IO_read_to_delim(VALUE self, VALUE delim, VALUE limit) {
  struct um_io *conn = um_get_io(self);
  return io_read_to_delim(conn, Qnil, delim, NUM2LONG(limit));
}

#resp_readObject

Decodes an object from a RESP (Redis protocol) message.

Returns:

  • (Object)

Returns:

  • (any)

    decoded object



266
267
268
269
270
271
272
# File 'ext/um/um_io_class.c', line 266

VALUE IO_resp_read(VALUE self) {
  struct um_io *conn = um_get_io(self);
  VALUE out_buffer = rb_utf8_str_new_literal("");
  VALUE obj = resp_read(conn, out_buffer);
  RB_GC_GUARD(out_buffer);
  return obj;
}

#resp_write(obj) ⇒ Integer

Writes the given object using RESP (Redis protocol) to the connection target. Returns the number of bytes written.

Parameters:

  • obj (any)

    object to write

Returns:

  • (Integer)

    total bytes written



283
284
285
286
287
288
289
290
291
292
293
294
295
296
# File 'ext/um/um_io_class.c', line 283

VALUE IO_resp_write(VALUE self, VALUE obj) {
  struct um_io *conn = um_get_io(self);

  VALUE str = rb_str_new(NULL, 0);
  struct um_write_buffer buf;
  write_buffer_init(&buf, str);
  rb_str_modify(str);
  resp_encode(&buf, obj);
  write_buffer_update_len(&buf);

  size_t len = io_write_raw(conn, buf.ptr, buf.len);
  RB_GC_GUARD(str);
  return ULONG2NUM(len);
}

#skip(len) ⇒ Integer

Skips len bytes in the conn.

Parameters:

  • len (integer)

    number of bytes to skip

Returns:

  • (Integer)

    len



226
227
228
229
230
# File 'ext/um/um_io_class.c', line 226

VALUE IO_skip(VALUE self, VALUE len) {
  struct um_io *conn = um_get_io(self);
  io_skip(conn, NUM2LONG(len), true);
  return len;
}

#write(*bufs) ⇒ Integer

Writes to the connection, ensuring that all data has been written before returning the total number of bytes written.

Parameters:

  • bufs (Array<String, IO::Buffer>)

    data to write

Returns:

  • (Integer)

    total bytes written



254
255
256
257
# File 'ext/um/um_io_class.c', line 254

VALUE IO_write(int argc, VALUE *argv, VALUE self) {
  struct um_io *conn = um_get_io(self);
  return io_writev(conn, argc, argv);
}