Class: Fluent::Plugin::GeoipFilter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_geoip.rb

Constant Summary collapse

DEFAULT_CACHE_SIZE =

Default cache size (number of IP addresses to cache)

8192
DEFAULT_CACHE_TTL =

Default cache TTL (1 hour, in seconds)

3600

Instance Method Summary collapse

Constructor Details

#initialize ⇒ GeoipFilter

Returns a new instance of GeoipFilter.



40
41
42
43
44
# File 'lib/fluent/plugin/filter_geoip.rb', line 40

# Sets up the plugin's instance state before configuration.
#
# All lookup resources start out as nil; #configure assigns the database
# handle, the record accessor and the lookup cache.
def initialize
  super
  @geoip = nil
  @ip_accessor = nil
  # Assigned in #configure and read in #filter; declared here so every piece
  # of lazily-built state is visible in one place.
  @geoip_cache = nil
end

Instance Method Details

#configure(conf) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/fluent/plugin/filter_geoip.rb', line 46

# Validates the plugin configuration and builds the lookup resources.
#
# After delegating to the parent implementation this opens the MaxMind
# database at @database_path, creates the record accessor for @key_name and
# creates the TTL-aware LRU cache sized by @cache_size / @cache_ttl.
# NOTE(review): these settings are presumably populated by config_param
# declarations that are not visible in this section - confirm.
#
# @param conf the plugin configuration, passed through to +super+
# @raise [Fluent::ConfigError] if the database file does not exist or
#   cannot be loaded
def configure(conf)
  super

  # Fail fast with a clear message when the database path is wrong.
  raise Fluent::ConfigError, "GeoIP database file '#{@database_path}' does not exist" unless File.exist?(@database_path)

  # Any failure while opening the database is reported as a configuration
  # problem so it surfaces like other configuration errors.
  @geoip =
    begin
      MaxMindDB.new(@database_path)
    rescue StandardError => load_error
      raise Fluent::ConfigError, "Failed to load GeoIP database: #{load_error.message}"
    end

  # Accessor used by #filter to read the IP field out of each record.
  @ip_accessor = record_accessor_create(@key_name)

  # Lookup results are cached per IP string, with a TTL.
  @geoip_cache = LruRedux::TTL::Cache.new(@cache_size, @cache_ttl)

  log.info(
    "Initialized GeoIP filter",
    database: @database_path,
    cache_size: @cache_size,
    cache_ttl: @cache_ttl
  )
end

#filter(tag, time, record) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/fluent/plugin/filter_geoip.rb', line 69

# Enriches +record+ with GeoIP data for the IP address read via @ip_accessor.
#
# The record is returned unchanged when the field is nil, empty, "-", not a
# String, not parseable as an IP address, a private address while
# @skip_private_ip is set, or when the lookup yields no data. Any other
# unexpected failure is logged at error level and the record is returned
# as it stands at that point.
#
# @param tag event tag (only used in the error log)
# @param time event time (only used in the error log)
# @param record [Hash] event record; enriched in place
# @return [Hash] the same record object
def filter(tag, time, record)
  ip_addr = @ip_accessor.call(record)
  return record if ip_addr.nil?

  # Non-String values (numbers, arrays, ...) cannot be a textual address.
  # Treat them like any other invalid IP instead of letting them reach the
  # catch-all rescue below and produce an error-level log per record.
  unless ip_addr.is_a?(String)
    log.debug "Invalid IP address", ip: ip_addr, error: "expected String, got #{ip_addr.class}"
    return record
  end
  return record if ip_addr.empty? || ip_addr == '-'

  begin
    ip = IPAddr.new(ip_addr)
    return record if @skip_private_ip && ip.private?
  rescue IPAddr::Error => e
    # IPAddr::Error is the parent of all IPAddr parse failures
    # (InvalidAddressError, InvalidPrefixError, AddressFamilyError).
    log.debug "Invalid IP address", ip: ip_addr, error: e.message
    return record
  end

  # Results are cached per raw IP string; the block only runs on a miss.
  geo_ip = @geoip_cache.getset(ip_addr) { get_geoip(ip_addr) }
  return record if geo_ip.nil? || geo_ip.empty?

  if @flatten
    record.merge!(hash_flatten(geo_ip, [@out_key]))
  else
    # NOTE(review): the object handed back by the cache is stored in the
    # record as-is; if later pipeline stages mutate it, the cached entry
    # presumably changes too - confirm whether a copy is needed.
    record[@out_key] = geo_ip
  end

  record
rescue StandardError => e
  # Best-effort enrichment: never drop an event because the lookup failed.
  log.error "Failed to process GeoIP lookup", error_class: e.class, error: e.message, tag: tag, time: time
  record
end