Class: Fluent::BufferedOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/output_try_flush_interval_patch.rb

Instance Method Summary collapse

Instance Method Details

#try_flushObject

override with @try_flush_interval



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/fluent/plugin/output_try_flush_interval_patch.rb', line 9

def try_flush
  time = Engine.now

  empty = @buffer.queue_size == 0
  if empty && @next_flush_time < (now = Engine.now)
    @buffer.synchronize do
      if @next_flush_time < now
        enqueue_buffer
        @next_flush_time = now + @flush_interval
        empty = @buffer.queue_size == 0
      end
    end
  end
  if empty
    return time + @try_flush_interval
  end

  begin
    retrying = !@error_history.empty?

    if retrying
      @error_history.synchronize do
        if retrying = !@error_history.empty?  # re-check in synchronize
          if @next_retry_time >= time
            # allow retrying for only one thread
            return time + @try_flush_interval
          end
          # assume next retry failes and
          # clear them if when it succeeds
          @last_retry_time = time
          @error_history << time
          @next_retry_time += calc_retry_wait
        end
      end
    end

    if @secondary && @error_history.size > @retry_limit
      has_next = flush_secondary(@secondary)
    else
      has_next = @buffer.pop(self)
    end

    # success
    if retrying
      @error_history.clear
      # Note: don't notify to other threads to prevent
      #       burst to recovered server
      $log.warn "retry succeeded.", :instance=>object_id
    end

    if has_next
      return Engine.now + @queued_chunk_flush_interval
    else
      return time + @try_flush_interval
    end

  rescue => e
    if retrying
      error_count = @error_history.size
    else
      # first error
      error_count = 0
      @error_history.synchronize do
        if @error_history.empty?
          @last_retry_time = time
          @error_history << time
          @next_retry_time = time + calc_retry_wait
        end
      end
    end

    if error_count < @retry_limit
      $log.warn "temporarily failed to flush the buffer.", :next_retry=>Time.at(@next_retry_time), :error_class=>e.class.to_s, :error=>e.to_s, :instance=>object_id
      $log.warn_backtrace e.backtrace

    elsif @secondary
      if error_count == @retry_limit
        $log.warn "failed to flush the buffer.", :error_class=>e.class.to_s, :error=>e.to_s, :instance=>object_id
        $log.warn "retry count exceededs limit. falling back to secondary output."
        $log.warn_backtrace e.backtrace
        retry  # retry immediately
      elsif error_count <= @retry_limit + @secondary_limit
        $log.warn "failed to flush the buffer, next retry will be with secondary output.", :next_retry=>Time.at(@next_retry_time), :error_class=>e.class.to_s, :error=>e.to_s, :instance=>object_id
        $log.warn_backtrace e.backtrace
      else
        $log.warn "failed to flush the buffer.", :error_class=>e.class, :error=>e.to_s, :instance=>object_id
        $log.warn "secondary retry count exceededs limit."
        $log.warn_backtrace e.backtrace
        write_abort
        @error_history.clear
      end

    else
      $log.warn "failed to flush the buffer.", :error_class=>e.class.to_s, :error=>e.to_s, :instance=>object_id
      $log.warn "retry count exceededs limit."
      $log.warn_backtrace e.backtrace
      write_abort
      @error_history.clear
    end

    return @next_retry_time
  end
end