Class: Fluent::BasicBuffer

Inherits:
Buffer
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/fluent/buffer.rb

Direct Known Subclasses

FileBuffer, MemoryBuffer

Constant Summary

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Method Summary collapse

Methods inherited from Buffer

#before_shutdown

Methods included from Configurable

#config, included, lookup_type, register_type

Constructor Details

#initializeBasicBuffer

Returns a new instance of BasicBuffer.



134
135
136
137
138
139
# File 'lib/fluent/buffer.rb', line 134

def initialize
  super
  @map = nil # chunks to store data
  @queue = nil # chunks to be flushed
  @parallel_pop = true
end

Instance Method Details

#clear!Object



357
358
359
360
361
362
# File 'lib/fluent/buffer.rb', line 357

def clear!
  @queue.delete_if {|chunk|
    chunk.purge
    true
  }
end

#configure(conf) ⇒ Object



160
161
162
163
164
165
166
# File 'lib/fluent/buffer.rb', line 160

def configure(conf)
  super

  if @buffer_queue_full_action == :block
    $log.warn "'block' action stops input process until the buffer full is resolved. Check your pipeline this action is fit or not"
  end
end

#emit(key, data, chain) ⇒ Object



190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/fluent/buffer.rb', line 190

def emit(key, data, chain)
  key = key.to_s

  synchronize do
    begin
      # chunk unique id is generated in #new_chunk
      chunk = (@map[key] ||= new_chunk(key))

      if storable?(chunk, data)
        chain.next
        chunk << data
        return false

      elsif @queue.size >= @buffer_queue_limit
        raise BufferQueueLimitError, "queue size exceeds limit"
      end
    rescue BufferQueueLimitError => e
      case @buffer_queue_full_action
      when :exception
        raise e
      when :block
        # This is rough implementation. New Buffer API should improve this routine by using wait/signal
        $log.debug "buffer queue is full. Wait 1 second to re-emit events"
        sleep 1
        retry
      when :drop_oldest_chunk
        $log.debug "buffer queue is full. Dropping oldest chunk"
        pop(nil)
      end
    end

    if data.bytesize > @buffer_chunk_limit
      $log.warn "Size of the emitted data exceeds buffer_chunk_limit."
      $log.warn "This may occur problems in the output plugins ``at this server.``"
      $log.warn "To avoid problems, set a smaller number to the buffer_chunk_limit"
      $log.warn "in the forward output ``at the log forwarding server.``"
      ### TODO
      # raise BufferChunkLimitError, "received data too large"
    end

    # chunk unique id is generated in #new_chunk
    nc = new_chunk(key)
    ok = false

    begin
      nc << data
      chain.next

      flush_trigger = false
      @queue.synchronize {
        enqueue(chunk) # this is buffer enqueue *hook*
        flush_trigger = @queue.empty?
        @queue << chunk # actual enqueue
        @map[key] = nc
      }

      ok = true
      # false: queue have 1 or more chunks before this emit
      #        so this enqueue is not a trigger to flush
      # true: queue have no chunks before this emit
      #       so this enqueue is a trigger to flush this buffer ASAP
      return flush_trigger
    ensure
      nc.purge unless ok
    end

  end  # synchronize
end

#enable_parallel(b = true) ⇒ Object



141
142
143
# File 'lib/fluent/buffer.rb', line 141

def enable_parallel(b=true)
  @parallel_pop = b
end

#enqueue(chunk) ⇒ Object

enqueueing is done by #push this method is actually ‘enqueue_hook’

Raises:

  • (NotImplementedError)


292
293
294
# File 'lib/fluent/buffer.rb', line 292

def enqueue(chunk)
  raise NotImplementedError, "Implement this method in child class"
end

#keysObject



259
260
261
# File 'lib/fluent/buffer.rb', line 259

def keys
  @map.keys
end

#new_chunk(key) ⇒ Object

Raises:

  • (NotImplementedError)


282
283
284
# File 'lib/fluent/buffer.rb', line 282

def new_chunk(key)
  raise NotImplementedError, "Implement this method in child class"
end

#pop(out) ⇒ Object

shift a chunk from queue, write and purge it returns boolean to indicate whether this buffer have more chunk to be flushed or not



316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
# File 'lib/fluent/buffer.rb', line 316

def pop(out)
  chunk = nil
  @queue.synchronize do
    if @parallel_pop
      chunk = @queue.find {|c| c.try_mon_enter }
      return false unless chunk
    else
      chunk = @queue.first
      return false unless chunk
      return false unless chunk.try_mon_enter
    end
  end

  begin
    # #push(key) does not push empty chunks into queue.
    # so this check is nonsense...
    if !chunk.empty? && !out.nil?
      write_chunk(chunk, out)
    end

    queue_empty = false
    @queue.synchronize do
      @queue.delete_if {|c|
        c.object_id == chunk.object_id
      }
      queue_empty = @queue.empty?
    end

    chunk.purge

    # return to be flushed once more immediately, or not
    return !queue_empty
  ensure
    chunk.mon_exit
  end
end

#push(key) ⇒ Object

get the chunk specified by key, and push it into queue



297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
# File 'lib/fluent/buffer.rb', line 297

def push(key)
  synchronize do
    chunk = @map[key]
    if !chunk || chunk.empty?
      return false
    end

    @queue.synchronize do
      enqueue(chunk)
      @queue << chunk
      @map.delete(key)
    end

    return true
  end  # synchronize
end

#queue_sizeObject



263
264
265
# File 'lib/fluent/buffer.rb', line 263

def queue_size
  @queue.size
end

#resumeObject

Raises:

  • (NotImplementedError)


286
287
288
# File 'lib/fluent/buffer.rb', line 286

def resume
  raise NotImplementedError, "Implement this method in child class"
end

#shutdownObject



173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/fluent/buffer.rb', line 173

def shutdown
  synchronize do
    @queue.synchronize do
      until @queue.empty?
        @queue.shift.close
      end
    end
    @map.each_pair {|key,chunk|
      chunk.close
    }
  end
end

#startObject



168
169
170
171
# File 'lib/fluent/buffer.rb', line 168

def start
  @queue, @map = resume
  @queue.extend(MonitorMixin)
end

#storable?(chunk, data) ⇒ Boolean

Returns:

  • (Boolean)


186
187
188
# File 'lib/fluent/buffer.rb', line 186

def storable?(chunk, data)
  chunk.size + data.bytesize <= @buffer_chunk_limit
end

#total_queued_chunk_sizeObject



267
268
269
270
271
272
273
274
275
276
277
278
279
280
# File 'lib/fluent/buffer.rb', line 267

def total_queued_chunk_size
  total = 0
  synchronize {
    @map.each_value {|c|
      total += c.size
    }
    @queue.synchronize {
      @queue.each {|c|
        total += c.size
      }
    }
  }
  total
end

#write_chunk(chunk, out) ⇒ Object



353
354
355
# File 'lib/fluent/buffer.rb', line 353

def write_chunk(chunk, out)
  out.write(chunk)
end