Class: Fluent::Rdkafka2Output::EnqueueRate

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/out_rdkafka2.rb

Defined Under Namespace

Classes: LimitExceeded

Instance Method Summary collapse

Constructor Details

#initialize(limit_bytes_per_second) ⇒ EnqueueRate

Returns a new instance of EnqueueRate.



119
120
121
122
123
124
125
# File 'lib/fluent/plugin/out_rdkafka2.rb', line 119

def initialize(limit_bytes_per_second)
  @mutex = Mutex.new
  @start_clock = Fluent::Clock.now
  @bytes_per_second = 0
  @limit_bytes_per_second = limit_bytes_per_second
  @commits = {}
end

Instance Method Details

#raise_if_limit_exceeded(bytes_to_enqueue) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/fluent/plugin/out_rdkafka2.rb', line 127

def raise_if_limit_exceeded(bytes_to_enqueue)
  return if @limit_bytes_per_second.nil?

  @mutex.synchronize do
    @commits[Thread.current] = {
      clock: Fluent::Clock.now,
      bytesize: bytes_to_enqueue,
    }

    @bytes_per_second += @commits[Thread.current][:bytesize]
    duration = @commits[Thread.current][:clock] - @start_clock

    if duration < 1.0
      if @bytes_per_second > @limit_bytes_per_second
        raise LimitExceeded.new(@start_clock + 1.0)
      end
    else
      @start_clock = @commits[Thread.current][:clock]
      @bytes_per_second = @commits[Thread.current][:bytesize]
    end
  end
end

#revertObject



150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/fluent/plugin/out_rdkafka2.rb', line 150

def revert
  return if @limit_bytes_per_second.nil?

  @mutex.synchronize do
    return unless @commits[Thread.current]
    return unless @commits[Thread.current][:clock]
    if @commits[Thread.current][:clock] >= @start_clock
      @bytes_per_second -= @commits[Thread.current][:bytesize]
    end
    @commits[Thread.current] = nil
  end
end