Class: Fluent::BasicBuffer
- Inherits:
-
Buffer
- Object
- Buffer
- Fluent::BasicBuffer
show all
- Includes:
- MonitorMixin
- Defined in:
- lib/fluent/buffer.rb
Constant Summary
Configurable::CONFIG_TYPE_REGISTRY
Instance Method Summary
collapse
Methods inherited from Buffer
#before_shutdown
#config, included, lookup_type, register_type
Constructor Details
Returns a new instance of BasicBuffer.
134
135
136
137
138
139
|
# File 'lib/fluent/buffer.rb', line 134
# Builds a buffer that has not been started yet.
#
# The chunk map and the queue are left as nil here; #start is what fills
# them in from #resume. Parallel popping is switched on by default and can
# be changed later through #enable_parallel.
def initialize
  super
  @map = @queue = nil
  @parallel_pop = true
end
|
Instance Method Details
#clear! ⇒ Object
357
358
359
360
361
362
|
# File 'lib/fluent/buffer.rb', line 357
# Purges every chunk currently sitting in the queue and removes it.
#
# Each chunk is purged first and then dropped from @queue, so the queue is
# empty once this returns. Chunks still held in @map are not touched.
def clear!
  @queue.delete_if do |queued|
    queued.purge
    true # always drop the chunk once it has been purged
  end
end
|
160
161
162
163
164
165
166
|
# File 'lib/fluent/buffer.rb', line 160
# Applies the configuration via the parent class, then warns when the
# buffer is set up to block on a full queue.
#
# @param conf the configuration object, forwarded unchanged to super
def configure(conf)
  super
  return unless @buffer_queue_full_action == :block

  # :block makes #emit wait while the queue is full, so flag it loudly.
  $log.warn "'block' action stops input process until the buffer full is resolved. Check your pipeline this action is fit or not"
end
|
#emit(key, data, chain) ⇒ Object
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
|
# File 'lib/fluent/buffer.rb', line 190
# Appends +data+ to the chunk for +key+, rotating to a new chunk when the
# current one cannot hold it.
#
# The key is normalised with #to_s. If the current chunk can store the data
# (see #storable?), the data is appended and false is returned. Otherwise the
# current chunk is handed to #enqueue, appended to @queue, and replaced in
# @map by a fresh chunk that already contains +data+.
#
# When the current chunk cannot take the data and the queue has reached
# @buffer_queue_limit, @buffer_queue_full_action decides what happens:
# * :exception         - the BufferQueueLimitError is re-raised
# * :block             - sleep one second and retry from the top
#                        (the sleep happens while this buffer's monitor is held)
# * :drop_oldest_chunk - pop(nil) is called, then rotation continues
#
# @param key   chunk key; converted with #to_s
# @param data  payload; must respond to #bytesize
# @param chain object whose #next is invoked once the data is accepted
# @return [Boolean] false when the data went into the current chunk;
#   otherwise whether @queue was empty just before the old chunk was added
# @raise [BufferQueueLimitError] queue full and action is :exception
def emit(key, data, chain)
  key = key.to_s
  synchronize do
    begin
      current = (@map[key] ||= new_chunk(key))
      if storable?(current, data)
        chain.next
        current << data
        return false
      end
      raise BufferQueueLimitError, "queue size exceeds limit" if @queue.size >= @buffer_queue_limit
    rescue BufferQueueLimitError => limit_error
      case @buffer_queue_full_action
      when :exception
        raise limit_error
      when :block
        $log.debug "buffer queue is full. Wait 1 second to re-emit events"
        sleep 1
        retry
      when :drop_oldest_chunk
        $log.debug "buffer queue is full. Dropping oldest chunk"
        pop(nil)
      end
    end

    # A single payload larger than the chunk limit is still stored, but loudly.
    if data.bytesize > @buffer_chunk_limit
      [
        "Size of the emitted data exceeds buffer_chunk_limit.",
        "This may occur problems in the output plugins ``at this server.``",
        "To avoid problems, set a smaller number to the buffer_chunk_limit",
        "in the forward output ``at the log forwarding server.``"
      ].each { |line| $log.warn line }
    end

    replacement = new_chunk(key)
    committed = false
    begin
      replacement << data
      chain.next
      queue_was_empty = false
      @queue.synchronize do
        enqueue(current)
        # Sample emptiness before the old chunk is appended.
        queue_was_empty = @queue.empty?
        @queue << current
        @map[key] = replacement
      end
      committed = true
      return queue_was_empty
    ensure
      # Discard the new chunk if anything failed before it was installed.
      replacement.purge unless committed
    end
  end
end
|
#enable_parallel(b = true) ⇒ Object
141
142
143
|
# File 'lib/fluent/buffer.rb', line 141
# Switches parallel popping on or off.
#
# The flag is read by #pop to decide whether it may take any lockable
# chunk in the queue or only the one at the head.
#
# @param b the new value for the flag (defaults to true)
# @return the value that was stored
def enable_parallel(b = true)
  @parallel_pop = b
end
|
#enqueue(chunk) ⇒ Object
Enqueueing itself is done by #push; this method is actually an 'enqueue hook'.
292
293
294
|
# File 'lib/fluent/buffer.rb', line 292
# Hook invoked with a chunk right before it is appended to the queue
# (see #emit and #push). Subclasses must provide it.
#
# @param chunk the chunk about to be queued
# @raise [NotImplementedError] always, in this base class
def enqueue(chunk)
  raise NotImplementedError, "Implement this method in child class"
end
|
#keys ⇒ Object
259
260
261
|
# File 'lib/fluent/buffer.rb', line 259
# Lists the keys that currently have a chunk in @map.
#
# @return the result of calling #keys on the chunk map
def keys
  @map.keys
end
|
#new_chunk(key) ⇒ Object
282
283
284
|
# File 'lib/fluent/buffer.rb', line 282
# Factory for the chunk that will hold data for +key+ (used by #emit).
# Subclasses must provide it.
#
# @param key the chunk key
# @raise [NotImplementedError] always, in this base class
def new_chunk(key)
  raise NotImplementedError, "Implement this method in child class"
end
|
#pop(out) ⇒ Object
Shifts a chunk from the queue, writes it, and purges it. Returns a boolean indicating whether this buffer has more chunks to be flushed.
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
|
# File 'lib/fluent/buffer.rb', line 316
# Takes one chunk off the queue, writes it to +out+, and purges it.
#
# With @parallel_pop set, the first queued chunk whose #try_mon_enter
# succeeds is taken; otherwise only the head of the queue is considered.
# The chunk is written only when it is non-empty and +out+ is not nil, so
# pop(nil) simply discards a chunk. The chunk's monitor is always released
# on the way out, including when writing raises.
#
# @param out destination handed to #write_chunk; nil skips the write
# @return [Boolean] false if no chunk could be taken; otherwise true when
#   chunks remain in the queue after this one was removed
def pop(out)
  target = nil
  @queue.synchronize do
    if @parallel_pop
      target = @queue.find { |candidate| candidate.try_mon_enter }
    else
      head = @queue.first
      target = head if head && head.try_mon_enter
    end
    return false unless target
  end

  begin
    write_chunk(target, out) unless target.empty? || out.nil?

    more_remaining = true
    @queue.synchronize do
      # Remove by identity, not by value equality.
      @queue.delete_if { |queued| queued.equal?(target) }
      more_remaining = !@queue.empty?
    end
    target.purge
    return more_remaining
  ensure
    target.mon_exit
  end
end
|
#push(key) ⇒ Object
Gets the chunk specified by key and pushes it into the queue.
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
|
# File 'lib/fluent/buffer.rb', line 297
# Moves the chunk stored under +key+ from the map into the queue.
#
# Nothing happens when there is no chunk for the key or the chunk is empty.
# Otherwise the chunk is passed to #enqueue, appended to @queue, and its
# entry is removed from @map.
#
# @param key the map key to look up (used as given, without conversion)
# @return [Boolean] true if a chunk was queued, false otherwise
def push(key)
  synchronize do
    pending = @map[key]
    return false unless pending && !pending.empty?

    @queue.synchronize do
      enqueue(pending)
      @queue << pending
      @map.delete(key)
    end
    return true
  end
end
|
#queue_size ⇒ Object
263
264
265
|
# File 'lib/fluent/buffer.rb', line 263
# Reports how many chunks are waiting in the queue.
#
# @return the value of @queue.size
def queue_size
  @queue.size
end
|
#resume ⇒ Object
286
287
288
|
# File 'lib/fluent/buffer.rb', line 286
# Supplies the initial queue and chunk map; #start destructures the result
# as [queue, map]. Subclasses must provide it.
#
# @raise [NotImplementedError] always, in this base class
def resume
  raise NotImplementedError, "Implement this method in child class"
end
|
#shutdown ⇒ Object
173
174
175
176
177
178
179
180
181
182
183
184
|
# File 'lib/fluent/buffer.rb', line 173
# Closes every chunk the buffer holds.
#
# Queued chunks are shifted off the queue one at a time and closed, leaving
# the queue empty. Chunks in the map are closed as well but stay in the map.
# Both steps run while holding this buffer's monitor.
def shutdown
  synchronize do
    @queue.synchronize do
      @queue.shift.close until @queue.empty?
    end
    @map.each_pair { |_key, staged| staged.close }
  end
end
|
#start ⇒ Object
168
169
170
171
|
# File 'lib/fluent/buffer.rb', line 168
# Loads the queue and chunk map from #resume and makes the queue lockable.
#
# The queue object is extended with MonitorMixin so that other methods can
# call @queue.synchronize on it.
#
# @return the queue, as returned by #extend
def start
  restored_queue, restored_map = resume
  @queue = restored_queue
  @map = restored_map
  @queue.extend(MonitorMixin)
end
|
#storable?(chunk, data) ⇒ Boolean
186
187
188
|
# File 'lib/fluent/buffer.rb', line 186
# Tells whether +data+ still fits into +chunk+.
#
# @param chunk object responding to #size
# @param data  object responding to #bytesize
# @return [Boolean] true when chunk.size plus data.bytesize does not exceed
#   @buffer_chunk_limit
def storable?(chunk, data)
  projected_size = chunk.size + data.bytesize
  projected_size <= @buffer_chunk_limit
end
|
#total_queued_chunk_size ⇒ Object
267
268
269
270
271
272
273
274
275
276
277
278
279
280
|
# File 'lib/fluent/buffer.rb', line 267
# Adds up the sizes of all chunks held by the buffer.
#
# Chunks in the map are counted first, then the chunks in the queue; the
# buffer's monitor is held throughout and the queue's monitor while the
# queue is walked.
#
# @return the sum of #size over every chunk in @map and @queue
def total_queued_chunk_size
  bytes = 0
  tally = ->(held) { bytes += held.size }
  synchronize do
    @map.each_value(&tally)
    @queue.synchronize { @queue.each(&tally) }
  end
  bytes
end
|
#write_chunk(chunk, out) ⇒ Object
353
354
355
|
# File 'lib/fluent/buffer.rb', line 353
# Hands a chunk over to the output.
#
# @param chunk the chunk to flush
# @param out   object responding to #write
# @return whatever out.write returns
def write_chunk(chunk, out)
  out.write(chunk)
end
|