Method: Fluent::Plugin::Output#flush_thread_run

Defined in:
lib/fluent/plugin/output.rb

#flush_thread_run(state) ⇒ Object



1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
# File 'lib/fluent/plugin/output.rb', line 1510

def flush_thread_run(state)
  flush_thread_interval = @buffer_config.flush_thread_interval

  state.next_clock = Fluent::Clock.now + flush_thread_interval

  while !self.after_started? && !self.stopped?
    sleep 0.5
  end
  log.debug "flush_thread actually running"

  state.mutex.lock
  begin
    # This thread don't use `thread_current_running?` because this thread should run in `before_shutdown` phase
    while @output_flush_threads_running
      current_clock = Fluent::Clock.now
      next_retry_time = nil

      @retry_mutex.synchronize do
        next_retry_time = @retry ? @retry.next_time : nil
      end

      if state.next_clock > current_clock
        interval = state.next_clock - current_clock
      elsif next_retry_time && next_retry_time > Time.now
        interval = next_retry_time.to_f - Time.now.to_f
      else
        state.mutex.unlock
        begin
          try_flush
          # next_flush_time uses flush_thread_interval or flush_thread_burst_interval (or retrying)
          interval = next_flush_time.to_f - Time.now.to_f
          # TODO: if secondary && delayed-commit, next_flush_time will be much longer than expected
          #       because @retry still exists (#commit_write is not called yet in #try_flush)
          #       @retry should be cleared if delayed commit is enabled? Or any other solution?
          state.next_clock = Fluent::Clock.now + interval
        ensure
          state.mutex.lock
        end
      end

      if @dequeued_chunks_mutex.synchronize{ !@dequeued_chunks.empty? && @dequeued_chunks.first.expired? }
        unless @output_flush_interrupted
          state.mutex.unlock
          begin
            try_rollback_write
          ensure
            state.mutex.lock
          end
        end
      end

      state.cond_var.wait(state.mutex, interval) if interval > 0
    end
  rescue => e
    # normal errors are rescued by output plugins in #try_flush
    # so this rescue section is for critical & unrecoverable errors
    log.error "error on output thread", error: e
    log.error_backtrace
    raise
  ensure
    state.mutex.unlock
  end
end