Class: Kscript::KkApnicIpUtils

Inherits:
Base
  • Object
show all
Defined in:
lib/kscript/plugins/kk_apnic_ip_utils.rb

Instance Attribute Summary collapse

Attributes inherited from Base

#logger

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Base

#human_output?, inherited, #with_error_handling

Constructor Details

#initialize(country_sn = 'CN', *args, **opts) ⇒ KkApnicIpUtils

Initialize class instance, set country code and cache file path



15
16
17
18
19
# File 'lib/kscript/plugins/kk_apnic_ip_utils.rb', line 15

def initialize(country_sn = 'CN', *args, **opts)
  super(*args, **opts)
  @country_sn = country_sn || 'CN'
  @cache_file = RUBY_PLATFORM.match?(/(linux|darwin)/) ? '/tmp/apnic.txt' : 'apnic.txt'
end

Instance Attribute Details

#cache_fileObject (readonly)

Returns the value of attribute cache_file.



12
13
14
# File 'lib/kscript/plugins/kk_apnic_ip_utils.rb', line 12

def cache_file
  @cache_file
end

#country_snObject (readonly)

Returns the value of attribute country_sn.



12
13
14
# File 'lib/kscript/plugins/kk_apnic_ip_utils.rb', line 12

def country_sn
  @country_sn
end

Class Method Details

.argumentsObject



73
74
75
# File 'lib/kscript/plugins/kk_apnic_ip_utils.rb', line 73

def self.arguments
  '[country_code]'
end

.authorObject



85
86
87
# File 'lib/kscript/plugins/kk_apnic_ip_utils.rb', line 85

def self.author
  'kk'
end

.descriptionObject



89
90
91
# File 'lib/kscript/plugins/kk_apnic_ip_utils.rb', line 89

def self.description
  'Get APNIC IPv4 ranges for a country.'
end

.groupObject



81
82
83
# File 'lib/kscript/plugins/kk_apnic_ip_utils.rb', line 81

def self.group
  'network'
end

.usageObject



77
78
79
# File 'lib/kscript/plugins/kk_apnic_ip_utils.rb', line 77

def self.usage
  "kscript apnic_ip CN\nkscript apnic_ip US"
end

Instance Method Details

#calculate_netmask(hosts) ⇒ Object

Calculate CIDR netmask based on number of hosts



61
62
63
64
# File 'lib/kscript/plugins/kk_apnic_ip_utils.rb', line 61

def calculate_netmask(hosts)
  # Calculate minimum CIDR netmask
  32 - Math.log2(hosts).to_i
end

#download_dataObject

Download data from APNIC or read from cache



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/kscript/plugins/kk_apnic_ip_utils.rb', line 22

def download_data
  if File.exist?(cache_file) && File.size?(cache_file)
    mtime = File.mtime(cache_file)
    if Time.now - mtime < 86_400
      logger.kinfo("Using cached data from #{cache_file} (updated #{mtime})")
      return
    else
      logger.kinfo("Cache expired (last updated #{mtime}), downloading new data...")
    end
  end
  url = 'https://ftp.apnic.net/stats/apnic/delegated-apnic-latest'
  response = HTTPX.get(url)
  response = response.first if response.is_a?(Array)
  raise "Failed to download the APNIC data. HTTP Status: #{response.status}" unless response.status == 200

  File.write(cache_file, response.body.to_s)
  logger.kinfo("Data downloaded and saved to #{cache_file}")
end

#parse_ip_rangesObject

Parse data and return IPv4 address ranges (CIDR format) for specified country



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/kscript/plugins/kk_apnic_ip_utils.rb', line 42

def parse_ip_ranges
  download_data # Ensure data is downloaded first

  pattern = /apnic\|#{country_sn}\|ipv4\|(?<ip>\d+\.\d+\.\d+\.\d+)\|(?<hosts>\d+)\|\d+\|allocated/mi
  ip_ranges = []

  File.readlines(cache_file).each do |line|
    next unless line.match(pattern)

    val = line.match(pattern)
    netmask = calculate_netmask(val[:hosts].to_i)
    ip_ranges << "#{val[:ip]}/#{netmask}"
  end

  logger.kinfo('IP ranges', ip_ranges: ip_ranges)
  ip_ranges
end

#run(*args, **_opts) ⇒ Object



66
67
68
69
70
71
# File 'lib/kscript/plugins/kk_apnic_ip_utils.rb', line 66

def run(*args, **_opts)
  with_error_handling do
    @country_sn = args[0] if args[0]
    parse_ip_ranges
  end
end