Class: LogStash::Codecs::IdentityMapCodec

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/codecs/identity_map_codec.rb

Defined Under Namespace

Modules: EightyPercentWarning, UpperLimitReached Classes: CodecValue, IdentityMapUpperLimitException, MapCleaner

Constant Summary collapse

MAX_IDENTITIES =

maximum size of the mapping hash

20_000
EVICT_TIMEOUT =

time after which a stream is considered stale each time a stream is accessed it is given a new timeout

60 * 60 * 1
CLEANER_INTERVAL =

time that the cleaner thread sleeps for before it tries to clean out stale mappings

60 * 5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(codec) ⇒ IdentityMapCodec

Returns a new instance of IdentityMapCodec.



97
98
99
100
101
102
103
104
105
106
# File 'lib/logstash/codecs/identity_map_codec.rb', line 97

def initialize(codec)
  @base_codec = codec
  @base_codecs = [codec]
  @identity_map = ThreadSafe::Hash.new &method(:codec_builder)
  @max_identities = MAX_IDENTITIES
  @evict_timeout = EVICT_TIMEOUT
  @cleaner = MapCleaner.new(self, CLEANER_INTERVAL)
  @decode_block = lambda {|*| true }
  @eviction_block = nil
end

Instance Attribute Details

#base_codecObject

Returns the value of attribute base_codec.



95
96
97
# File 'lib/logstash/codecs/identity_map_codec.rb', line 95

def base_codec
  @base_codec
end

#cleanerObject

Returns the value of attribute cleaner.



95
96
97
# File 'lib/logstash/codecs/identity_map_codec.rb', line 95

def cleaner
  @cleaner
end

#identity_mapObject (readonly)

5 minutes



94
95
96
# File 'lib/logstash/codecs/identity_map_codec.rb', line 94

def identity_map
  @identity_map
end

Instance Method Details

#accept(listener) ⇒ Object



157
158
159
# File 'lib/logstash/codecs/identity_map_codec.rb', line 157

def accept(listener)
  stream_codec(listener.path).accept(listener)
end

#all_codecsObject

end Codec API



190
191
192
# File 'lib/logstash/codecs/identity_map_codec.rb', line 190

def all_codecs
  no_streams? ? @base_codecs : identity_map.values.map(&:codec)
end

#cleaner_interval(interval) ⇒ Object

used to add a non-default cleaner interval



125
126
127
128
129
# File 'lib/logstash/codecs/identity_map_codec.rb', line 125

def cleaner_interval(interval)
  @cleaner.stop
  @cleaner = MapCleaner.new(self, interval.to_i)
  self
end

#closeObject



183
184
185
186
# File 'lib/logstash/codecs/identity_map_codec.rb', line 183

def close()
  cleaner.stop
  all_codecs.each(&:close)
end

#codec_without_usage_update(identity) ⇒ Object



239
240
241
# File 'lib/logstash/codecs/identity_map_codec.rb', line 239

def codec_without_usage_update(identity)
  find_codec_value(identity).codec
end

#current_size_and_limitObject



230
231
232
# File 'lib/logstash/codecs/identity_map_codec.rb', line 230

def current_size_and_limit
  [identity_count, max_limit]
end

#decode(data, identity = nil, &block) ⇒ Object Also known as: <<

Codec API



152
153
154
155
# File 'lib/logstash/codecs/identity_map_codec.rb', line 152

def decode(data, identity = nil, &block)
  @decode_block = block if @decode_block != block
  stream_codec(identity).decode(data, &block)
end

#encode(event, identity = nil) ⇒ Object



163
164
165
# File 'lib/logstash/codecs/identity_map_codec.rb', line 163

def encode(event, identity = nil)
  stream_codec(identity).encode(event)
end

#evict(identity) ⇒ Object

IdentityMapCodec API



141
142
143
144
145
146
# File 'lib/logstash/codecs/identity_map_codec.rb', line 141

def evict(identity)
  # maybe called more than once
  if (compo = identity_map.delete(identity))
    compo.codec.auto_flush if compo.codec.respond_to?(:auto_flush)
  end
end

#evict_flush(codec) ⇒ Object



219
220
221
222
223
224
225
226
227
228
# File 'lib/logstash/codecs/identity_map_codec.rb', line 219

def evict_flush(codec)
  if codec.respond_to?(:auto_flush)
    codec.auto_flush
  else
    if (block = @eviction_block || @decode_block)
      codec.flush(&block)
    end
    # all else - can't do anything
  end
end

#evict_timeout(timeout) ⇒ Object

used to add a non-default evict timeout



119
120
121
122
# File 'lib/logstash/codecs/identity_map_codec.rb', line 119

def evict_timeout(timeout)
  @evict_timeout = timeout.to_i
  self
end

#eviction_block(block) ⇒ Object

used to add a non-default eviction block



132
133
134
135
# File 'lib/logstash/codecs/identity_map_codec.rb', line 132

def eviction_block(block)
  @eviction_block = block
  self
end

#eviction_timestamp_for(identity) ⇒ Object



243
244
245
# File 'lib/logstash/codecs/identity_map_codec.rb', line 243

def eviction_timestamp_for(identity)
  find_codec_value(identity).timeout
end

#flush(&block) ⇒ Object



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/logstash/codecs/identity_map_codec.rb', line 167

def flush(&block)
  all_codecs.each do |codec|
    #let ruby do its default args thing
    if block_given?
      codec.flush(&block)
    else
      if codec.respond_to?(:auto_flush)
        codec.auto_flush
      else
        #try this, no guarantees
        codec.flush
      end
    end
  end
end

#identity_countObject



198
199
200
# File 'lib/logstash/codecs/identity_map_codec.rb', line 198

def identity_count
  identity_map.size
end

#loggerObject



234
235
236
237
# File 'lib/logstash/codecs/identity_map_codec.rb', line 234

def logger
  # we 'borrow' the codec's logger as we don't have our own
  @base_codec.logger
end

#map_cleanupObject

support cleaning of stale stream/codecs a stream is considered stale if it has not been accessed in the last @evict_timeout period (default 1 hour)



206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/logstash/codecs/identity_map_codec.rb', line 206

def map_cleanup
  cut_off = Time.now.to_i
  # delete_if is atomic
  # contents should not mutate during this call
  identity_map.delete_if do |identity, compo|
    if (flag = compo.timeout <= cut_off)
      evict_flush(compo.codec)
    end
    flag
  end
  current_size_and_limit
end

#max_identities(max) ⇒ Object

Constructional/builder methods chain this method off of new

used to add a non-default maximum identities



113
114
115
116
# File 'lib/logstash/codecs/identity_map_codec.rb', line 113

def max_identities(max)
  @max_identities = max.to_i
  self
end

#max_limitObject



194
195
196
# File 'lib/logstash/codecs/identity_map_codec.rb', line 194

def max_limit
  @max_identities
end