Class: IO

Inherits:
Object show all
Defined in:
lib/polyphony/extensions/io.rb,
lib/polyphony/extensions/io.rb

Overview

IO instance method patches

Direct Known Subclasses

BasicSocket

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.copy_stream(src, dst, src_length = nil, src_offset = 0) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/polyphony/extensions/io.rb', line 90

# Copies data from src to dst by splicing it through an intermediate pipe.
# A separate fiber drains the pipe into dst while the calling fiber fills it.
#
# @param src [IO, String] source; anything that does not respond to
#   #readpartial is treated as a path and opened for reading
# @param dst [IO, String] destination; anything that does not respond to
#   #readpartial is treated as a path and opened with mode 'w+'
# @param src_length [Integer, nil] maximum number of bytes to copy, or nil to
#   copy until EOF
# @param src_offset [Integer] offset to seek to in src before copying
# @return [Integer] number of bytes spliced from src
def copy_stream(src, dst, src_length = nil, src_offset = 0)
  close_src = false
  close_dst = false
  unless src.respond_to?(:readpartial)
    # The source is only ever read, so open it read-only; 'r+' would fail
    # on files the process cannot write to.
    src = File.open(src, 'r')
    close_src = true
  end
  unless dst.respond_to?(:readpartial)
    dst = File.open(dst, 'w+')
    close_dst = true
  end
  src.seek(src_offset) if src_offset > 0

  pipe = Polyphony::Pipe.new

  pipe_to_dst = spin { dst.splice_from(pipe, -65536) }

  count = 0
  if src_length
    while count < src_length
      # Only ask for what is still missing, so a short splice followed by
      # another pass cannot copy more than src_length bytes in total.
      spliced = pipe.splice_from(src, src_length - count)
      # Zero bytes means the source is exhausted; stop instead of spinning.
      break if spliced.zero?

      count += spliced
    end
  else
    count = pipe.splice_from(src, -65536)
  end

  # Closing the pipe lets the draining fiber finish; wait for it so that all
  # data has reached dst before returning.
  pipe.close
  pipe_to_dst.await

  count
ensure
  pipe_to_dst&.stop
  src.close if close_src
  dst.close if close_dst
end

.deflate(src, dest) ⇒ Integer .deflate(src, dest, opt) ⇒ Integer

Deflates data from the source IO to the destination IO, returning the number of bytes written to the destination IO.

Overloads:



619
620
621
622
623
624
625
626
627
628
629
630
631
632
# File 'ext/polyphony/io_extensions.c', line 619

// Deflates data read from src and writes the result to dest, using a stream
// initialized with deflateInit at DEFAULT_LEVEL.
//
// src, dest - source and destination passed to setup_ctx
//
// Returns the context's out_total as a Fixnum. Raises RuntimeError if the
// zlib stream cannot be initialized.
VALUE IO_deflate(VALUE self, VALUE src, VALUE dest) {
  struct z_stream_ctx ctx;
  int ret;

  setup_ctx(&ctx, SM_DEFLATE, src, dest);
  ret = deflateInit(&ctx.strm, DEFAULT_LEVEL);
  if (ret != Z_OK)
    // After a failed init zlib normally leaves strm.msg NULL, so describe the
    // failure from the return code instead.
    rb_raise(rb_eRuntimeError, "zlib error: %s\n", zError(ret));

  // NOTE(review): presumably runs the read/compress/write loop and releases
  // the stream afterwards - confirm against the macro definition.
  Z_STREAM_SAFE_IO_LOOP_WITH_CLEANUP(ctx);

  return INT2FIX(ctx.out_total);
}

.double_splice(src, dest) ⇒ Integer

Creates a pipe and splices data between the two given IOs, using the pipe, splicing until EOF.

Parameters:

Returns:

  • (Integer)

    total bytes spliced



144
145
146
# File 'lib/polyphony/extensions/io.rb', line 144

# Splices data from src to dest by delegating to
# Polyphony.backend_double_splice. Per the accompanying documentation, a pipe
# is created to carry the data and splicing continues until EOF.
#
# @param src [IO] source to splice from
# @param dest [IO] destination to splice to
# @return [Integer] total bytes spliced (as documented for this method)
def double_splice(src, dest)
  Polyphony.backend_double_splice(src, dest)
end

.gunzip(src, dest) ⇒ Integer .gunzip(src, dest, opt) ⇒ Integer

Gunzips data from the source IO to the destination IO, returning the number of bytes written to the destination IO.

Overloads:



569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
# File 'ext/polyphony/io_extensions.c', line 569

// Gunzips data read from src and writes the result to dest.
//
// Arguments (via argc/argv): src, dest, and an optional info hash. When info
// is given, it receives the :mtime, :orig_name and :comment values found in
// the gzip header.
//
// Returns the context's out_total as a Fixnum. Raises TypeError if info is
// given but is not a Hash, and RuntimeError if the zlib stream cannot be
// initialized.
VALUE IO_gunzip(int argc, VALUE *argv, VALUE self) {
  VALUE src;
  VALUE dest;
  VALUE info = Qnil;

  rb_scan_args(argc, argv, "21", &src, &dest, &info);
  // rb_hash_aset must only be handed a real Hash; reject anything else before
  // any I/O is performed.
  if (info != Qnil) Check_Type(info, T_HASH);

  struct gzip_header_ctx header_ctx;
  // struct gzip_footer_ctx footer_ctx;
  struct z_stream_ctx ctx;
  int ret;

  setup_ctx(&ctx, SM_INFLATE, src, dest);
  gzip_read_header(&ctx, &header_ctx);

  // Negative window bits select a raw deflate stream; the gzip header has
  // already been consumed by gzip_read_header above.
  ret = inflateInit2(&ctx.strm, -MAX_WBITS);
  if (ret != Z_OK)
    // After a failed init zlib normally leaves strm.msg NULL, so describe the
    // failure from the return code instead.
    rb_raise(rb_eRuntimeError, "zlib error: %s\n", zError(ret));

  Z_STREAM_SAFE_IO_LOOP_WITH_CLEANUP(ctx);

  // gzip_read_footer(&ctx, &footer_ctx);
  // TODO: verify crc32
  // TODO: verify total length

  if (info != Qnil) {
    rb_hash_aset(info, SYM_mtime, FIX2TIME(header_ctx.mtime));
    rb_hash_aset(info, SYM_orig_name, header_ctx.orig_name);
    rb_hash_aset(info, SYM_comment, header_ctx.comment);
  }
  // Keep the header values visible to the GC until this point.
  RB_GC_GUARD(header_ctx.orig_name);
  RB_GC_GUARD(header_ctx.comment);

  return INT2FIX(ctx.out_total);
}

.gzip(src, dest) ⇒ Integer .gzip(src, dest, opt) ⇒ Integer

Gzips data from the source IO to the destination IO, returning the number of bytes written to the destination IO.

Overloads:



524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
# File 'ext/polyphony/io_extensions.c', line 524

// Gzips data read from src and writes the result to dest.
//
// Arguments (via argc/argv): src, dest, and an optional opts hash from which
// :mtime, :orig_name and :comment are read to populate the gzip header.
//
// Returns the context's out_total as a Fixnum; the count starts at the size
// of the prepared gzip header. Raises TypeError if opts is given but is not a
// Hash, and RuntimeError if the zlib stream cannot be initialized.
VALUE IO_gzip(int argc, VALUE *argv, VALUE self) {
  VALUE src;
  VALUE dest;
  VALUE opts = Qnil;
  int opts_present;

  rb_scan_args(argc, argv, "21", &src, &dest, &opts);
  opts_present = opts != Qnil;
  // rb_hash_aref must only be handed a real Hash; reject anything else before
  // any I/O is performed.
  if (opts_present) Check_Type(opts, T_HASH);

  struct gzip_header_ctx header_ctx = {
    opts_present ? rb_hash_aref(opts, SYM_mtime) : Qnil,
    opts_present ? rb_hash_aref(opts, SYM_orig_name) : Qnil,
    opts_present ? rb_hash_aref(opts, SYM_comment) : Qnil
  };

  struct z_stream_ctx ctx;
  int ret;

  setup_ctx(&ctx, SM_DEFLATE, src, dest);
  ctx.f_gzip_footer = 1; // write gzip footer
  // The header is placed at the start of the output buffer, so both the
  // buffer position and the running total begin at the header size.
  ctx.out_total = ctx.out_pos = gzip_prepare_header(&header_ctx, ctx.out, sizeof(ctx.out));

  // Negative window bits select a raw deflate stream; the gzip framing is
  // produced by this code rather than by zlib.
  ret = deflateInit2(&ctx.strm, DEFAULT_LEVEL, Z_DEFLATED, -MAX_WBITS, DEFAULT_MEM_LEVEL, Z_DEFAULT_STRATEGY);
  if (ret != Z_OK)
    // strm.msg is not reliably set after a failed init, so describe the
    // failure from the return code instead.
    rb_raise(rb_eRuntimeError, "zlib error: %s\n", zError(ret));
  Z_STREAM_SAFE_IO_LOOP_WITH_CLEANUP(ctx);
  return INT2FIX(ctx.out_total);
}

.http1_splice_chunked(src, dest, maxlen) ⇒ Integer

Splices data from the source IO to the destination IO, writing it in HTTP1 chunked encoding. A pipe is automatically created to buffer data between source and destination.

Parameters:

  • src (IO)

    source

  • dest (IO)

    destination

  • maxlen (Integer)

    maximum bytes to splice

Returns:

  • (Integer)

    bytes spliced



672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
# File 'ext/polyphony/io_extensions.c', line 672

// Splices data from src to dest in HTTP/1 chunked transfer encoding, using a
// newly created pipe as the intermediate buffer.
//
// Each pass splices up to maxlen bytes from src into the pipe, writes the
// chunk-size line to dest, then splices the chunk body from the pipe to dest.
// The CRLF that ends a chunk is kept in the scratch buffer and written
// together with the next chunk-size line (or with the final zero-size chunk).
//
// Returns self.
VALUE IO_http1_splice_chunked(VALUE self, VALUE src, VALUE dest, VALUE maxlen) {
  enum write_method method = detect_write_method(dest);
  VALUE backend = BACKEND();
  VALUE pipe = rb_funcall(cPipe, ID_new, 0);
  unsigned char scratch[128];
  struct buffer_spec pending = { scratch, 0 };
  int chunk_len;

  for (;;) {
    chunk_len = FIX2INT(Backend_splice(backend, src, pipe, maxlen));
    if (chunk_len == 0) break;

    // chunk-size line (hex length + CRLF), appended after any pending CRLF
    pending.len += sprintf((char *)pending.ptr + pending.len, "%x\r\n", chunk_len);
    write_from_raw_buffer(backend, dest, method, &pending);
    pending.len = 0;

    // chunk body: keep splicing until the whole chunk has left the pipe
    do {
      int moved = FIX2INT(Backend_splice(backend, pipe, dest, INT2FIX(chunk_len)));
      chunk_len -= moved;
    } while (chunk_len != 0);

    // chunk terminator, written on the next pass or with the last chunk
    pending.len += sprintf((char *)pending.ptr + pending.len, "\r\n");
  }

  // terminating zero-size chunk
  pending.len += sprintf((char *)pending.ptr + pending.len, "0\r\n\r\n");
  write_from_raw_buffer(backend, dest, method, &pending);

  Pipe_close(pipe);
  RB_GC_GUARD(pipe);

  return self;
}

.inflate(src, dest) ⇒ Integer .inflate(src, dest, opt) ⇒ Integer

Inflates data from the source IO to the destination IO, returning the number of bytes written to the destination IO.

Overloads:



648
649
650
651
652
653
654
655
656
657
658
659
660
# File 'ext/polyphony/io_extensions.c', line 648

// Inflates data read from src and writes the result to dest, using a stream
// initialized with inflateInit.
//
// src, dest - source and destination passed to setup_ctx
//
// Returns the context's out_total as a Fixnum. Raises RuntimeError if the
// zlib stream cannot be initialized.
VALUE IO_inflate(VALUE self, VALUE src, VALUE dest) {
  struct z_stream_ctx ctx;
  int ret;

  setup_ctx(&ctx, SM_INFLATE, src, dest);
  ret = inflateInit(&ctx.strm);
  if (ret != Z_OK)
    // After a failed init zlib normally leaves strm.msg NULL, so describe the
    // failure from the return code instead.
    rb_raise(rb_eRuntimeError, "zlib error: %s\n", zError(ret));

  // NOTE(review): presumably runs the read/decompress/write loop and releases
  // the stream afterwards - confirm against the macro definition.
  Z_STREAM_SAFE_IO_LOOP_WITH_CLEANUP(ctx);

  return INT2FIX(ctx.out_total);
}

.orig_readlines ⇒ Object



62
# File 'lib/polyphony/extensions/io.rb', line 62

# Keep the existing readlines implementation reachable as orig_readlines;
# readlines itself is redefined on the following source line.
alias_method :orig_readlines, :readlines

.readlines(name, sep = $/, limit = nil, getline_args = EMPTY_HASH) ⇒ Object



63
64
65
66
67
# File 'lib/polyphony/extensions/io.rb', line 63

# Reads all lines from the file at the given path.
#
# @param name [String] path of the file to read
# @param sep [String, Integer, nil] line separator; an Integer given here
#   (with no limit) is interpreted by IO#readlines as the limit
# @param limit [Integer, nil] maximum number of bytes per returned line
# @param getline_args [Hash] options forwarded to IO#readlines (e.g. chomp:)
# @return [Array<String>] the lines read
def readlines(name, sep = $/, limit = nil, getline_args = EMPTY_HASH)
  # This method declares no keyword parameters, so a trailing options hash
  # (e.g. `chomp: true`) arrives in the first unused optional slot. Move it to
  # where it belongs instead of passing it on as a separator or limit.
  if sep.is_a?(Hash)
    getline_args = sep
    sep = $/
  elsif limit.is_a?(Hash)
    getline_args = limit
    limit = nil
  end

  File.open(name, 'r') do |f|
    # Only forward limit when one was given: with two positional arguments
    # IO#readlines requires the first to be a separator string, which would
    # break the readlines(name, limit) calling form.
    if limit.nil?
      f.readlines(sep, **getline_args)
    else
      f.readlines(sep, limit, **getline_args)
    end
  end
end

.splice(src, dest, maxlen) ⇒ Integer

Splices from one IO to another IO. At least one of the IOs must be a pipe. If maxlen is negative, splices repeatedly using absolute value of maxlen until EOF is encountered.

Parameters:

Returns:

  • (Integer)

    bytes spliced



134
135
136
# File 'lib/polyphony/extensions/io.rb', line 134

# Splices from src to dest by delegating to Polyphony.backend_splice. Per the
# accompanying documentation, at least one of the two must be a pipe, and a
# negative maxlen means: splice repeatedly, using the absolute value of
# maxlen, until EOF.
#
# @param src [IO, Polyphony::Pipe] source to splice from
# @param dest [IO, Polyphony::Pipe] destination to splice to
# @param maxlen [Integer] maximum bytes to splice
# @return [Integer] bytes spliced (as documented for this method)
def splice(src, dest, maxlen)
  Polyphony.backend_splice(src, dest, maxlen)
end

.tee(src, dest, maxlen) ⇒ Integer

Tees data from the source to the destination.

Parameters:

Returns:

  • (Integer)

    total bytes teed



165
166
167
# File 'lib/polyphony/extensions/io.rb', line 165

# Tees data from src to dest by delegating to Polyphony.backend_tee.
#
# @param src [IO, Polyphony::Pipe] source to tee from
# @param dest [IO, Polyphony::Pipe] destination to tee to
# @param maxlen [Integer] maximum bytes to tee
# @return [Integer] total bytes teed (as documented for this method)
def tee(src, dest, maxlen)
  Polyphony.backend_tee(src, dest, maxlen)
end

Instance Method Details

#close ⇒ void

This method returns an undefined value.

Closes the IO instance



478
479
480
481
482
483
# File 'lib/polyphony/extensions/io.rb', line 478

# Closes the IO through the Polyphony backend. Does nothing when the IO is
# already closed. Closing is best-effort: a StandardError raised by the
# backend is discarded.
#
# @return [nil]
def close
  unless closed?
    begin
      Polyphony.backend_close(self)
    rescue StandardError
      # best-effort close: failures are deliberately ignored
    end
  end

  nil
end

#double_splice(src, dest) ⇒ Object



149
150
151
152
153
154
155
156
# File 'lib/polyphony/extensions/io.rb', line 149

# Splices data from src to dest through a newly created pipe, until EOF. A
# separate fiber drains the pipe into dest while the calling fiber fills it
# from src.
#
# @param src [IO] source to splice from
# @param dest [IO] destination to splice to
# @return [Integer] total bytes spliced from src
def double_splice(src, dest)
  pipe = Polyphony::Pipe.new
  writer = spin { Polyphony.backend_splice(pipe, dest, -65536) }
  total = Polyphony.backend_splice(src, pipe, -65536)
  pipe.close
  # Wait for the draining fiber so that data still sitting in the pipe reaches
  # dest; stopping it straight away could drop the tail of the stream.
  writer.await
  total
ensure
  # writer is nil when creating the pipe or the fiber failed.
  writer&.stop
end

#feed_loop(receiver, method = :call, &block) ⇒ IO

Receives data from the io in an infinite loop, passing the data to the given receiver using the given method. If a block is given, the result of the method call to the receiver is passed to the block.

This method can be used to feed data into parser objects. The following example shows how to feed data from an IO directly into a MessagePack unpacker:

unpacker = MessagePack::Unpacker.new
io.feed_loop(unpacker, :feed_each) { |msg| handle_msg(msg) }

Parameters:

  • receiver (any)

    receiver object

  • method (Symbol) (defaults to: :call)

    method to call

Returns:

  • (IO)

    self



424
425
426
# File 'lib/polyphony/extensions/io.rb', line 424

# Feeds data received from this IO to the given receiver by delegating to
# Polyphony.backend_feed_loop. Per the accompanying documentation, the data is
# passed to the receiver using the given method in an infinite loop, and if a
# block is given, the result of each call is passed to the block.
#
# @param receiver [any] receiver object
# @param method [Symbol] method to call on the receiver
# @return [IO] self (as documented for this method)
def feed_loop(receiver, method = :call, &block)
  Polyphony.backend_feed_loop(self, receiver, method, &block)
end

#read_loop(maxlen = 8192) {|String| ... } ⇒ IO

Reads up to maxlen bytes at a time in an infinite loop. Read data will be passed to the given block.

Parameters:

  • maxlen (Integer) (defaults to: 8192)

    maximum bytes to receive

Yields:

  • (String)

    read data

Returns:

  • (IO)

    self



406
407
408
# File 'lib/polyphony/extensions/io.rb', line 406

# Reads from this IO in a loop by delegating to Polyphony.backend_read_loop.
# Per the accompanying documentation, up to maxlen bytes are read at a time
# and each read is passed to the given block.
#
# @param maxlen [Integer] maximum bytes to read at a time
# @yield [String] read data
# @return [IO] self (as documented for this method)
def read_loop(maxlen = 8192, &block)
  Polyphony.backend_read_loop(self, maxlen, &block)
end

#splice_from(src, maxlen) ⇒ Integer

Splices data from the given IO. If maxlen is negative, splices repeatedly using absolute value of maxlen until EOF is encountered.

Parameters:

  • src (IO, Polyphony::Pipe)

    source to splice from

  • maxlen (Integer)

    maximum bytes to splice

Returns:

  • (Integer)

    bytes spliced



468
469
470
# File 'lib/polyphony/extensions/io.rb', line 468

# Splices data from src into this IO by delegating to
# Polyphony.backend_splice with self as the destination. Per the accompanying
# documentation, a negative maxlen means: splice repeatedly, using the
# absolute value of maxlen, until EOF.
#
# @param src [IO, Polyphony::Pipe] source to splice from
# @param maxlen [Integer] maximum bytes to splice
# @return [Integer] bytes spliced (as documented for this method)
def splice_from(src, maxlen)
  Polyphony.backend_splice(src, self, maxlen)
end

#tee_from(src, maxlen) ⇒ Integer

Tees data from the given IO.

Parameters:

  • src (IO, Polyphony::Pipe)

    source to tee from

  • maxlen (Integer)

    maximum bytes to tee

Returns:

  • (Integer)

    bytes teed



491
492
493
# File 'lib/polyphony/extensions/io.rb', line 491

# Tees data from src into this IO by delegating to Polyphony.backend_tee with
# self as the destination.
#
# @param src [IO, Polyphony::Pipe] source to tee from
# @param maxlen [Integer] maximum bytes to tee
# @return [Integer] bytes teed (as documented for this method)
def tee_from(src, maxlen)
  Polyphony.backend_tee(src, self, maxlen)
end

#wait_readable(timeout = nil) ⇒ IO

Waits for the IO to become readable, with an optional timeout.

Parameters:

  • timeout (Integer, nil) (defaults to: nil)

    optional timeout in seconds.

Returns:

  • (IO)

    self



432
433
434
435
436
437
438
439
440
441
442
443
444
# File 'lib/polyphony/extensions/io.rb', line 432

# Waits until this IO is readable. Returns immediately when the read buffer
# already holds data.
#
# @param timeout [Integer, nil] optional timeout in seconds
# @return [IO] self; when a timeout is given, the result of the
#   move_on_after call wrapping the wait
def wait_readable(timeout = nil)
  buffered = @read_buffer
  return self if buffered && !buffered.empty?

  unless timeout
    Polyphony.backend_wait_io(self, false)
    return self
  end

  move_on_after(timeout) do
    Polyphony.backend_wait_io(self, false)
    self
  end
end

#wait_writable(timeout = nil) ⇒ IO

Waits for the IO to become writeable, with an optional timeout.

Parameters:

  • timeout (Integer, nil) (defaults to: nil)

    optional timeout in seconds.

Returns:

  • (IO)

    self



450
451
452
453
454
455
456
457
458
459
460
# File 'lib/polyphony/extensions/io.rb', line 450

# Waits until this IO is writable.
#
# @param timeout [Integer, nil] optional timeout in seconds
# @return [IO] self; when a timeout is given, the result of the
#   move_on_after call wrapping the wait
def wait_writable(timeout = nil)
  unless timeout
    Polyphony.backend_wait_io(self, true)
    return self
  end

  move_on_after(timeout) do
    Polyphony.backend_wait_io(self, true)
    self
  end
end