Class: Zstd::StreamingCompress

Inherits:
Object
  • Object
show all
Defined in:
ext/zstdruby/streaming_compress.c

Constant Summary collapse

CONTINUE =
INT2FIX(ZSTD_e_continue)
FLUSH =
INT2FIX(ZSTD_e_flush)
END =
INT2FIX(ZSTD_e_end)

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'ext/zstdruby/streaming_compress.c', line 75

/*
 * Zstd::StreamingCompress#initialize
 *
 * Accepts only optional keyword arguments (scan spec "00:"); they are
 * forwarded to set_compress_params() together with a freshly created
 * compression context.  On success the wrapped struct holds:
 *   - ctx      : the ZSTD_CCtx used by the other methods
 *   - buf      : a Ruby String of ZSTD_CStreamOutSize() bytes used as the
 *                output scratch buffer
 *   - buf_size : the size of that scratch buffer
 *   - pending  : an empty Ruby String
 *
 * Raises RuntimeError when the context cannot be created.
 * Returns obj.
 */
static VALUE
rb_streaming_compress_initialize(int argc, VALUE *argv, VALUE obj)
{
  VALUE kwargs;
  rb_scan_args(argc, argv, "00:", &kwargs);

  struct streaming_compress_t* sc;
  TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
  size_t const buffOutSize = ZSTD_CStreamOutSize();

  ZSTD_CCtx* ctx = ZSTD_createCCtx();
  if (ctx == NULL) {
    rb_raise(rb_eRuntimeError, "%s", "ZSTD_createCCtx error");
  }

  /*
   * Hand the context to the wrapped struct before anything else can raise.
   * set_compress_params() and rb_str_new() may both raise (longjmp out of
   * this function); if the context were still only held in the local
   * variable at that point nothing could ever release it.
   * NOTE(review): this relies on the type's free hook releasing sc->ctx --
   * it is defined outside this view, confirm.
   */
  sc->ctx = ctx;
  set_compress_params(ctx, kwargs);

  sc->buf = rb_str_new(NULL, buffOutSize);
  sc->buf_size = buffOutSize;
  sc->pending = rb_str_new(0, 0);

  return obj;
}

Instance Method Details

#<< ⇒ Object

Same as IO.

#compress(src) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'ext/zstdruby/streaming_compress.c', line 123

/*
 * Zstd::StreamingCompress#compress(src)
 *
 * Coerces src to a String, feeds it to zstd_stream_compress() with
 * ZSTD_e_continue until every input byte has been consumed, and returns a
 * new Ruby String containing the bytes emitted into the scratch buffer
 * (sc->buf) during this call.
 *
 * Raises RuntimeError when zstd reports an error.
 */
static VALUE
rb_streaming_compress_compress(VALUE obj, VALUE src)
{
  StringValue(src);
  const char* input_data = RSTRING_PTR(src);
  size_t input_size = RSTRING_LEN(src);
  ZSTD_inBuffer input = { input_data, input_size, 0 };

  struct streaming_compress_t* sc;
  TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);

  VALUE result = rb_str_new(0, 0);
  while (input.pos < input.size) {
    /* The scratch buffer is reused from the start on every iteration. */
    const char* output_data = RSTRING_PTR(sc->buf);
    ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
    size_t const ret = zstd_stream_compress(sc->ctx, &output, &input, ZSTD_e_continue, false);
    if (ZSTD_isError(ret)) {
      rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret));
    }
    rb_str_cat(result, output.dst, output.pos);
  }

  /*
   * StringValue() may have replaced src with a freshly converted String that
   * only this local references.  The loop above reads it through the raw
   * input_data pointer while rb_str_new()/rb_str_cat() can trigger GC, so
   * keep the VALUE itself visibly alive until the pointer is no longer used.
   */
  RB_GC_GUARD(src);
  return result;
}

#finish ⇒ Object



212
213
214
215
216
217
218
219
220
221
222
# File 'ext/zstdruby/streaming_compress.c', line 212

/*
 * Zstd::StreamingCompress#finish
 *
 * Calls no_compress() with ZSTD_e_end and returns a new String made of the
 * current contents of sc->pending followed by the String that call
 * returned.  sc->pending is emptied afterwards.
 */
static VALUE
rb_streaming_compress_finish(VALUE obj)
{
  struct streaming_compress_t* sc;
  TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
  VALUE drained = no_compress(sc, ZSTD_e_end);
  VALUE out = rb_str_dup(sc->pending);
  rb_str_cat(out, RSTRING_PTR(drained), RSTRING_LEN(drained));
  /*
   * rb_str_cat() only receives a raw pointer into drained and may allocate
   * (and therefore run GC) before copying from it.  Keep the VALUE alive
   * past the call so its buffer cannot be reclaimed mid-copy.
   */
  RB_GC_GUARD(drained);
  rb_str_resize(sc->pending, 0);
  return out;
}

#flush ⇒ Object



200
201
202
203
204
205
206
207
208
209
210
# File 'ext/zstdruby/streaming_compress.c', line 200

/*
 * Zstd::StreamingCompress#flush
 *
 * Calls no_compress() with ZSTD_e_flush and returns a new String made of
 * the current contents of sc->pending followed by the String that call
 * returned.  sc->pending is emptied afterwards.
 */
static VALUE
rb_streaming_compress_flush(VALUE obj)
{
  struct streaming_compress_t* sc;
  TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
  VALUE drained = no_compress(sc, ZSTD_e_flush);
  VALUE out = rb_str_dup(sc->pending);
  rb_str_cat(out, RSTRING_PTR(drained), RSTRING_LEN(drained));
  /*
   * rb_str_cat() only receives a raw pointer into drained and may allocate
   * (and therefore run GC) before copying from it.  Keep the VALUE alive
   * past the call so its buffer cannot be reclaimed mid-copy.
   */
  RB_GC_GUARD(drained);
  rb_str_resize(sc->pending, 0);
  return out;
}

Same as IO.

#printf ⇒ Object

Same as IO.

#puts ⇒ Object

Same as IO.

#write(*args) ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'ext/zstdruby/streaming_compress.c', line 147

/*
 * Zstd::StreamingCompress#write(*args)
 *
 * Each argument is coerced to a String and pushed through
 * zstd_stream_compress() with ZSTD_e_continue.  Whatever lands in the
 * scratch buffer (sc->buf) is appended to sc->pending instead of being
 * returned.
 *
 * Returns the total number of input bytes across all arguments.
 * Raises RuntimeError when zstd reports an error.
 */
static VALUE
rb_streaming_compress_write(int argc, VALUE *argv, VALUE obj)
{
  struct streaming_compress_t* sc;
  size_t written = 0;
  int i;

  TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);

  for (i = 0; i < argc; i++) {
    VALUE chunk = argv[i];
    StringValue(chunk);
    ZSTD_inBuffer in = { RSTRING_PTR(chunk), RSTRING_LEN(chunk), 0 };

    while (in.pos < in.size) {
      /* Start each round at the beginning of the shared scratch buffer. */
      ZSTD_outBuffer out = { (void*)RSTRING_PTR(sc->buf), sc->buf_size, 0 };
      size_t const rc = zstd_stream_compress(sc->ctx, &out, &in, ZSTD_e_continue, false);
      if (ZSTD_isError(rc)) {
        rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(rc));
      }
      /* Nothing to stash when this round emitted no bytes. */
      if (out.pos != 0) {
        rb_str_cat(sc->pending, out.dst, out.pos);
      }
    }

    /* Reading the length from the String here also keeps chunk referenced
     * for the whole time its raw pointer is in use above. */
    written += RSTRING_LEN(chunk);
  }

  return SIZET2NUM(written);
}