Class: LogStash::Filters::GeoIP

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/filters/geoip.rb

Overview

The GeoIP filter adds information about the geographical location of IP addresses, based on data from the Maxmind database.

Starting with version 1.3.0 of Logstash, a `[geoip]` field is created if the GeoIP lookup returns a latitude and longitude. The field is stored in GeoJSON format (see http://geojson.org/geojson-spec.html). Additionally, the default Elasticsearch template provided with the `elasticsearch` output maps the `[geoip]` field to an Elasticsearch geo_point (see https://www.elastic.co/guide/en/elasticsearch/reference/1.7/mapping-geo-point-type.html#_mapping_options).

As this field is a `geo_point` and it is still valid GeoJSON, you get the awesomeness of Elasticsearch's geospatial query, facet and filter functions and the flexibility of having GeoJSON for all other applications (like Kibana's map visualization).

Logstash releases ship with the GeoLiteCity database made available from Maxmind with a CCA-ShareAlike 3.0 license. For more details on GeoLite, see <www.maxmind.com/en/geolite>.

Constant Summary collapse

LOOKUP_CACHE_INIT_MUTEX =
Mutex.new
LOOKUP_CACHES =

Map of lookup caches, keyed by geoip_type

{}

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#lookup_cacheObject

Returns the value of attribute lookup_cache.



30
31
32
# File 'lib/logstash/filters/geoip.rb', line 30

# Reader for the lookup cache consulted by this filter instance.
# Returns whatever is currently stored in @lookup_cache.
def lookup_cache
  @lookup_cache
end

#threadkeyObject (readonly)

Returns the value of attribute threadkey.



31
32
33
# File 'lib/logstash/filters/geoip.rb', line 31

# Reader for the key this filter uses to index its per-thread state in
# Thread.current. Returns the value of @threadkey.
def threadkey
  @threadkey
end

Instance Method Details

#apply_geodata(geo_data_hash, event) ⇒ Object

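Copies the fields of a GeoIP lookup result onto the event under the configured target. Returns true when fields were applied, and false when the lookup result was nil or empty.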



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/logstash/filters/geoip.rb', line 135

# Copies the entries of a GeoIP lookup result onto the event, nested under
# @target.
#
# @param geo_data_hash [Hash, nil] lookup result; nil means nothing to apply
# @param event [#[], #[]=] the event being enriched
# @return [Boolean] true when entries were copied, false when the lookup
#   result was nil or empty
def apply_geodata(geo_data_hash, event)
  # A nil result leaves the event completely untouched.
  return false if geo_data_hash.nil?

  # For backwards compatibility the target container is created for every
  # non-nil result, including an empty one.
  if event[@target].nil?
    event[@target] = {}
  end

  # An empty result has nothing to copy.
  return false if geo_data_hash.empty?

  geo_data_hash.each do |field_name, field_value|
    # Honour the configured field whitelist unless all fields are wanted.
    next unless @no_fields || @fields.include?(field_name)

    # Numerics are stored as-is; everything else is copied so the event does
    # not share objects with the lookup result.
    copied = case field_value
             when Numeric then field_value
             else field_value.dup
             end
    event["[#{@target}][#{field_name}]"] = copied
  end

  true
end

#ensure_database!Object



212
213
214
215
216
217
218
219
# File 'lib/logstash/filters/geoip.rb', line 212

# Makes sure the current thread has its own GeoIP handle and returns it.
#
# The Ruby GeoIP module serialises database access behind a mutex. The :pread
# option would avoid that, but it needs the io-extra gem, whose C extensions
# are not supported on JRuby. Until :pread is usable, each thread keeps a
# private handle stored in Thread.current under this filter's threadkey.
def ensure_database!
  slot = threadkey
  handle = Thread.current[slot]
  return handle if handle

  Thread.current[slot] = ::GeoIP.new(@database)
end

#filter(event) ⇒ Object



128
129
130
131
132
133
# File 'lib/logstash/filters/geoip.rb', line 128

# Looks up GeoIP data for the event and, when it was applied to the event,
# marks the event as matched.
#
# @param event the event to enrich
# @return the result of filter_matched when data was applied, otherwise nil
def filter(event)
  filter_matched(event) if apply_geodata(get_geo_data(event), event)
end

#get_geo_data(event) ⇒ Object



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/logstash/filters/geoip.rb', line 151

# Reads the IP from the event's @source field and looks up its GeoIP data.
#
# When the field holds an Array only its first element is used.
#
# @param event [#[]] the event to read the source field from
# @return [nil] when the source field is absent (or an empty Array)
# @return [Hash] an empty Hash when the lookup raised and was logged
# @return whatever get_geo_data_for_ip returns otherwise
def get_geo_data(event)
  address = event[@source]
  address = address.first if address.is_a?(Array)
  return nil if address.nil?

  # Lookup failures are logged rather than propagated; the caller then sees
  # an empty result instead of an exception.
  begin
    get_geo_data_for_ip(address)
  rescue SocketError
    @logger.error("IP Field contained invalid IP address or hostname", :field => @source, :event => event)
    {}
  rescue StandardError => e
    @logger.error("Unknown error while looking up GeoIP data", :exception => e, :field => @source, :event => event)
    {}
  end
end

#get_geo_data_for_ip(ip) ⇒ Object



167
168
169
170
171
172
173
174
175
176
177
# File 'lib/logstash/filters/geoip.rb', line 167

# Returns the cache-ready GeoIP data for one IP, consulting the lookup cache
# before querying the current thread's GeoIP handle.
#
# A truthy cache entry is returned directly. Otherwise the database is
# queried with the method named by @geoip_type, the result is converted by
# prepare_geodata_for_cache, stored in the cache and returned.
#
# @param ip the address to look up
# @return the cached or freshly converted lookup result
def get_geo_data_for_ip(ip)
  ensure_database!

  hit = lookup_cache[ip]
  return hit if hit

  raw = Thread.current[threadkey].send(@geoip_type, ip)
  lookup_cache[ip] = prepare_geodata_for_cache(raw)
end

#prepare_geodata_for_cache(geo_data) ⇒ Object



179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/logstash/filters/geoip.rb', line 179

# Converts a GeoIP lookup result into a plain Hash that can be cached.
#
# Members that are nil, empty strings, or named :request are dropped. Keys are
# converted to Strings and String values are normalised to UTF-8. When both
# "latitude" and "longitude" are present a "location" entry holding
# [longitude, latitude] as Floats is added.
#
# The input's strings are never modified: transcoding always produces a new
# String, so frozen or shared GeoIP strings are safe to pass in.
#
# @param geo_data [#each_pair, nil] lookup result (GeoIP returns nil or a
#   Struct subclass)
# @return [Hash, nil] the converted data, or nil when geo_data does not
#   respond to each_pair
def prepare_geodata_for_cache(geo_data)
  # GeoIP returns a nil or a Struct subclass
  return nil unless geo_data.respond_to?(:each_pair)

  # Do the conversion once, before caching.
  result = {}
  geo_data.each_pair do |k, v|
    next if v.nil? || k == :request
    if v.is_a?(String)
      next if v.empty?
      # Some strings from GeoIP don't have the correct encoding...
      result[k.to_s] = case v.encoding
      when Encoding::ASCII_8BIT
        # Strings coming from GeoIP tagged ASCII-8BIT have been found to be
        # ISO-8859-1 in practice. Transcode by naming the source encoding
        # instead of force_encoding, which would retag the caller's string in
        # place and raise on a frozen one.
        v.encode(Encoding::UTF_8, Encoding::ISO_8859_1)
      when Encoding::ISO_8859_1, Encoding::US_ASCII
        v.encode(Encoding::UTF_8)
      else
        v
      end
    else
      result[k.to_s] = v
    end
  end

  lat, lng = result.values_at("latitude", "longitude")
  if lat && lng
    # Longitude comes first in the stored pair.
    result["location"] = [ lng.to_f, lat.to_f ]
  end

  result
end

#registerObject



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/logstash/filters/geoip.rb', line 88

# Prepares the filter for use: resolves the database path, detects which
# kind of GeoIP database it is, and wires up the per-instance thread key and
# the lookup cache shared by all filters using the same database type.
#
# When no database is configured, the first file matching GeoLiteCity*.dat in
# the vendor directory (resolved relative to this file) is used.
#
# @raise [RuntimeError] when no database is configured and none can be found
#   in the vendor directory, or when the database edition is not supported
def register
  require "geoip"

  if @database.nil?
    vendor_dir = ::File.expand_path("../../../vendor/", ::File.dirname(__FILE__))
    pattern = ::File.join(vendor_dir, "GeoLiteCity*.dat")
    @database = ::Dir.glob(pattern).first
    # glob returns no entries when nothing matches, so guard against nil
    # before touching the filesystem, and report the pattern that was tried.
    if @database.nil? || !::File.exist?(@database)
      raise "You must specify 'database => ...' in your geoip filter (I looked for '#{pattern}')"
    end
  end
  @logger.info("Using geoip database", :path => @database)
  # For the purpose of initializing this filter, geoip is initialized here but
  # not set as a global. The geoip module imposes a mutex, so the filter needs
  # to re-initialize this later in the filter() thread, and save that access
  # as a thread-local variable.
  geoip_initialize = ::GeoIP.new(@database)

  # The lookup method later invoked on the GeoIP handle is chosen from the
  # database edition.
  @geoip_type = case geoip_initialize.database_type
  when GeoIP::GEOIP_CITY_EDITION_REV0, GeoIP::GEOIP_CITY_EDITION_REV1
    :city
  when GeoIP::GEOIP_COUNTRY_EDITION
    :country
  when GeoIP::GEOIP_ASNUM_EDITION
    :asn
  when GeoIP::GEOIP_ISP_EDITION, GeoIP::GEOIP_ORG_EDITION
    :isp
  else
    raise RuntimeError, "This GeoIP database is not currently supported"
  end

  @threadkey = "geoip-#{self.object_id}"

  # Wrapped in a mutex so that two filters registering concurrently cannot
  # both create a cache for the same database type in LOOKUP_CACHES.
  LOOKUP_CACHE_INIT_MUTEX.synchronize do
    self.lookup_cache = LOOKUP_CACHES[@geoip_type] ||= LruRedux::ThreadSafeCache.new(1000)
  end

  # With no field list configured, every looked-up field is applied.
  @no_fields = @fields.nil? || @fields.empty?
end