Class: Fluent::FileBuffer
- Inherits:
-
BasicBuffer
show all
- Defined in:
- lib/fluent/plugin/buf_file.rb,
lib/fluent/test/input_test.rb
Constant Summary
collapse
- PATH_MATCH =
Dots are separator for many cases:
we should have to escape dots in keys...
/^([-_.%0-9a-zA-Z]*)\.(b|q)([0-9a-fA-F]{1,32})$/
- @@buffer_paths =
{}
Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Methods inherited from BasicBuffer
#clear!, #emit, #enable_parallel, #keys, #pop, #push, #queue_size, #shutdown, #storable?, #total_queued_chunk_size, #write_chunk
Methods inherited from Buffer
#clear!, #emit, #keys, #pop, #push, #shutdown
#config, included, lookup_type, register_type
Constructor Details
Returns a new instance of FileBuffer.
85
86
87
88
89
90
|
# File 'lib/fluent/plugin/buf_file.rb', line 85
def initialize
require 'uri'
super
@uri_parser = URI::Parser.new
end
|
Instance Attribute Details
#symlink_path ⇒ Object
100
101
102
|
# File 'lib/fluent/plugin/buf_file.rb', line 100
def symlink_path
@symlink_path
end
|
Class Method Details
.clear_buffer_paths ⇒ Object
25
26
27
|
# File 'lib/fluent/test/input_test.rb', line 25
def self.clear_buffer_paths
@@buffer_paths = {}
end
|
Instance Method Details
#before_shutdown(out) ⇒ Object
195
196
197
198
199
200
201
202
203
204
205
|
# File 'lib/fluent/plugin/buf_file.rb', line 195
def before_shutdown(out)
if @flush_at_shutdown
synchronize do
@map.each_key {|key|
push(key)
}
while pop(out)
end
end
end
end
|
#chunk_identifier_in_path(path) ⇒ Object
176
177
178
179
180
181
|
# File 'lib/fluent/plugin/buf_file.rb', line 176
def chunk_identifier_in_path(path)
pos_after_prefix = @buffer_path_prefix.length
pos_before_suffix = @buffer_path_suffix.length + 1
path.slice(pos_after_prefix..-pos_before_suffix)
end
|
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
# File 'lib/fluent/plugin/buf_file.rb', line 102
def configure(conf)
super
if @@buffer_paths.has_key?(@buffer_path)
raise ConfigError, "Other '#{@@buffer_paths[@buffer_path]}' plugin already use same buffer_path: type = #{conf['@type'] || conf['type']}, buffer_path = #{@buffer_path}"
else
@@buffer_paths[@buffer_path] = conf['@type'] || conf['type']
end
if pos = @buffer_path.index('*')
@buffer_path_prefix = @buffer_path[0, pos]
@buffer_path_suffix = @buffer_path[(pos + 1)..-1]
else
@buffer_path_prefix = @buffer_path + "."
@buffer_path_suffix = ".log"
end
end
|
#enqueue(chunk) ⇒ Object
183
184
185
186
187
188
189
190
191
192
193
|
# File 'lib/fluent/plugin/buf_file.rb', line 183
def enqueue(chunk)
path = chunk.path
identifier_part = chunk_identifier_in_path(path)
m = PATH_MATCH.match(identifier_part)
encoded_key = m ? m[1] : ""
tsuffix = m[3]
npath = "#{@buffer_path_prefix}#{encoded_key}.q#{tsuffix}#{@buffer_path_suffix}"
chunk.mv(npath)
end
|
#new_chunk(key) ⇒ Object
130
131
132
133
134
135
|
# File 'lib/fluent/plugin/buf_file.rb', line 130
def new_chunk(key)
encoded_key = encode_key(key)
path, tsuffix = make_path(encoded_key, "b")
unique_id = tsuffix_to_unique_id(tsuffix)
FileBufferChunk.new(key, path, unique_id, "a+", @symlink_path)
end
|
#resume ⇒ Object
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
|
# File 'lib/fluent/plugin/buf_file.rb', line 137
def resume
maps = []
queues = []
Dir.glob("#{@buffer_path_prefix}*#{@buffer_path_suffix}") {|path|
identifier_part = chunk_identifier_in_path(path)
if m = PATH_MATCH.match(identifier_part)
key = decode_key(m[1])
bq = m[2]
tsuffix = m[3]
timestamp = m[3].to_i(16)
unique_id = tsuffix_to_unique_id(tsuffix)
if bq == 'b'
chunk = FileBufferChunk.new(key, path, unique_id, "a+")
maps << [timestamp, chunk]
elsif bq == 'q'
chunk = FileBufferChunk.new(key, path, unique_id, "r")
queues << [timestamp, chunk]
end
end
}
map = {}
maps.sort_by {|(timestamp,chunk)|
timestamp
}.each {|(timestamp,chunk)|
map[chunk.key] = chunk
}
queue = queues.sort_by {|(timestamp,chunk)|
timestamp
}.map {|(timestamp,chunk)|
chunk
}
return queue, map
end
|
#start ⇒ Object
121
122
123
124
|
# File 'lib/fluent/plugin/buf_file.rb', line 121
def start
FileUtils.mkdir_p File.dirname(@buffer_path_prefix + "path"), mode: DEFAULT_DIR_PERMISSION
super
end
|