Class: Fluent::Plugin::GeoipFilter
- Inherits:
-
Filter
- Object
- Filter
- Fluent::Plugin::GeoipFilter
- Defined in:
- lib/fluent/plugin/filter_geoip.rb
Constant Summary collapse
- DEFAULT_CACHE_SIZE = 8192
  Default cache size (number of IP addresses to cache)
- DEFAULT_CACHE_TTL = 3600
  1 hour in seconds
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #filter(tag, time, record) ⇒ Object
-
#initialize ⇒ GeoipFilter
constructor
A new instance of GeoipFilter.
Constructor Details
#initialize ⇒ GeoipFilter
Returns a new instance of GeoipFilter.
40 41 42 43 44 |
# File 'lib/fluent/plugin/filter_geoip.rb', line 40
# Creates an unconfigured filter instance.
#
# After delegating to the parent constructor, the GeoIP database handle and
# the IP record accessor are both reset to nil; #configure assigns the real
# objects later.
def initialize
  super
  @geoip = @ip_accessor = nil
end
Instance Method Details
#configure(conf) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/fluent/plugin/filter_geoip.rb', line 46
# Validates the plugin configuration and builds the lookup machinery.
#
# Steps, in order: delegate to the parent `configure`, verify the database
# file exists, open it with MaxMindDB, create the record accessor for the
# configured key, and create the TTL-aware LRU cache.
#
# @param conf the plugin configuration element, consumed by `super`
# @raise [Fluent::ConfigError] when the database file is missing or cannot
#   be loaded
def configure(conf)
  super

  db_path = @database_path
  raise Fluent::ConfigError, "GeoIP database file '#{db_path}' does not exist" unless File.exist?(db_path)

  # Any failure while opening the database is reported as a configuration error.
  begin
    @geoip = MaxMindDB.new(db_path)
  rescue StandardError => load_error
    raise Fluent::ConfigError, "Failed to load GeoIP database: #{load_error.message}"
  end

  # Accessor used by #filter to read the IP address out of each record.
  @ip_accessor = record_accessor_create(@key_name)

  # Cache built from the configured size and TTL.
  @geoip_cache = LruRedux::TTL::Cache.new(@cache_size, @cache_ttl)

  log.info "Initialized GeoIP filter",
           database: db_path,
           cache_size: @cache_size,
           cache_ttl: @cache_ttl
end
#filter(tag, time, record) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/fluent/plugin/filter_geoip.rb', line 69
# Adds GeoIP data to a record, keyed off the IP address read via @ip_accessor.
#
# The record is returned unchanged when:
# * the IP value is nil, empty, or the placeholder '-';
# * the value is not a valid IP address (logged at debug level);
# * @skip_private_ip is set and the address is private;
# * the lookup result is empty;
# * any StandardError is raised along the way (logged at error level).
#
# Otherwise the lookup result is stored under @out_key, or merged into the
# record as flattened keys when @flatten is set.
#
# @param tag event tag; only used in the error log entry
# @param time event time; only used in the error log entry
# @param record the event record (presumably a Hash); modified in place
# @return the same record object that was passed in
def filter(tag, time, record)
  ip_addr = @ip_accessor.call(record)
  return record if ip_addr.nil? || ip_addr.empty? || ip_addr == '-'

  begin
    ip = IPAddr.new(ip_addr)
    return record if @skip_private_ip && ip.private?
  rescue IPAddr::InvalidAddressError => e
    # Malformed addresses are skipped, not treated as failures.
    log.debug "Invalid IP address", ip: ip_addr, error: e.message
    return record
  end

  # Go through the cache; the block supplies the value via get_geoip
  # (presumably only evaluated on a cache miss — confirm against LruRedux).
  geo_ip = @geoip_cache.getset(ip_addr) { get_geoip(ip_addr) }
  return record if geo_ip.empty?

  if @flatten
    record.merge!(hash_flatten(geo_ip, [@out_key]))
  else
    record[@out_key] = geo_ip
  end
  record
rescue StandardError => e
  # Best effort: a failed lookup never drops the event.
  log.error "Failed to process GeoIP lookup", error_class: e.class, error: e.message, tag: tag, time: time
  record
end