Class: VibeZstd::CompressWriter

Inherits:
Object
  • Object
show all
Defined in:
lib/vibe_zstd.rb,
ext/vibe_zstd/vibe_zstd.c

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.open(io, **options) ⇒ Object

Block-based resource management. When a block is given, finish is called automatically once the block completes (including when it raises).



167
168
169
170
171
172
173
174
175
176
# File 'lib/vibe_zstd.rb', line 167

# Builds a writer around +io+, forwarding +options+ to +new+.
#
# Without a block the new writer is returned and the caller is responsible
# for calling +finish+. With a block the writer is yielded, +finish+ is
# called when the block exits (normally or via an exception), and the
# block's value is returned.
def self.open(io, **options)
  writer = new(io, **options)

  if block_given?
    begin
      yield writer
    ensure
      # Runs even if the block raised.
      writer.finish
    end
  else
    writer
  end
end

Instance Method Details

#close ⇒ Object

Alias for #finish.



172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'ext/vibe_zstd/streaming.c', line 172

/*
 * Finalize the current zstd frame and push the trailing bytes to the
 * wrapped IO object.
 *
 * Drives ZSTD_compressStream2() with ZSTD_e_end and an empty input until it
 * reports 0 bytes remaining, calling `write` on cstream->io for every
 * non-empty chunk that was produced.
 *
 * Returns self. Raises RuntimeError ("Finish failed: ...") when zstd
 * reports an error.
 */
static VALUE
vibe_zstd_writer_finish(VALUE self) {
    vibe_zstd_cstream* cstream;
    TypedData_Get_Struct(self, vibe_zstd_cstream, &vibe_zstd_cstream_type, cstream);

    // Private scratch space for zstd output; it is never handed to Ruby code.
    size_t outBufferSize = ZSTD_CStreamOutSize();
    VALUE outBuffer = rb_str_buf_new(outBufferSize);

    // No new input: only drain what zstd still holds internally.
    ZSTD_inBuffer input = { NULL, 0, 0 };
    size_t remaining;

    // ZSTD_e_end: finalize frame with checksum and epilogue
    // Loop until remaining == 0 (frame complete)
    do {
        ZSTD_outBuffer output = {
            .dst = RSTRING_PTR(outBuffer),
            .size = outBufferSize,
            .pos = 0
        };

        // Return value > 0 means more epilogue data to write
        remaining = ZSTD_compressStream2((ZSTD_CCtx*)cstream->cstream, &output, &input, ZSTD_e_end);
        if (ZSTD_isError(remaining)) {
            rb_raise(rb_eRuntimeError, "Finish failed: %s", ZSTD_getErrorName(remaining));
        }

        if (output.pos > 0) {
            // `write` is arbitrary Ruby code and may keep, freeze or dup the
            // string it receives. Give it its own copy so that reusing the
            // scratch buffer on the next iteration cannot corrupt data the
            // receiver still holds or fail on a frozen/shared string.
            VALUE chunk = rb_str_new(RSTRING_PTR(outBuffer), (long)output.pos);
            rb_funcall(cstream->io, rb_intern("write"), 1, chunk);
        }
    } while (remaining > 0);

    // Keep the scratch string alive while its raw pointer is in use.
    RB_GC_GUARD(outBuffer);

    return self;
}

#finish ⇒ Object



172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'ext/vibe_zstd/streaming.c', line 172

/*
 * Finalize the current zstd frame and push the trailing bytes to the
 * wrapped IO object.
 *
 * Drives ZSTD_compressStream2() with ZSTD_e_end and an empty input until it
 * reports 0 bytes remaining, calling `write` on cstream->io for every
 * non-empty chunk that was produced.
 *
 * Returns self. Raises RuntimeError ("Finish failed: ...") when zstd
 * reports an error.
 */
static VALUE
vibe_zstd_writer_finish(VALUE self) {
    vibe_zstd_cstream* cstream;
    TypedData_Get_Struct(self, vibe_zstd_cstream, &vibe_zstd_cstream_type, cstream);

    // Private scratch space for zstd output; it is never handed to Ruby code.
    size_t outBufferSize = ZSTD_CStreamOutSize();
    VALUE outBuffer = rb_str_buf_new(outBufferSize);

    // No new input: only drain what zstd still holds internally.
    ZSTD_inBuffer input = { NULL, 0, 0 };
    size_t remaining;

    // ZSTD_e_end: finalize frame with checksum and epilogue
    // Loop until remaining == 0 (frame complete)
    do {
        ZSTD_outBuffer output = {
            .dst = RSTRING_PTR(outBuffer),
            .size = outBufferSize,
            .pos = 0
        };

        // Return value > 0 means more epilogue data to write
        remaining = ZSTD_compressStream2((ZSTD_CCtx*)cstream->cstream, &output, &input, ZSTD_e_end);
        if (ZSTD_isError(remaining)) {
            rb_raise(rb_eRuntimeError, "Finish failed: %s", ZSTD_getErrorName(remaining));
        }

        if (output.pos > 0) {
            // `write` is arbitrary Ruby code and may keep, freeze or dup the
            // string it receives. Give it its own copy so that reusing the
            // scratch buffer on the next iteration cannot corrupt data the
            // receiver still holds or fail on a frozen/shared string.
            VALUE chunk = rb_str_new(RSTRING_PTR(outBuffer), (long)output.pos);
            rb_funcall(cstream->io, rb_intern("write"), 1, chunk);
        }
    } while (remaining > 0);

    // Keep the scratch string alive while its raw pointer is in use.
    RB_GC_GUARD(outBuffer);

    return self;
}

#flush ⇒ Object



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'ext/vibe_zstd/streaming.c', line 136

/*
 * Flush zstd's internal buffers to the wrapped IO object without ending
 * the current frame.
 *
 * Drives ZSTD_compressStream2() with ZSTD_e_flush and an empty input until
 * it reports 0 bytes remaining, calling `write` on cstream->io for every
 * non-empty chunk that was produced.
 *
 * Returns self. Raises RuntimeError ("Flush failed: ...") when zstd
 * reports an error.
 */
static VALUE
vibe_zstd_writer_flush(VALUE self) {
    vibe_zstd_cstream* cstream;
    TypedData_Get_Struct(self, vibe_zstd_cstream, &vibe_zstd_cstream_type, cstream);

    // Private scratch space for zstd output; it is never handed to Ruby code.
    size_t outBufferSize = ZSTD_CStreamOutSize();
    VALUE outBuffer = rb_str_buf_new(outBufferSize);

    // No new input: only drain what zstd still holds internally.
    ZSTD_inBuffer input = { NULL, 0, 0 };
    size_t remaining;

    // ZSTD_e_flush: flush internal buffers, making all data readable
    // Loop until remaining == 0 (flush complete)
    do {
        ZSTD_outBuffer output = {
            .dst = RSTRING_PTR(outBuffer),
            .size = outBufferSize,
            .pos = 0
        };

        // Return value > 0 means more flushing needed
        remaining = ZSTD_compressStream2((ZSTD_CCtx*)cstream->cstream, &output, &input, ZSTD_e_flush);
        if (ZSTD_isError(remaining)) {
            rb_raise(rb_eRuntimeError, "Flush failed: %s", ZSTD_getErrorName(remaining));
        }

        if (output.pos > 0) {
            // `write` is arbitrary Ruby code and may keep, freeze or dup the
            // string it receives. Give it its own copy so that reusing the
            // scratch buffer on the next iteration cannot corrupt data the
            // receiver still holds or fail on a frozen/shared string.
            VALUE chunk = rb_str_new(RSTRING_PTR(outBuffer), (long)output.pos);
            rb_funcall(cstream->io, rb_intern("write"), 1, chunk);
        }
    } while (remaining > 0);

    // Keep the scratch string alive while its raw pointer is in use.
    RB_GC_GUARD(outBuffer);

    return self;
}

#write(data) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'ext/vibe_zstd/streaming.c', line 93

/*
 * Compress `data` and forward whatever compressed bytes zstd emits to the
 * wrapped IO object.
 *
 * `data` must be a String (Check_Type raises TypeError otherwise). The
 * input is fed to ZSTD_compressStream2() with ZSTD_e_continue until every
 * byte has been consumed; each non-empty output chunk is passed to `write`
 * on cstream->io. zstd may buffer internally, so a call can produce no
 * output at all.
 *
 * Returns self. Raises RuntimeError ("Compression failed: ...") when zstd
 * reports an error.
 */
static VALUE
vibe_zstd_writer_write(VALUE self, VALUE data) {
    Check_Type(data, T_STRING);

    vibe_zstd_cstream* cstream;
    TypedData_Get_Struct(self, vibe_zstd_cstream, &vibe_zstd_cstream_type, cstream);

    // Input buffer: pos advances as ZSTD consumes data
    ZSTD_inBuffer input = {
        .src = RSTRING_PTR(data),
        .size = RSTRING_LEN(data),
        .pos = 0
    };

    // Private scratch space for zstd output; it is never handed to Ruby code.
    size_t outBufferSize = ZSTD_CStreamOutSize();
    VALUE outBuffer = rb_str_buf_new(outBufferSize);

    // Process all input data in chunks
    while (input.pos < input.size) {
        ZSTD_outBuffer output = {
            .dst = RSTRING_PTR(outBuffer),
            .size = outBufferSize,
            .pos = 0
        };

        // ZSTD_e_continue: continue compression without flushing
        // Return value is a hint for preferred input size (can be ignored)
        size_t result = ZSTD_compressStream2((ZSTD_CCtx*)cstream->cstream, &output, &input, ZSTD_e_continue);
        if (ZSTD_isError(result)) {
            rb_raise(rb_eRuntimeError, "Compression failed: %s", ZSTD_getErrorName(result));
        }

        // Write any compressed output that was produced
        if (output.pos > 0) {
            // `write` is arbitrary Ruby code and may keep, freeze or dup the
            // string it receives. Give it its own copy so that reusing the
            // scratch buffer on the next iteration cannot corrupt data the
            // receiver still holds or fail on a frozen/shared string.
            VALUE chunk = rb_str_new(RSTRING_PTR(outBuffer), (long)output.pos);
            rb_funcall(cstream->io, rb_intern("write"), 1, chunk);
        }
    }

    // input.src and output.dst are raw pointers into these strings; keep the
    // owning VALUEs alive until the last use of those pointers.
    RB_GC_GUARD(outBuffer);
    RB_GC_GUARD(data);

    return self;
}