Class: LogStash::Codecs::IdentityMapCodec

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/codecs/identity_map_codec.rb

Defined Under Namespace

Modules: EightyPercentWarning, UpperLimitReached Classes: CodecValue, IdentityMapUpperLimitException, NoopRunner, PeriodicRunner

Constant Summary collapse

MAX_IDENTITIES =

maximum size of the mapping hash

20_000
EVICT_TIMEOUT =

Time (in seconds) after which a stream is considered stale. Each time a stream is accessed, it is given a new timeout.

60 * 60 * 1
CLEANER_INTERVAL =

time that the cleaner thread sleeps for before it tries to clean out stale mappings

60 * 5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(codec) ⇒ IdentityMapCodec

Returns a new instance of IdentityMapCodec.



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/logstash/codecs/identity_map_codec.rb', line 108

# Wraps +codec+ in an identity map.
#
# Stores the codec as the base codec, creates the identity hash (with
# +codec_builder+ handed over as the hash's block), applies the default
# MAX_IDENTITIES / EVICT_TIMEOUT limits and starts a cleaner runner using
# CLEANER_INTERVAL.
#
# @param codec [Object] the codec to wrap; when it responds to
#   +use_mapper_auto_flush+ and that call returns a truthy value, a
#   PeriodicRunner invoking +auto_flush_mapped+ is created, otherwise a
#   NoopRunner is used
def initialize(codec)
  @base_codec = codec
  @base_codecs = [codec]
  @identity_map = ThreadSafe::Hash.new(&method(:codec_builder))
  @max_identities = MAX_IDENTITIES
  @evict_timeout = EVICT_TIMEOUT
  cleaner_interval(CLEANER_INTERVAL)

  # @auto_flush_interval is only assigned when the codec offers the hook
  wants_auto_flush = codec.respond_to?(:use_mapper_auto_flush) &&
    (@auto_flush_interval = codec.use_mapper_auto_flush)
  @auto_flusher =
    if wants_auto_flush
      PeriodicRunner.new(self, 0.5, :auto_flush_mapped)
    else
      NoopRunner.new
    end

  # default decode block: accepts any arguments and returns true
  @decode_block = lambda { |*| true }
  @eviction_block = nil
end

Instance Attribute Details

#auto_flusher ⇒ Object

Returns the value of attribute auto_flusher.



106
107
108
# File 'lib/logstash/codecs/identity_map_codec.rb', line 106

# Reader for the @auto_flusher instance variable.
#
# @return [Object] the current value of @auto_flusher
def auto_flusher
  @auto_flusher
end

#base_codec ⇒ Object

Returns the value of attribute base_codec.



106
107
108
# File 'lib/logstash/codecs/identity_map_codec.rb', line 106

# Reader for the @base_codec instance variable.
#
# @return [Object] the current value of @base_codec
def base_codec
  @base_codec
end

#cleaner ⇒ Object

Returns the value of attribute cleaner.



106
107
108
# File 'lib/logstash/codecs/identity_map_codec.rb', line 106

# Reader for the @cleaner instance variable.
#
# @return [Object] the current value of @cleaner
def cleaner
  @cleaner
end

#identity_map ⇒ Object (readonly)

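Returns the value of attribute identity_map. (The "5 minutes" text originally shown here appears to be the trailing comment of CLEANER_INTERVAL, 60 * 5 seconds, rather than a description of this attribute.)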



105
106
107
# File 'lib/logstash/codecs/identity_map_codec.rb', line 105

# Reader for the @identity_map instance variable.
#
# @return [Object] the current value of @identity_map
def identity_map
  @identity_map
end

Instance Method Details

#accept(listener) ⇒ Object



176
177
178
# File 'lib/logstash/codecs/identity_map_codec.rb', line 176

# Passes +listener+ to the codec selected by the listener's path.
#
# @param listener [#path] object whose +path+ is given to +stream_codec+
# @return [Object] whatever the selected codec's +accept+ returns
def accept(listener)
  codec = stream_codec(listener.path)
  codec.accept(listener)
end

#all_codecs ⇒ Object



232
233
234
# File 'lib/logstash/codecs/identity_map_codec.rb', line 232

# Lists the codecs currently in play.
#
# @return [Array] @base_codecs when +no_streams?+ is true, otherwise the
#   +codec+ of every value held in the identity map
def all_codecs
  return @base_codecs if no_streams?

  identity_map.values.map { |value| value.codec }
end

#auto_flush_mapped ⇒ Object

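Auto-flushes every mapped codec whose auto_flush_timeout is non-zero and has already passed, then resets that timeout to 0. (In the source file this method follows the end of the Codec API section.)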



210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/logstash/codecs/identity_map_codec.rb', line 210

# Auto-flushes each mapped codec whose flush deadline has passed.
#
# An entry is skipped when its +auto_flush_timeout+ is zero or is not yet
# earlier than the current time (as a float). After flushing, the entry's
# timeout is reset to 0.
#
# @return [Object, nil] nil when the map is empty, otherwise the result of
#   iterating the identity map
def auto_flush_mapped
  return if identity_count.zero?

  now = Time.now.to_f
  identity_map.each do |_identity, value|
    overdue = !value.auto_flush_timeout.zero? && now > value.auto_flush_timeout
    next unless overdue

    value.codec.auto_flush
    # at eof (tail and read) there are no more lines for a while, or ever,
    # so clear the deadline
    value.auto_flush_timeout = 0
  end
end

#cleaner_interval(interval) ⇒ Object

used to add a non-default cleaner interval



143
144
145
146
147
# File 'lib/logstash/codecs/identity_map_codec.rb', line 143

# Replaces the cleaner runner with one using a new interval.
#
# Any existing cleaner is stopped first; the new PeriodicRunner is built
# with this object, the integer interval and :map_cleanup.
#
# @param interval [#to_i] interval handed to the runner after +to_i+
# @return [self] so the call can be chained
def cleaner_interval(interval)
  previous = @cleaner
  previous.stop if previous
  @cleaner = PeriodicRunner.new(self, interval.to_i, :map_cleanup)
  self
end

#close ⇒ Object



202
203
204
205
206
# File 'lib/logstash/codecs/identity_map_codec.rb', line 202

# Stops the cleaner and the auto flusher, then closes every codec
# returned by +all_codecs+.
#
# @return [Object] the result of iterating +all_codecs+
def close
  cleaner.stop
  auto_flusher.stop
  all_codecs.each { |codec| codec.close }
end

#codec_without_usage_update(identity) ⇒ Object



283
284
285
# File 'lib/logstash/codecs/identity_map_codec.rb', line 283

# Fetches the codec stored for +identity+ through +find_codec_value+.
#
# @param identity [Object] key passed to +find_codec_value+
# @return [Object] the +codec+ of the value that was found
def codec_without_usage_update(identity)
  value = find_codec_value(identity)
  value.codec
end

#current_size_and_limit ⇒ Object



274
275
276
# File 'lib/logstash/codecs/identity_map_codec.rb', line 274

# Reports how full the identity map is.
#
# @return [Array] two elements: +identity_count+ followed by +max_limit+
def current_size_and_limit
  size = identity_count
  limit = max_limit
  [size, limit]
end

#decode(data, identity = nil, &block) ⇒ Object Also known as: <<

Codec API



171
172
173
174
# File 'lib/logstash/codecs/identity_map_codec.rb', line 171

# Decodes +data+ with the codec selected for +identity+.
#
# The most recent block (possibly nil) is remembered in @decode_block
# whenever it differs from the stored one.
#
# @param data [Object] payload forwarded to the codec's +decode+
# @param identity [Object, nil] key passed to +stream_codec+
# @yield forwarded unchanged to the codec's +decode+
# @return [Object] whatever the codec's +decode+ returns
def decode(data, identity = nil, &block)
  @decode_block = block unless @decode_block == block
  codec = stream_codec(identity)
  codec.decode(data, &block)
end

#encode(event, identity = nil) ⇒ Object



182
183
184
# File 'lib/logstash/codecs/identity_map_codec.rb', line 182

# Encodes +event+ with the codec selected for +identity+.
#
# @param event [Object] event forwarded to the codec's +encode+
# @param identity [Object, nil] key passed to +stream_codec+
# @return [Object] whatever the codec's +encode+ returns
def encode(event, identity = nil)
  codec = stream_codec(identity)
  codec.encode(event)
end

#evict(identity) ⇒ Object

IdentityMapCodec API



160
161
162
163
164
165
# File 'lib/logstash/codecs/identity_map_codec.rb', line 160

# Removes the entry for +identity+ from the identity map and, when the
# removed codec responds to +auto_flush+, flushes it.
#
# Safe to call more than once for the same identity: a missing entry is
# simply ignored.
#
# @param identity [Object] key to delete from the identity map
# @return [Object, nil] the +auto_flush+ result, or nil when nothing was
#   removed or the codec cannot auto flush
def evict(identity)
  removed = identity_map.delete(identity)
  return unless removed

  codec = removed.codec
  codec.auto_flush if codec.respond_to?(:auto_flush)
end

#evict_flush(codec) ⇒ Object



263
264
265
266
267
268
269
270
271
272
# File 'lib/logstash/codecs/identity_map_codec.rb', line 263

# Flushes a codec that is being evicted.
#
# Prefers the codec's +auto_flush+. Otherwise falls back to +flush+ with
# the eviction block, or the last decode block when no eviction block is
# set. When neither block exists nothing can be done.
#
# @param codec [Object] the codec being evicted
# @return [Object, nil] the flush result, or nil when nothing was called
def evict_flush(codec)
  return codec.auto_flush if codec.respond_to?(:auto_flush)

  handler = @eviction_block || @decode_block
  codec.flush(&handler) if handler
end

#evict_timeout(timeout) ⇒ Object

used to add a non-default evict timeout



137
138
139
140
# File 'lib/logstash/codecs/identity_map_codec.rb', line 137

# Sets a non-default evict timeout.
#
# @param timeout [#to_i] new timeout, stored after +to_i+
# @return [self] so the call can be chained
def evict_timeout(timeout)
  tap { @evict_timeout = timeout.to_i }
end

#eviction_block(block) ⇒ Object

used to add a non-default eviction block



150
151
152
153
# File 'lib/logstash/codecs/identity_map_codec.rb', line 150

# Sets a non-default eviction block.
#
# @param block [Object] value stored in @eviction_block as given
# @return [self] so the call can be chained
def eviction_block(block)
  tap { @eviction_block = block }
end

#eviction_timestamp_for(identity) ⇒ Object



287
288
289
# File 'lib/logstash/codecs/identity_map_codec.rb', line 287

# Looks up the eviction timeout recorded for +identity+.
#
# @param identity [Object] key passed to +find_codec_value+
# @return [Object] the +eviction_timeout+ of the value that was found
def eviction_timestamp_for(identity)
  value = find_codec_value(identity)
  value.eviction_timeout
end

#flush(&block) ⇒ Object



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/logstash/codecs/identity_map_codec.rb', line 186

# Flushes every codec returned by +all_codecs+.
#
# With a block, each codec's +flush+ receives that block. Without one,
# +auto_flush+ is used when the codec responds to it; otherwise a bare
# +flush+ is attempted with no guarantees.
#
# @yield forwarded to each codec's +flush+
# @return [Object] the result of iterating +all_codecs+
def flush(&block)
  all_codecs.each do |codec|
    if block
      codec.flush(&block)
    elsif codec.respond_to?(:auto_flush)
      codec.auto_flush
    else
      codec.flush
    end
  end
end

#flush_mapped(listener) ⇒ Object



224
225
226
227
228
229
230
# File 'lib/logstash/codecs/identity_map_codec.rb', line 224

# Auto-flushes every mapped codec into +listener+.
#
# Before each flush the listener's path is set to the identity being
# flushed, provided the listener has a +path=+ writer.
#
# @param listener [Object] passed to each codec's +auto_flush+
# @return [Object] the result of iterating the identity map
def flush_mapped(listener)
  # Guard on the writer, since +path=+ is the method invoked below; a
  # listener exposing only a +path+ reader must not raise NoMethodError.
  listener_has_path = listener.respond_to?(:path=)
  identity_map.each do |identity, compo|
    listener.path = identity if listener_has_path
    compo.codec.auto_flush(listener)
  end
end

#identity_count ⇒ Object



240
241
242
# File 'lib/logstash/codecs/identity_map_codec.rb', line 240

# Counts the entries in the identity map.
#
# @return [Object] the +size+ reported by the identity map
def identity_count
  mapping = identity_map
  mapping.size
end

#logger ⇒ Object



278
279
280
281
# File 'lib/logstash/codecs/identity_map_codec.rb', line 278

# Provides a logger by delegating to the base codec, because this class
# has no logger of its own.
#
# @return [Object] whatever @base_codec.logger returns
def logger
  lender = @base_codec
  lender.logger
end

#map_cleanup ⇒ Object

Supports cleaning of stale streams/codecs. A stream is considered stale if it has not been accessed in the last @evict_timeout period (default: 1 hour).



248
249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/logstash/codecs/identity_map_codec.rb', line 248

# Drops stale entries from the identity map.
#
# An entry is stale when its +eviction_timeout+ is at or before the
# current time in whole seconds. Each stale codec is passed to
# +evict_flush+ before its entry is deleted.
#
# @return [Object] the result of +current_size_and_limit+
def map_cleanup
  unless identity_count.zero?
    now = Time.now.to_i
    # delete_if is atomic; contents should not mutate during this call
    identity_map.delete_if do |_identity, value|
      expired = value.eviction_timeout <= now
      evict_flush(value.codec) if expired
      expired
    end
  end
  current_size_and_limit
end

#max_identities(max) ⇒ Object

Constructional/builder methods: chain these methods off of new.

used to add a non-default maximum identities



131
132
133
134
# File 'lib/logstash/codecs/identity_map_codec.rb', line 131

# Sets a non-default maximum number of identities.
#
# @param max [#to_i] new limit, stored after +to_i+
# @return [self] so the call can be chained
def max_identities(max)
  tap { @max_identities = max.to_i }
end

#max_limit ⇒ Object



236
237
238
# File 'lib/logstash/codecs/identity_map_codec.rb', line 236

# Reader for the configured identity limit.
#
# @return [Object] the current value of @max_identities
def max_limit
  @max_identities
end