Class: Fluent::Plugin::GeoipFilter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_geoip.rb

Constant Summary collapse

DEFAULT_CACHE_SIZE =

Default cache size (number of IP addresses to cache)

8192
DEFAULT_CACHE_TTL =

1 hour in seconds

3600
DEFAULT_DB_FILENAME =
'GeoLite2-City.mmdb'

Instance Method Summary collapse

Constructor Details

#initializeGeoipFilter

Returns a new instance of GeoipFilter.



46
47
48
49
50
51
# File 'lib/fluent/plugin/filter_geoip.rb', line 46

def initialize
  super
  @geoip = nil
  @ip_accessor = nil
  @database_data = nil
end

Instance Method Details

#configure(conf) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/fluent/plugin/filter_geoip.rb', line 53

def configure(conf)
  super

  start_time = Time.now
  @database_path = resolve_database_path

  # Initialize MaxMindDB
  begin
    if @memory_cache
      @database_data = load_database(@database_path)
      @geoip = MaxMindDB.new(StringIO.new(@database_data))
    else
      @geoip = MaxMindDB.new(decompress_if_needed(@database_path))
    end

    load_time = Time.now - start_time
    log.info 'Loaded GeoIP database',
             database: @database_path,
             size: format_size(@database_data ? @database_data.size : File.size(@database_path)),
             compressed_size: format_size(File.size(@database_path)),
             load_time: format_time(load_time)
  rescue StandardError => e
    raise Fluent::ConfigError, "Failed to load GeoIP database: #{e.message}"
  end

  # Initialize IP field accessor
  @ip_accessor = record_accessor_create(@key_name)

  # Initialize cache with TTL support
  @geoip_cache = LruRedux::TTL::Cache.new(@cache_size, @cache_ttl)

  log.info 'Initialized GeoIP filter',
           database: @database_path,
           cache_size: @cache_size,
           cache_ttl: @cache_ttl,
           memory_cache: @memory_cache
end

#filter(tag, time, record) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/fluent/plugin/filter_geoip.rb', line 96

def filter(tag, time, record)
  ip_addr = @ip_accessor.call(record)
  return record if ip_addr.nil? || ip_addr.empty? || ip_addr == '-'

  begin
    ip = IPAddr.new(ip_addr)
    return record if @skip_private_ip && ip.private?
  rescue IPAddr::InvalidAddressError => e
    log.debug 'Invalid IP address', ip: ip_addr, error: e.message
    return record
  end

  geo_ip = @geoip_cache.getset(ip_addr) { get_geoip(ip_addr) }
  return record if geo_ip.empty?

  if @flatten
    record.merge!(hash_flatten(geo_ip, [@out_key]))
  else
    record[@out_key] = geo_ip
  end

  record
rescue StandardError => e
  log.error 'Failed to process GeoIP lookup', error_class: e.class, error: e.message, tag: tag, time: time
  record
end

#shutdownObject



91
92
93
94
# File 'lib/fluent/plugin/filter_geoip.rb', line 91

def shutdown
  super
  @temp_db.unlink if @temp_db && File.exist?(@temp_db.path)
end