Module: LogStash::Outputs::MicrosoftSentinelOutputInternal::CustomSizeBasedBuffer

Included in:
LogStashAutoResizeBuffer, LogStashCompressedStream
Defined in:
lib/logstash/sentinel/customSizeBasedBuffer.rb

Instance Method Summary collapse

Instance Method Details

#buffer_flush(options = {}) ⇒ Fixnum

Try to flush events.

Returns immediately if flushing is not necessary/possible at the moment:

  • :max_items or :flush_each have not been accumulated

  • :max_interval seconds have not elapased since the last flush

  • another flush is in progress

buffer_flush(:force => true) will cause a flush to occur even if :max_items or :flush_each or :max_interval have not been reached. A forced flush will still return immediately (without flushing) if another flush is currently in progress.

buffer_flush(:final => true) is identical to buffer_flush(:force => true), except that if another flush is already in progress, buffer_flush(:final => true) will block/wait for the other flush to finish before proceeding.

Parameters:

  • options (Hash) (defaults to: {})

    Optional. May be {:force => true} or {:final => true}.

Returns:

  • (Fixnum)

    The number of items successfully passed to flush.



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# File 'lib/logstash/sentinel/customSizeBasedBuffer.rb', line 189

def buffer_flush(options={})
  force = options[:force] || options[:final]
  final = options[:final]

  # final flush will wait for lock, so we are sure to flush out all buffered events
  if options[:final]
    @buffer_state[:flush_mutex].lock
  elsif ! @buffer_state[:flush_mutex].try_lock # failed to get lock, another flush already in progress
    return 0
  end

  items_flushed = 0

  begin
    return 0 if @buffer_state[:pending_count] == 0

    # compute time_since_last_flush only when some item is pending
    time_since_last_flush = get_time_since_last_flush

    return 0 if (!force) &&
       (@buffer_state[:pending_count] < @buffer_config[:max_items]) &&
       (@buffer_config[:flush_each] == 0 || @buffer_state[:pending_size] < @buffer_config[:flush_each]) &&
       (time_since_last_flush < @buffer_config[:max_interval])

    @buffer_state[:pending_mutex].synchronize do
      @buffer_state[:outgoing_items] = @buffer_state[:pending_items]
      @buffer_state[:outgoing_count] = @buffer_state[:pending_count]
      @buffer_state[:outgoing_size] = @buffer_state[:pending_size]
      buffer_clear_pending
    end
    @buffer_config[:logger].debug("Flushing output",
      :outgoing_count => @buffer_state[:outgoing_count],
      :time_since_last_flush => time_since_last_flush,
      :outgoing_events => @buffer_state[:outgoing_items],
      :batch_timeout => @buffer_config[:max_interval],
      :force => force,
      :final => final
    ) if @buffer_config[:logger]

    @buffer_state[:outgoing_items].each do |group, events|
      begin

        if group.nil?
          flush(events,final)
        else
          flush(events, group, final)
        end

        @buffer_state[:outgoing_items].delete(group)
        events_size = events.size
        @buffer_state[:outgoing_count] -= events_size
        if @buffer_config[:flush_each] != 0
          events_volume = 0
          events.each do |event|
            events_volume += var_size(event)
          end
          @buffer_state[:outgoing_size] -= events_volume
        end
        items_flushed += events_size

      rescue => e
        @buffer_config[:logger].warn("Failed to flush outgoing items",
          :outgoing_count => @buffer_state[:outgoing_count],
          :exception => e,
          :backtrace => e.backtrace
        ) if @buffer_config[:logger]

        if @buffer_config[:has_on_flush_error]
          on_flush_error e
        end

        sleep 1
        retry
      end
      @buffer_state[:last_flush] = Time.now.to_i
    end

  ensure
    @buffer_state[:flush_mutex].unlock
  end

  return items_flushed
end

#buffer_full?bool

Determine if :max_items or :flush_each has been reached.

buffer_receive calls will block while buffer_full? == true.

Returns:

  • (bool)

    Is the buffer full?



134
135
136
137
# File 'lib/logstash/sentinel/customSizeBasedBuffer.rb', line 134

def buffer_full?
  (@buffer_state[:pending_count] + @buffer_state[:outgoing_count] >= @buffer_config[:max_items]) || \
  (@buffer_config[:flush_each] != 0 && @buffer_state[:pending_size] + @buffer_state[:outgoing_size] >= @buffer_config[:flush_each])
end

#buffer_initialize(options = {}) ⇒ Object

Initialize the buffer.

Call directly from your constructor if you wish to set some non-default options. Otherwise buffer_initialize will be called automatically during the first buffer_receive call.

Options:

  • :max_items, Max number of items to buffer before flushing. Default 50.

  • :flush_each, Flush each bytes of buffer. Default 0 (no flushing fired by

    a buffer size).
    
  • :max_interval, Max number of seconds to wait between flushes. Default 5.

  • :logger, A logger to write log messages to. No default. Optional.

Parameters:

  • options (Hash) (defaults to: {})


85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/logstash/sentinel/customSizeBasedBuffer.rb', line 85

def buffer_initialize(options={})
  if ! self.class.method_defined?(:flush)
    raise ArgumentError, "Any class including Stud::Buffer must define a flush() method."
  end

  @buffer_config = {
    :max_items => options[:max_items] || 50,
    :flush_each => options[:flush_each].to_i || 0,
    :max_interval => options[:max_interval] || 5,
    :logger => options[:logger] || nil,
    :has_on_flush_error => self.class.method_defined?(:on_flush_error),
    :has_on_full_buffer_receive => self.class.method_defined?(:on_full_buffer_receive)
  }
  @buffer_state = {
    # items accepted from including class
    :pending_items => {},
    :pending_count => 0,
    :pending_size => 0,

    # guard access to pending_items & pending_count & pending_size
    :pending_mutex => Mutex.new,

    # items which are currently being flushed
    :outgoing_items => {},
    :outgoing_count => 0,
    :outgoing_size => 0,

    # ensure only 1 flush is operating at once
    :flush_mutex => Mutex.new,

    # data for timed flushes
    :last_flush => Time.now.to_i,
    :timer => Thread.new do
      loop do
        sleep(@buffer_config[:max_interval])
        buffer_flush(:force => true)
      end
    end
  }

  # events we've accumulated
  buffer_clear_pending
end

#buffer_receive(event, group = nil) ⇒ Object

Save an event for later delivery

Events are grouped by the (optional) group parameter you provide. Groups of events, plus the group name, are later passed to flush.

This call will block if :max_items or :flush_each has been reached.

Parameters:

  • event

    An item to buffer for flushing later.

  • group (defaults to: nil)

    Optional grouping key. All events with the same key will be passed to flush together, along with the grouping key itself.

See Also:

  • The overview has more information on grouping and flushing.


151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/logstash/sentinel/customSizeBasedBuffer.rb', line 151

def buffer_receive(event, group=nil)
  buffer_initialize if ! @buffer_state

  # block if we've accumulated too many events
  while buffer_full? do
    on_full_buffer_receive(
      :pending => @buffer_state[:pending_count],
      :outgoing => @buffer_state[:outgoing_count]
    ) if @buffer_config[:has_on_full_buffer_receive]
    sleep 0.1
  end
  @buffer_state[:pending_mutex].synchronize do
    @buffer_state[:pending_items][group] << event
    @buffer_state[:pending_count] += 1
    @buffer_state[:pending_size] += var_size(event) if @buffer_config[:flush_each] != 0
  end

  buffer_flush
end