Class: Fluent::Rdkafka2Output::EnqueueRate

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/out_rdkafka2.rb

Defined Under Namespace

Classes: LimitExceeded

Instance Method Summary collapse

Constructor Details

#initialize(limit_bytes_per_second) ⇒ EnqueueRate

Returns a new instance of EnqueueRate.



148
149
150
151
152
153
154
# File 'lib/fluent/plugin/out_rdkafka2.rb', line 148

# Sets up the bookkeeping for a one-second enqueue-rate window.
#
# @param limit_bytes_per_second [Numeric, nil] byte budget per window;
#   when nil, #raise_if_limit_exceeded and #revert return immediately.
def initialize(limit_bytes_per_second)
  # Configured budget and the running byte total for the current window.
  @limit_bytes_per_second = limit_bytes_per_second
  @bytes_per_second = 0

  # Clock value at which the current window started.
  @start_clock = Fluent::Clock.now

  # Guards every read/write of the counters and of @commits.
  @mutex = Mutex.new

  # Most recent enqueue ({ clock:, bytesize: }) keyed by the calling thread.
  @commits = {}
end

Instance Method Details

#raise_if_limit_exceeded(bytes_to_enqueue) ⇒ Object



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/fluent/plugin/out_rdkafka2.rb', line 156

# Counts +bytes_to_enqueue+ against the current one-second window and
# raises when the window's byte budget has been exceeded.
#
# The enqueue is recorded for the calling thread before the check, so the
# bytes stay counted even when this raises; #revert subtracts them again.
# When a second or more has passed since the window started, a new window
# begins with this enqueue as its first entry and no check is made.
#
# @param bytes_to_enqueue [Numeric] size of the payload about to be enqueued
# @raise [LimitExceeded] built with +@start_clock + 1.0+ when the running
#   total within the current window is above the configured limit
def raise_if_limit_exceeded(bytes_to_enqueue)
  return if @limit_bytes_per_second.nil?

  @mutex.synchronize do
    now = Fluent::Clock.now
    commit = { clock: now, bytesize: bytes_to_enqueue }
    @commits[Thread.current] = commit

    @bytes_per_second += bytes_to_enqueue

    if now - @start_clock < 1.0
      # Still inside the current window: enforce the budget.
      raise LimitExceeded.new(@start_clock + 1.0) if @bytes_per_second > @limit_bytes_per_second
    else
      # Window elapsed: restart it, seeded with this enqueue.
      @start_clock = now
      @bytes_per_second = bytes_to_enqueue
    end
  end
end

#revert ⇒ Object



179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/fluent/plugin/out_rdkafka2.rb', line 179

# Undoes the calling thread's most recent enqueue accounting.
#
# The bytes are subtracted from the running total only when the recorded
# enqueue belongs to the current window (its clock is not older than
# +@start_clock+); an older enqueue was already dropped when the window
# restarted. The thread's entry is removed from +@commits+ rather than set
# to nil, so the hash does not keep holding the Thread object as a key.
# Does nothing when no limit is configured or when the thread has no
# recorded enqueue.
#
# @return [nil]
def revert
  return if @limit_bytes_per_second.nil?

  @mutex.synchronize do
    commit = @commits.delete(Thread.current)
    next unless commit && commit[:clock]

    @bytes_per_second -= commit[:bytesize] if commit[:clock] >= @start_clock
  end

  nil
end