Class: Fluent::MessagePackEventStream
Instance Method Summary
collapse
Methods inherited from EventStream
#==, #to_compressed_msgpack_stream, #to_msgpack_stream_forced_integer
#compress, #decompress
#msgpack_factory, #msgpack_packer, #msgpack_unpacker
Constructor Details
#initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil) ⇒ MessagePackEventStream
Keep cached_unpacker argument for existing plugins
203
204
205
206
207
208
|
# File 'lib/fluent/event.rb', line 203
# Wraps an already-serialized payload, optionally together with its decoded form.
#
# @param data [Object] payload stored as-is (presumably a msgpack-encoded String -- confirm against callers)
# @param cached_unpacker [Object, nil] not referenced here; the parameter is retained for existing plugins that still pass it
# @param size [Integer] stored as the event count; defaults to 0
# @param unpacked_times [Array, nil] already-decoded event times, when available
# @param unpacked_records [Array, nil] already-decoded event records, when available
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil)
  @data, @size = data, size
  @unpacked_times, @unpacked_records = unpacked_times, unpacked_records
end
|
Instance Method Details
#dup ⇒ Object
214
215
216
217
218
219
220
|
# File 'lib/fluent/event.rb', line 214
# Creates a new stream of the same class from a duplicate of @data.
#
# When decoded times are cached, the copy receives the same @unpacked_times
# array (not duplicated) and a new array holding a #dup of every cached
# record. Otherwise only the duplicated payload and @size are passed on.
#
# @return [Object] a new instance of self.class
def dup
  return self.class.new(@data.dup, nil, @size) unless @unpacked_times

  copied_records = @unpacked_records.map(&:dup)
  self.class.new(@data.dup, nil, @size, unpacked_times: @unpacked_times, unpacked_records: copied_records)
end
|
#each(&block) ⇒ Object
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
|
# File 'lib/fluent/event.rb', line 253
# Calls the given block once per event with (time, record).
#
# When both decoded arrays are already cached they are replayed. Otherwise
# @data is fed through msgpack_unpacker, each pair is handed to the block as
# it is decoded, and the decoded pairs are cached for later calls.
#
# The cache is only published after the whole pass has finished. If the
# block raises or breaks out midway, no truncated cache is left behind, so a
# later call decodes @data again from the start instead of silently
# replaying only the events seen before the interruption.
#
# @yieldparam time [Object] event time as produced by the unpacker
# @yieldparam record [Object] event record as produced by the unpacker
# @return [nil]
def each(&block)
  if @unpacked_times && @unpacked_records
    @unpacked_times.each_with_index do |time, i|
      block.call(time, @unpacked_records[i])
    end
  else
    # Accumulate locally; instance state is touched only on full success.
    times = []
    records = []
    msgpack_unpacker.feed_each(@data) do |time, record|
      times << time
      records << record
      block.call(time, record)
    end
    @unpacked_times = times
    @unpacked_records = records
    @size = times.size
  end
  nil
end
|
#empty? ⇒ Boolean
210
211
212
|
# File 'lib/fluent/event.rb', line 210
# Reports whether the stored payload is empty.
# Delegates directly to @data.empty?; nothing is unpacked to answer this.
#
# @return [Boolean] whatever @data.empty? returns
def empty?
@data.empty?
end
|
#ensure_unpacked! ⇒ Object
233
234
235
236
237
238
239
240
241
242
243
244
|
# File 'lib/fluent/event.rb', line 233
# Decodes @data into @unpacked_times / @unpacked_records unless both are
# already present, and sets @size to the number of decoded events.
#
# The decoded arrays are built locally and assigned only after feed_each has
# finished. If decoding raises partway through, no half-filled arrays are
# left in the instance, so the next call decodes again instead of returning
# early with truncated data.
#
# @return [Integer, nil] the event count after decoding, or nil when the
#   decoded arrays were already present
def ensure_unpacked!
  return if @unpacked_times && @unpacked_records

  times = []
  records = []
  msgpack_unpacker.feed_each(@data) do |time, record|
    times << time
    records << record
  end
  @unpacked_times = times
  @unpacked_records = records
  @size = times.size
end
|
#repeatable? ⇒ Boolean
229
230
231
|
# File 'lib/fluent/event.rb', line 229
# Always answers true.
# NOTE(review): presumably tells callers that #each may be run more than once
# on this stream -- confirm against the EventStream contract.
#
# @return [Boolean] always true
def repeatable?
true
end
|
#size ⇒ Object
222
223
224
225
226
227
|
# File 'lib/fluent/event.rb', line 222
# Returns the stored event count.
#
# A stored count of 0 triggers ensure_unpacked! first, and @size is read
# again afterwards.
#
# @return [Integer] the value of @size after the optional unpacking step
def size
  if @size == 0
    ensure_unpacked!
  end
  @size
end
|
#slice(index, num) ⇒ Object
This method returns a MultiEventStream, because there is no reason to serve the msgpack-serialized binary for a slice.
248
249
250
251
|
# File 'lib/fluent/event.rb', line 248
# Builds a MultiEventStream from a sub-range of the decoded events.
#
# Runs ensure_unpacked! first, then takes +num+ entries starting at +index+
# from both the cached times and the cached records.
#
# @param index [Integer] start position within the decoded events
# @param num [Integer] number of events to take
# @return [MultiEventStream] stream built from the two sliced arrays
def slice(index, num)
  ensure_unpacked!
  picked_times = @unpacked_times[index, num]
  picked_records = @unpacked_records[index, num]
  MultiEventStream.new(picked_times, picked_records)
end
|
#to_msgpack_stream(time_int: false) ⇒ Object
271
272
273
274
|
# File 'lib/fluent/event.rb', line 271
# Returns the value stored in @data unchanged; nothing is re-serialized here.
#
# @param time_int [Boolean] accepted but not referenced by this implementation
# @return [Object] the current value of @data
def to_msgpack_stream(time_int: false)
@data
end
|