Class: Fluent::MessagePackEventStream

Inherits:
EventStream show all
Defined in:
lib/fluent/event.rb

Direct Known Subclasses

CompressedMessagePackEventStream

Instance Method Summary collapse

Methods inherited from EventStream

#==, #to_compressed_msgpack_stream, #to_msgpack_stream_forced_integer

Methods included from Plugin::Compressable

#compress, #decompress

Methods included from Fluent::MessagePackFactory::Mixin

#msgpack_factory, #msgpack_packer, #msgpack_unpacker

Constructor Details

#initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil) ⇒ MessagePackEventStream

Keep cached_unpacker argument for existing plugins



203
204
205
206
207
208
# File 'lib/fluent/event.rb', line 203

# Wraps an already-serialized payload, optionally together with a
# pre-unpacked copy of its events.
#
# @param data [Object] the serialized payload; stored as-is in @data
# @param cached_unpacker [Object, nil] accepted but never read; the
#   parameter is retained so existing plugins that pass it keep working
# @param size [Integer] number of events, stored as-is in @size
#   (#size treats 0 as "unknown" and unpacks to find out)
# @param unpacked_times [Array, nil] pre-unpacked event times, if available
# @param unpacked_records [Array, nil] pre-unpacked event records, if available
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil)
  @data, @size = data, size
  @unpacked_times, @unpacked_records = unpacked_times, unpacked_records
end

Instance Method Details

#dup ⇒ Object



214
215
216
217
218
219
220
# File 'lib/fluent/event.rb', line 214

# Builds a new stream of the same class around a copy of @data.
#
# When both unpacked caches are present they are carried over: each record
# is duplicated, while the times array is passed through as-is. If either
# cache is missing (for example, the stream was constructed with
# unpacked_times: but without unpacked_records:), the copy is created
# without any cache instead of calling #map on nil.
#
# @return [Object] a new instance of self.class
def dup
  if @unpacked_times && @unpacked_records
    self.class.new(@data.dup, nil, @size, unpacked_times: @unpacked_times, unpacked_records: @unpacked_records.map(&:dup))
  else
    self.class.new(@data.dup, nil, @size)
  end
end

#each(&block) ⇒ Object



253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
# File 'lib/fluent/event.rb', line 253

# Calls the block once per event with (time, record).
#
# If both unpacked caches are present, events are replayed from them.
# Otherwise @data is fed through msgpack_unpacker and each event is handed
# to the block as soon as it is unpacked.
#
# The caches and @size are committed only after the whole payload has been
# walked. If the block breaks out or raises part-way through, nothing is
# cached, so a later call starts again from @data instead of replaying a
# truncated list.
#
# @yieldparam time [Object] the event time as produced by the unpacker
# @yieldparam record [Object] the event record as produced by the unpacker
# @return [nil]
def each(&block)
  if @unpacked_times && @unpacked_records
    @unpacked_times.each_with_index do |time, i|
      block.call(time, @unpacked_records[i])
    end
  else
    # Collect into locals so an interrupted pass never leaves a partial cache.
    times = []
    records = []
    msgpack_unpacker.feed_each(@data) do |time, record|
      times << time
      records << record
      block.call(time, record)
    end
    @unpacked_times = times
    @unpacked_records = records
    @size = times.size
  end
  nil
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


210
211
212
# File 'lib/fluent/event.rb', line 210

# Reports whether the payload held in @data is empty.
#
# Only @data is consulted; the cached unpacked arrays are not checked.
#
# @return [Boolean] the result of @data.empty?
def empty?
  @data.empty?
end

#ensure_unpacked! ⇒ Object



233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/fluent/event.rb', line 233

# Populates @unpacked_times and @unpacked_records from @data unless both
# are already present, then refreshes @size.
#
# Events are gathered into local arrays and assigned to the instance
# variables only after msgpack_unpacker has walked the whole payload. If
# unpacking raises part-way through, no partial cache is left behind, so a
# later call attempts a full unpack again rather than returning early.
#
# @return [Integer, nil] the refreshed @size, or nil when already unpacked
def ensure_unpacked!
  return if @unpacked_times && @unpacked_records

  times = []
  records = []
  msgpack_unpacker.feed_each(@data) do |time, record|
    times << time
    records << record
  end
  @unpacked_times = times
  @unpacked_records = records
  # @size should be updated always right after unpack.
  # The real size of unpacked objects are correct, rather than given size.
  @size = times.size
end

#repeatable? ⇒ Boolean

Returns:

  • (Boolean)


229
230
231
# File 'lib/fluent/event.rb', line 229

# Always answers true for this stream class.
#
# NOTE(review): presumably signals that the stream can be iterated more than
# once — confirm against the EventStream contract.
#
# @return [Boolean] always true
def repeatable?
  true
end

#size ⇒ Object



222
223
224
225
226
227
# File 'lib/fluent/event.rb', line 222

# Returns the number of events in the stream.
#
# A stored size of 0 cannot be trusted (it is also the constructor's
# default), so in that case the payload is unpacked to learn the real
# count. When the stream truly is empty, that unpack is very cheap.
#
# @return [Object] the value of @size after any required unpack
def size
  if @size == 0
    ensure_unpacked!
  end
  @size
end

#slice(index, num) ⇒ Object

This method returns a MultiEventStream, because there is no reason to preserve the binary serialized by msgpack.



248
249
250
251
# File 'lib/fluent/event.rb', line 248

# Returns a sub-range of the events as a MultiEventStream.
#
# The stream is unpacked first (if it has not been already); the same
# (index, num) window is then taken from both the times and the records.
#
# @param index [Integer] start position passed to #slice on the caches
# @param num [Integer] number of events passed to #slice on the caches
# @return [MultiEventStream] built from the sliced times and records
def slice(index, num)
  ensure_unpacked!
  times = @unpacked_times.slice(index, num)
  records = @unpacked_records.slice(index, num)
  MultiEventStream.new(times, records)
end

#to_msgpack_stream(time_int: false) ⇒ Object



271
272
273
274
# File 'lib/fluent/event.rb', line 271

# Returns the payload this stream holds, without re-serializing anything.
#
# @param time_int [Boolean] accepted as part of the method signature but
#   never read in this method
# @return [Object] @data itself (the same object, not a copy)
def to_msgpack_stream(time_int: false)
  # time_int is always ignored because @data is always packed binary in this class
  @data
end