Class: Fluent::EventLimitedFileBuffer

Inherits:
FileBuffer
  • Object
show all
Defined in:
lib/fluent/plugin/buf_event_limited.rb

Instance Method Summary collapse

Instance Method Details

#new_chunk(key) ⇒ Object



42
43
44
45
46
47
48
# File 'lib/fluent/plugin/buf_event_limited.rb', line 42

def new_chunk(key)
  encoded_key = encode_key(key)
  path, tsuffix = make_path(encoded_key, 'b')
  unique_id = tsuffix_to_unique_id(tsuffix)

  EventLimitedBufferChunk.new(key, path, unique_id, @buffer_chunk_message_separator, 'a+', @symlink_path)
end

#resumeObject



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/fluent/plugin/buf_event_limited.rb', line 52

def resume
  maps = []
  queues = []

  Dir.glob("#{@buffer_path_prefix}*#{@buffer_path_suffix}") {|path|
    identifier_part = chunk_identifier_in_path(path)
    if m = PATH_MATCH.match(identifier_part)
      key = decode_key(m[1])
      bq = m[2]
      tsuffix = m[3]
      timestamp = m[3].to_i(16)
      unique_id = tsuffix_to_unique_id(tsuffix)

      if bq == 'b'
        chunk = EventLimitedBufferChunk.new(key, path, unique_id, @buffer_chunk_message_separator, "a+")
        maps << [timestamp, chunk]
      elsif bq == 'q'
        chunk = EventLimitedBufferChunk.new(key, path, unique_id, @buffer_chunk_message_separator, "r")
        queues << [timestamp, chunk]
      end
    end
  }

  map = {}
  maps
    .sort_by { |(timestamp, chunk)| timestamp }
    .each    { |(timestamp, chunk)| map[chunk.key] = chunk }

  queue = queues
    .sort_by { |(timestamp, _chunk)| timestamp }
    .map     { |(_timestamp, chunk)| chunk }

  return queue, map
end

#storable?(chunk, data) ⇒ Boolean

Returns:

  • (Boolean)


87
88
89
90
# File 'lib/fluent/plugin/buf_event_limited.rb', line 87

def storable?(chunk, data)
  chunk.record_counter < @buffer_chunk_records_limit &&
    (chunk.size + data.bytesize) <= @buffer_chunk_limit
end