Class: LogStash::Codecs::IdentityMapCodec

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/codecs/identity_map_codec.rb

Defined Under Namespace

Modules: EightyPercentWarning, UpperLimitReached Classes: CodecValue, IdentityMapUpperLimitException, NoopRunner, PeriodicRunner

Constant Summary collapse

MAX_IDENTITIES =

maximum size of the mapping hash

20_000
EVICT_TIMEOUT =

time after which a stream is considered stale each time a stream is accessed it is given a new timeout

60 * 60 * 1
CLEANER_INTERVAL =

time that the cleaner thread sleeps for before it tries to clean out stale mappings

60 * 5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(codec) ⇒ IdentityMapCodec

Returns a new instance of IdentityMapCodec.



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/logstash/codecs/identity_map_codec.rb', line 124

def initialize(codec)
  @base_codec = codec
  @base_codecs = [codec]
  @identity_map = Concurrent::Hash.new &method(:codec_builder)
  @max_identities = MAX_IDENTITIES
  @evict_timeout = EVICT_TIMEOUT
  cleaner_interval(CLEANER_INTERVAL)
  if codec.respond_to?(:use_mapper_auto_flush) &&
      (@auto_flush_interval = codec.use_mapper_auto_flush)
    @auto_flusher = PeriodicRunner.new(self, 0.5, :auto_flush_mapped)
  else
    @auto_flusher = NoopRunner.new
  end

  @decode_block = lambda {|*| true }
  @eviction_block = nil
end

Instance Attribute Details

#auto_flusherObject

Returns the value of attribute auto_flusher.



122
123
124
# File 'lib/logstash/codecs/identity_map_codec.rb', line 122

def auto_flusher
  @auto_flusher
end

#base_codecObject

Returns the value of attribute base_codec.



122
123
124
# File 'lib/logstash/codecs/identity_map_codec.rb', line 122

def base_codec
  @base_codec
end

#cleanerObject

Returns the value of attribute cleaner.



122
123
124
# File 'lib/logstash/codecs/identity_map_codec.rb', line 122

def cleaner
  @cleaner
end

#identity_mapObject (readonly)

5 minutes



121
122
123
# File 'lib/logstash/codecs/identity_map_codec.rb', line 121

def identity_map
  @identity_map
end

Instance Method Details

#accept(listener) ⇒ Object



192
193
194
# File 'lib/logstash/codecs/identity_map_codec.rb', line 192

def accept(listener)
  stream_codec(listener.path).accept(listener)
end

#all_codecsObject



248
249
250
# File 'lib/logstash/codecs/identity_map_codec.rb', line 248

def all_codecs
  no_streams? ? @base_codecs : identity_map.values.map(&:codec)
end

#auto_flush_mappedObject

end Codec API



226
227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/logstash/codecs/identity_map_codec.rb', line 226

def auto_flush_mapped
  if !identity_count.zero?
    nowf = Time.now.to_f
    identity_map.each do |identity, compo|
      next if compo.auto_flush_timeout.zero?
      next unless nowf > compo.auto_flush_timeout
      compo.codec.auto_flush
      # at eof (tail and read) no more lines for a while or ever
      # so reset compo.auto_flush_timeout
      compo.auto_flush_timeout = 0
    end
  end
end

#cleaner_interval(interval) ⇒ Object

used to add a non-default cleaner interval



159
160
161
162
163
# File 'lib/logstash/codecs/identity_map_codec.rb', line 159

def cleaner_interval(interval)
  @cleaner.stop if @cleaner
  @cleaner = PeriodicRunner.new(self, interval.to_i, :map_cleanup)
  self
end

#closeObject



218
219
220
221
222
# File 'lib/logstash/codecs/identity_map_codec.rb', line 218

def close()
  cleaner.stop
  auto_flusher.stop
  all_codecs.each(&:close)
end

#codec_without_usage_update(identity) ⇒ Object



299
300
301
302
# File 'lib/logstash/codecs/identity_map_codec.rb', line 299

def codec_without_usage_update(identity)
  return base_codec if identity.nil? # mirror optimization in `stream_codec`
  find_codec_value(identity).codec
end

#current_size_and_limitObject



290
291
292
# File 'lib/logstash/codecs/identity_map_codec.rb', line 290

def current_size_and_limit
  [identity_count, max_limit]
end

#decode(data, identity = nil, &block) ⇒ Object Also known as: <<

Codec API



187
188
189
190
# File 'lib/logstash/codecs/identity_map_codec.rb', line 187

def decode(data, identity = nil, &block)
  @decode_block = block if @decode_block != block
  stream_codec(identity).decode(data, &block)
end

#encode(event, identity = nil) ⇒ Object



198
199
200
# File 'lib/logstash/codecs/identity_map_codec.rb', line 198

def encode(event, identity = nil)
  stream_codec(identity).encode(event)
end

#evict(identity) ⇒ Object

IdentityMapCodec API



176
177
178
179
180
181
# File 'lib/logstash/codecs/identity_map_codec.rb', line 176

def evict(identity)
  # maybe called more than once
  if (compo = identity_map.delete(identity))
    compo.codec.auto_flush if compo.codec.respond_to?(:auto_flush)
  end
end

#evict_flush(codec) ⇒ Object



279
280
281
282
283
284
285
286
287
288
# File 'lib/logstash/codecs/identity_map_codec.rb', line 279

def evict_flush(codec)
  if codec.respond_to?(:auto_flush)
    codec.auto_flush
  else
    if (block = @eviction_block || @decode_block)
      codec.flush(&block)
    end
    # all else - can't do anything
  end
end

#evict_timeout(timeout) ⇒ Object

used to add a non-default evict timeout



153
154
155
156
# File 'lib/logstash/codecs/identity_map_codec.rb', line 153

def evict_timeout(timeout)
  @evict_timeout = timeout.to_i
  self
end

#eviction_block(block) ⇒ Object

used to add a non-default eviction block



166
167
168
169
# File 'lib/logstash/codecs/identity_map_codec.rb', line 166

def eviction_block(block)
  @eviction_block = block
  self
end

#eviction_timestamp_for(identity) ⇒ Object



304
305
306
# File 'lib/logstash/codecs/identity_map_codec.rb', line 304

def eviction_timestamp_for(identity)
  find_codec_value(identity).eviction_timeout
end

#flush(&block) ⇒ Object



202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/logstash/codecs/identity_map_codec.rb', line 202

def flush(&block)
  all_codecs.each do |codec|
    #let ruby do its default args thing
    if block_given?
      codec.flush(&block)
    else
      if codec.respond_to?(:auto_flush)
        codec.auto_flush
      else
        #try this, no guarantees
        codec.flush
      end
    end
  end
end

#flush_mapped(listener) ⇒ Object



240
241
242
243
244
245
246
# File 'lib/logstash/codecs/identity_map_codec.rb', line 240

def flush_mapped(listener)
  listener_has_path = listener.respond_to?(:path)
  identity_map.each do |identity, compo|
    listener.path = identity if listener_has_path
    compo.codec.auto_flush(listener)
  end
end

#identity_countObject



256
257
258
# File 'lib/logstash/codecs/identity_map_codec.rb', line 256

def identity_count
  identity_map.size
end

#loggerObject



294
295
296
297
# File 'lib/logstash/codecs/identity_map_codec.rb', line 294

def logger
  # we 'borrow' the codec's logger as we don't have our own
  @base_codec.logger
end

#map_cleanupObject

support cleaning of stale stream/codecs a stream is considered stale if it has not been accessed in the last @evict_timeout period (default 1 hour)



264
265
266
267
268
269
270
271
272
273
274
275
276
277
# File 'lib/logstash/codecs/identity_map_codec.rb', line 264

def map_cleanup
  if !identity_count.zero?
    nowi = Time.now.to_i
    # delete_if is atomic
    # contents should not mutate during this call
    identity_map.delete_if do |identity, compo|
      if (flag = compo.eviction_timeout <= nowi)
        evict_flush(compo.codec)
      end
      flag
    end
  end
  current_size_and_limit
end

#max_identities(max) ⇒ Object

Constructional/builder methods chain this method off of new

used to add a non-default maximum identities



147
148
149
150
# File 'lib/logstash/codecs/identity_map_codec.rb', line 147

def max_identities(max)
  @max_identities = max.to_i
  self
end

#max_limitObject



252
253
254
# File 'lib/logstash/codecs/identity_map_codec.rb', line 252

def max_limit
  @max_identities
end