Class: Fluent::BufferedOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::BufferedOutput
- Defined in:
- lib/fluent/plugin/output_try_flush_interval_patch.rb
Instance Method Summary collapse
-
#try_flush ⇒ Object
Overrides the default try_flush so that the wait before the next flush attempt is @try_flush_interval.
Instance Method Details
#try_flush ⇒ Object
Overrides the default try_flush so that the wait before the next flush attempt is @try_flush_interval.
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/fluent/plugin/output_try_flush_interval_patch.rb', line 9

# Runs one flush attempt and returns the time at which the caller should
# try again.
#
# Outline of what the code below does:
# * If the buffer queue is empty and @next_flush_time has passed, calls
#   enqueue_buffer and pushes @next_flush_time forward by @flush_interval.
# * If the queue is still empty, returns `time + @try_flush_interval`.
# * Otherwise pops one chunk (via the secondary output once the number of
#   recorded errors exceeds @retry_limit and a secondary is configured).
# * On failure, records the error in @error_history, logs a warning, and
#   returns @next_retry_time; once the retry limits are exceeded it calls
#   write_abort and clears the error history.
#
# @return [Object] the next time to call try_flush, in the same units as
#   Engine.now (presumably epoch seconds, since it is passed to Time.at —
#   confirm against Engine).
def try_flush
  time = Engine.now

  empty = @buffer.queue_size == 0
  if empty && @next_flush_time < (now = Engine.now)
    @buffer.synchronize do
      # re-check under the buffer lock before enqueueing
      if @next_flush_time < now
        enqueue_buffer
        @next_flush_time = now + @flush_interval
        empty = @buffer.queue_size == 0
      end
    end
  end
  if empty
    return time + @try_flush_interval
  end

  begin
    # true when earlier flush attempts have failed and were not yet cleared
    retrying = !@error_history.empty?

    if retrying
      @error_history.synchronize do
        if retrying = !@error_history.empty?  # re-check in synchronize
          if @next_retry_time >= time
            # allow retrying for only one thread
            return time + @try_flush_interval
          end
          # assume the next retry fails, and
          # clear the history if it succeeds
          @last_retry_time = time
          @error_history << time
          @next_retry_time += calc_retry_wait
        end
      end
    end

    if @secondary && @error_history.size > @retry_limit
      has_next = flush_secondary(@secondary)
    else
      has_next = @buffer.pop(self)
    end

    # success
    if retrying
      @error_history.clear
      # Note: don't notify to other threads to prevent
      # burst to recovered server
      $log.warn "retry succeeded.", :instance=>object_id
    end

    if has_next
      return Engine.now + @queued_chunk_flush_interval
    else
      return time + @try_flush_interval
    end

  rescue => e
    if retrying
      error_count = @error_history.size
    else
      # first error
      error_count = 0
      @error_history.synchronize do
        if @error_history.empty?
          @last_retry_time = time
          @error_history << time
          @next_retry_time = time + calc_retry_wait
        end
      end
    end

    if error_count < @retry_limit
      $log.warn "temporarily failed to flush the buffer.", :next_retry=>Time.at(@next_retry_time), :error_class=>e.class.to_s, :error=>e.to_s, :instance=>object_id
      $log.warn_backtrace e.backtrace

    elsif @secondary
      if error_count == @retry_limit
        $log.warn "failed to flush the buffer.", :error_class=>e.class.to_s, :error=>e.to_s, :instance=>object_id
        $log.warn "retry count exceededs limit. falling back to secondary output."
        $log.warn_backtrace e.backtrace
        retry  # retry immediately
      elsif error_count <= @retry_limit + @secondary_limit
        $log.warn "failed to flush the buffer, next retry will be with secondary output.", :next_retry=>Time.at(@next_retry_time), :error_class=>e.class.to_s, :error=>e.to_s, :instance=>object_id
        $log.warn_backtrace e.backtrace
      else
        # logged as a String like every other branch
        $log.warn "failed to flush the buffer.", :error_class=>e.class.to_s, :error=>e.to_s, :instance=>object_id
        $log.warn "secondary retry count exceededs limit."
        $log.warn_backtrace e.backtrace
        write_abort
        @error_history.clear
      end

    else
      $log.warn "failed to flush the buffer.", :error_class=>e.class.to_s, :error=>e.to_s, :instance=>object_id
      $log.warn "retry count exceededs limit."
      $log.warn_backtrace e.backtrace
      write_abort
      @error_history.clear
    end

    return @next_retry_time
  end
end