Class: Fluent::Rdkafka2Output::EnqueueRate
- Inherits:
-
Object
- Object
- Fluent::Rdkafka2Output::EnqueueRate
- Defined in:
- lib/fluent/plugin/out_rdkafka2.rb
Defined Under Namespace
Classes: LimitExceeded
Instance Method Summary collapse
-
#initialize(limit_bytes_per_second) ⇒ EnqueueRate
constructor
A new instance of EnqueueRate.
- #raise_if_limit_exceeded(bytes_to_enqueue) ⇒ Object
- #revert ⇒ Object
Constructor Details
#initialize(limit_bytes_per_second) ⇒ EnqueueRate
Returns a new instance of EnqueueRate.
148 149 150 151 152 153 154 |
# File 'lib/fluent/plugin/out_rdkafka2.rb', line 148 def initialize(limit_bytes_per_second) @mutex = Mutex.new @start_clock = Fluent::Clock.now @bytes_per_second = 0 @limit_bytes_per_second = limit_bytes_per_second @commits = {} end |
Instance Method Details
#raise_if_limit_exceeded(bytes_to_enqueue) ⇒ Object
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/fluent/plugin/out_rdkafka2.rb', line 156 def raise_if_limit_exceeded(bytes_to_enqueue) return if @limit_bytes_per_second.nil? @mutex.synchronize do @commits[Thread.current] = { clock: Fluent::Clock.now, bytesize: bytes_to_enqueue, } @bytes_per_second += @commits[Thread.current][:bytesize] duration = @commits[Thread.current][:clock] - @start_clock if duration < 1.0 if @bytes_per_second > @limit_bytes_per_second raise LimitExceeded.new(@start_clock + 1.0) end else @start_clock = @commits[Thread.current][:clock] @bytes_per_second = @commits[Thread.current][:bytesize] end end end |
#revert ⇒ Object
179 180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/fluent/plugin/out_rdkafka2.rb', line 179 def revert return if @limit_bytes_per_second.nil? @mutex.synchronize do return unless @commits[Thread.current] return unless @commits[Thread.current][:clock] if @commits[Thread.current][:clock] >= @start_clock @bytes_per_second -= @commits[Thread.current][:bytesize] end @commits[Thread.current] = nil end end |