Class: LogStash::Codecs::IdentityMapCodec
- Inherits:
-
Object
- Object
- LogStash::Codecs::IdentityMapCodec
- Defined in:
- lib/logstash/codecs/identity_map_codec.rb
Defined Under Namespace
Modules: EightyPercentWarning, UpperLimitReached.
Classes: CodecValue, IdentityMapUpperLimitException, NoopRunner, PeriodicRunner.
Constant Summary collapse
- MAX_IDENTITIES =
maximum size of the mapping hash
20_000
- EVICT_TIMEOUT =
time after which a stream is considered stale; each time a stream is accessed it is given a new timeout
60 * 60 * 1
- CLEANER_INTERVAL =
time that the cleaner thread sleeps for before it tries to clean out stale mappings
60 * 5
Instance Attribute Summary collapse
-
#auto_flusher ⇒ Object
Returns the value of attribute auto_flusher.
-
#base_codec ⇒ Object
Returns the value of attribute base_codec.
-
#cleaner ⇒ Object
Returns the value of attribute cleaner.
-
#identity_map ⇒ Object
readonly
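Returns the value of attribute identity_map. (The generated text "5 minutes." appears to be a stray source comment, presumably describing CLEANER_INTERVAL = 60 * 5 seconds, rather than this attribute.)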
Instance Method Summary collapse
- #accept(listener) ⇒ Object
- #all_codecs ⇒ Object
-
#auto_flush_mapped ⇒ Object
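Calls auto_flush on each mapped codec whose auto-flush timeout has passed, then resets that timeout to 0. (The generated text "end Codec API" is a section-divider comment from the source, not a description of this method.)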
-
#cleaner_interval(interval) ⇒ Object
used to add a non-default cleaner interval.
- #close ⇒ Object
- #codec_without_usage_update(identity) ⇒ Object
- #current_size_and_limit ⇒ Object
-
#decode(data, identity = nil, &block) ⇒ Object
(also: #<<)
Codec API.
- #encode(event, identity = nil) ⇒ Object
-
#evict(identity) ⇒ Object
IdentityMapCodec API.
- #evict_flush(codec) ⇒ Object
-
#evict_timeout(timeout) ⇒ Object
used to add a non-default evict timeout.
-
#eviction_block(block) ⇒ Object
used to add a non-default eviction block.
- #eviction_timestamp_for(identity) ⇒ Object
- #flush(&block) ⇒ Object
- #flush_mapped(listener) ⇒ Object
- #identity_count ⇒ Object
-
#initialize(codec) ⇒ IdentityMapCodec
constructor
A new instance of IdentityMapCodec.
- #logger ⇒ Object
-
#map_cleanup ⇒ Object
Supports cleaning of stale streams/codecs. A stream is considered stale if it has not been accessed in the last @evict_timeout period (default 1 hour).
-
#max_identities(max) ⇒ Object
Constructional/builder method: chain this method off of new. Used to set a non-default maximum number of identities.
- #max_limit ⇒ Object
Constructor Details
#initialize(codec) ⇒ IdentityMapCodec
Returns a new instance of IdentityMapCodec.
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 124
#
# Wraps +codec+ as the base codec and prepares the identity map, the default
# limits, the cleaner and the auto-flusher.
#
# @param codec [Object] the base codec; +use_mapper_auto_flush+ is only
#   consulted when the codec responds to it
def initialize(codec)
  @base_codec = codec
  @base_codecs = [codec]
  # missing identities are built on demand by #codec_builder
  @identity_map = Concurrent::Hash.new(&method(:codec_builder))
  @max_identities = MAX_IDENTITIES
  @evict_timeout = EVICT_TIMEOUT
  cleaner_interval(CLEANER_INTERVAL)
  # a truthy use_mapper_auto_flush is remembered as @auto_flush_interval and
  # switches on the periodic auto-flush runner (ticking every 0.5)
  if codec.respond_to?(:use_mapper_auto_flush) && (@auto_flush_interval = codec.use_mapper_auto_flush)
    @auto_flusher = PeriodicRunner.new(self, 0.5, :auto_flush_mapped)
  else
    @auto_flusher = NoopRunner.new
  end
  @decode_block = lambda { |*| true }
  @eviction_block = nil
end
Instance Attribute Details
#auto_flusher ⇒ Object
Returns the value of attribute auto_flusher.
122 123 124 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 122
#
# @return [Object] the value of the @auto_flusher instance variable
def auto_flusher
  @auto_flusher
end
#base_codec ⇒ Object
Returns the value of attribute base_codec.
122 123 124 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 122
#
# @return [Object] the value of the @base_codec instance variable
def base_codec
  @base_codec
end
#cleaner ⇒ Object
Returns the value of attribute cleaner.
122 123 124 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 122
#
# @return [Object] the value of the @cleaner instance variable
def cleaner
  @cleaner
end
#identity_map ⇒ Object (readonly)
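Returns the value of attribute identity_map. (The generated text "5 minutes" appears to be a stray source comment, presumably describing CLEANER_INTERVAL = 60 * 5 seconds, rather than this attribute.)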
121 122 123 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 121
#
# @return [Object] the value of the @identity_map instance variable
def identity_map
  @identity_map
end
Instance Method Details
#accept(listener) ⇒ Object
192 193 194 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 192
#
# Hands +listener+ to the codec that #stream_codec returns for the
# listener's path.
#
# @param listener [#path] object passed on to the codec's +accept+
# @return [Object] whatever the codec's +accept+ returns
def accept(listener)
  stream_codec(listener.path).accept(listener)
end
#all_codecs ⇒ Object
248 249 250 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 248
#
# @return [Array] @base_codecs when +no_streams?+ is true, otherwise the
#   +codec+ of every value in the identity map
def all_codecs
  no_streams? ? @base_codecs : identity_map.values.map(&:codec)
end
#auto_flush_mapped ⇒ Object
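Calls auto_flush on each mapped codec whose auto-flush timeout has passed, then resets that timeout to 0. (The generated text "end Codec API" is a section-divider comment from the source, not a description of this method.)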
226 227 228 229 230 231 232 233 234 235 236 237 238 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 226
#
# Calls +auto_flush+ on every mapped codec whose +auto_flush_timeout+ is
# non-zero and already in the past, then disarms that timeout.
#
# @return [nil, Object] nil when the map is empty, otherwise the result of
#   iterating the identity map
def auto_flush_mapped
  return if identity_count.zero?

  nowf = Time.now.to_f
  identity_map.each do |_identity, compo|
    # a zero timeout means auto-flush is not armed for this entry
    next if compo.auto_flush_timeout.zero?
    next unless nowf > compo.auto_flush_timeout

    compo.codec.auto_flush
    # at eof (tail and read) no more lines for a while or ever
    # so reset compo.auto_flush_timeout
    compo.auto_flush_timeout = 0
  end
end
#cleaner_interval(interval) ⇒ Object
used to add a non-default cleaner interval
159 160 161 162 163 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 159
#
# Used to add a non-default cleaner interval: stops the current cleaner, if
# any, and replaces it with a new PeriodicRunner that targets :map_cleanup.
#
# @param interval [#to_i] interval handed to the runner after +to_i+
# @return [self] so the call can be chained
def cleaner_interval(interval)
  @cleaner.stop if @cleaner
  @cleaner = PeriodicRunner.new(self, interval.to_i, :map_cleanup)
  self
end
#close ⇒ Object
218 219 220 221 222 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 218
#
# Stops the cleaner and the auto-flusher, then closes every codec returned
# by #all_codecs.
#
# @return [Object] the result of iterating #all_codecs
def close
  cleaner.stop
  auto_flusher.stop
  all_codecs.each(&:close)
end
#codec_without_usage_update(identity) ⇒ Object
299 300 301 302 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 299
#
# Looks up the codec for +identity+ through #find_codec_value.
#
# @param identity [Object, nil] stream identity
# @return [Object] the base codec when +identity+ is nil, otherwise the
#   +codec+ of the value found for +identity+
def codec_without_usage_update(identity)
  return base_codec if identity.nil? # mirror optimization in `stream_codec`

  find_codec_value(identity).codec
end
#current_size_and_limit ⇒ Object
290 291 292 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 290
#
# @return [Array] two elements: #identity_count followed by #max_limit
def current_size_and_limit
  [identity_count, max_limit]
end
#decode(data, identity = nil, &block) ⇒ Object Also known as: <<
Codec API
187 188 189 190 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 187
#
# Codec API. Decodes +data+ with the codec that #stream_codec returns for
# +identity+ and remembers the block in @decode_block.
#
# @param data [Object] payload passed to the codec's +decode+
# @param identity [Object, nil] stream identity
# @yield forwarded to the codec's +decode+
# @return [Object] whatever the codec's +decode+ returns
def decode(data, identity = nil, &block)
  # only replace the stored block when a different one is supplied
  @decode_block = block if @decode_block != block
  stream_codec(identity).decode(data, &block)
end
#encode(event, identity = nil) ⇒ Object
198 199 200 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 198
#
# Encodes +event+ with the codec that #stream_codec returns for +identity+.
#
# @param event [Object] event passed to the codec's +encode+
# @param identity [Object, nil] stream identity
# @return [Object] whatever the codec's +encode+ returns
def encode(event, identity = nil)
  stream_codec(identity).encode(event)
end
#evict(identity) ⇒ Object
IdentityMapCodec API
176 177 178 179 180 181 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 176
#
# IdentityMapCodec API. Removes +identity+ from the identity map and, when
# the removed codec responds to +auto_flush+, calls it.
#
# @param identity [Object] stream identity to remove
# @return [Object, nil] the result of +auto_flush+, or nil when nothing was
#   mapped or the codec has no +auto_flush+
def evict(identity)
  # maybe called more than once
  compo = identity_map.delete(identity)
  return unless compo

  compo.codec.auto_flush if compo.codec.respond_to?(:auto_flush)
end
#evict_flush(codec) ⇒ Object
279 280 281 282 283 284 285 286 287 288 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 279
#
# Flushes +codec+ on eviction: +auto_flush+ when the codec responds to it,
# otherwise +flush+ with @eviction_block or, failing that, @decode_block.
#
# @param codec [Object] codec being evicted
# @return [Object, nil] the flush result, or nil when neither block is set
def evict_flush(codec)
  if codec.respond_to?(:auto_flush)
    codec.auto_flush
  elsif (block = @eviction_block || @decode_block)
    codec.flush(&block)
  end
  # all else - can't do anything
end
#evict_timeout(timeout) ⇒ Object
used to add a non-default evict timeout
153 154 155 156 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 153
#
# Used to add a non-default evict timeout.
#
# @param timeout [#to_i] stored in @evict_timeout after +to_i+
# @return [self] so the call can be chained
def evict_timeout(timeout)
  @evict_timeout = timeout.to_i
  self
end
#eviction_block(block) ⇒ Object
used to add a non-default eviction block
166 167 168 169 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 166
#
# Used to add a non-default eviction block.
#
# @param block [Object] stored as-is in @eviction_block
# @return [self] so the call can be chained
def eviction_block(block)
  @eviction_block = block
  self
end
#eviction_timestamp_for(identity) ⇒ Object
304 305 306 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 304
#
# @param identity [Object] stream identity
# @return [Object] the +eviction_timeout+ of the value that
#   #find_codec_value returns for +identity+
def eviction_timestamp_for(identity)
  find_codec_value(identity).eviction_timeout
end
#flush(&block) ⇒ Object
202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 202
#
# Flushes every codec returned by #all_codecs. With a block each codec gets
# +flush(&block)+; without one, +auto_flush+ is used when the codec
# responds to it and a bare +flush+ otherwise.
#
# @yield forwarded to each codec's +flush+
# @return [Object] the result of iterating #all_codecs
def flush(&block)
  all_codecs.each do |codec|
    # let ruby do its default args thing
    if block_given?
      codec.flush(&block)
    elsif codec.respond_to?(:auto_flush)
      codec.auto_flush
    else
      # try this, no guarantees
      codec.flush
    end
  end
end
#flush_mapped(listener) ⇒ Object
240 241 242 243 244 245 246 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 240
#
# Calls +auto_flush(listener)+ on every mapped codec. When the listener
# responds to +path+, its path is set to the entry's identity first.
#
# @param listener [Object] passed to each codec's +auto_flush+
# @return [Object] the result of iterating the identity map
def flush_mapped(listener)
  listener_has_path = listener.respond_to?(:path)
  identity_map.each do |identity, compo|
    listener.path = identity if listener_has_path
    compo.codec.auto_flush(listener)
  end
end
#identity_count ⇒ Object
256 257 258 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 256
#
# @return [Integer] the +size+ of the identity map
def identity_count
  identity_map.size
end
#logger ⇒ Object
294 295 296 297 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 294
#
# @return [Object] the +logger+ of the base codec
def logger
  # we 'borrow' the codec's logger as we don't have our own
  @base_codec.logger
end
#map_cleanup ⇒ Object
Supports cleaning of stale streams/codecs. A stream is considered stale if it has not been accessed in the last @evict_timeout period (default 1 hour).
264 265 266 267 268 269 270 271 272 273 274 275 276 277 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 264
#
# Removes every entry whose +eviction_timeout+ is at or before the current
# time (in whole seconds), passing each removed codec to #evict_flush first.
#
# @return [Object] the result of #current_size_and_limit
def map_cleanup
  unless identity_count.zero?
    nowi = Time.now.to_i
    # delete_if is atomic
    # contents should not mutate during this call
    identity_map.delete_if do |_identity, compo|
      expired = compo.eviction_timeout <= nowi
      evict_flush(compo.codec) if expired
      expired
    end
  end
  current_size_and_limit
end
#max_identities(max) ⇒ Object
Constructional/builder methods: chain these off of new.
Used to set a non-default maximum number of identities.
147 148 149 150 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 147
#
# Used to add a non-default maximum identities.
#
# @param max [#to_i] stored in @max_identities after +to_i+
# @return [self] so the call can be chained
def max_identities(max)
  @max_identities = max.to_i
  self
end
#max_limit ⇒ Object
252 253 254 |
# File 'lib/logstash/codecs/identity_map_codec.rb', line 252
#
# @return [Object] the value of the @max_identities instance variable
def max_limit
  @max_identities
end