52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
# File 'lib/fluent/plugin/buf_event_limited.rb', line 52
def resume
maps = []
queues = []
Dir.glob("#{@buffer_path_prefix}*#{@buffer_path_suffix}") {|path|
identifier_part = chunk_identifier_in_path(path)
if m = PATH_MATCH.match(identifier_part)
key = decode_key(m[1])
bq = m[2]
tsuffix = m[3]
timestamp = m[3].to_i(16)
unique_id = tsuffix_to_unique_id(tsuffix)
if bq == 'b'
chunk = EventLimitedBufferChunk.new(key, path, unique_id, @buffer_chunk_message_separator, "a+")
maps << [timestamp, chunk]
elsif bq == 'q'
chunk = EventLimitedBufferChunk.new(key, path, unique_id, @buffer_chunk_message_separator, "r")
queues << [timestamp, chunk]
end
end
}
map = {}
maps
.sort_by { |(timestamp, chunk)| timestamp }
.each { |(timestamp, chunk)| map[chunk.key] = chunk }
queue = queues
.sort_by { |(timestamp, _chunk)| timestamp }
.map { |(_timestamp, chunk)| chunk }
return queue, map
end
|