1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
|
# File 'lib/fluent/plugin/output.rb', line 1510
def flush_thread_run(state)
flush_thread_interval = @buffer_config.flush_thread_interval
state.next_clock = Fluent::Clock.now + flush_thread_interval
while !self.after_started? && !self.stopped?
sleep 0.5
end
log.debug "flush_thread actually running"
state.mutex.lock
begin
while @output_flush_threads_running
current_clock = Fluent::Clock.now
next_retry_time = nil
@retry_mutex.synchronize do
next_retry_time = @retry ? @retry.next_time : nil
end
if state.next_clock > current_clock
interval = state.next_clock - current_clock
elsif next_retry_time && next_retry_time > Time.now
interval = next_retry_time.to_f - Time.now.to_f
else
state.mutex.unlock
begin
try_flush
interval = next_flush_time.to_f - Time.now.to_f
state.next_clock = Fluent::Clock.now + interval
ensure
state.mutex.lock
end
end
if @dequeued_chunks_mutex.synchronize{ !@dequeued_chunks.empty? && @dequeued_chunks.first.expired? }
unless @output_flush_interrupted
state.mutex.unlock
begin
try_rollback_write
ensure
state.mutex.lock
end
end
end
state.cond_var.wait(state.mutex, interval) if interval > 0
end
rescue => e
log.error "error on output thread", error: e
log.error_backtrace
raise
ensure
state.mutex.unlock
end
end
|