Class: AsyncResolverWithCache

Inherits:
Object
Defined in:
lib/AsyncResolverWithCache.rb

Overview

This class implements a generic asynchronous resolver with a persistent cache. It assumes that the records are keyed by IP address, but how a record is resolved and interpreted is up to the subclass. The resolver is multi-threaded and can store the cache in a file.

Direct Known Subclasses

DNSResolver, GeoLocator, TraceRouter


Constructor Details

#initialize(cacheName, maxAgeInSeconds, maxThreads = 1) ⇒ AsyncResolverWithCache

Returns a new instance of AsyncResolverWithCache.



# File 'lib/AsyncResolverWithCache.rb', line 24

def initialize(cacheName, maxAgeInSeconds, maxThreads = 1)

  @cacheName = cacheName
  @maxAgeInSeconds = maxAgeInSeconds
  @enabled = true

  # This buffer stores the resolved records. They are hashed by their
  # IP address.
  @records = {}
  # @records can be accessed by the front-end functions as well as the
  # back-end threads. So we need to lock to avoid access collisions.
  @recordLock = Mutex.new()

  # The queue stores incoming requests for IPs to be resolved.
  @queue = Set.new
  # To store IPs while the resolver threads resolve them.
  @wip = Set.new
  # @queue and @wip need to be protected against access collisions.
  @queueLock = Mutex.new()

  @pause = 1
  @nextPurge = Time.now

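  # Restore already resolved records from persistent storage module. Keep
  # the empty Hash if nothing has been stored yet (this assumes getSetting
  # returns nil in that case).
  @records = $globals.getSetting(@cacheName) || {}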

  # This flag is set to true to terminate the resolver threads.
  @termFlag = false

  # Start the requested number of resolver threads to service incoming
  # requests.
  @threads = []
  1.upto(maxThreads) { |id| @threads << Thread.new() { processQueueT(id) } }
end
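
The source of processQueueT is not part of this page. As a rough illustration of how a worker thread could use the queue, the work-in-progress set, and the two locks set up by the constructor, here is a minimal sketch. All local names and the placeholder lookup are assumptions made for this example; it is not the actual implementation.

```ruby
require 'set'

# Hypothetical stand-ins for the instance variables set up in initialize.
queue       = Set.new(%w[192.0.2.1 192.0.2.2])
wip         = Set.new
records     = {}
queue_lock  = Mutex.new
record_lock = Mutex.new
term_flag   = false

worker = Thread.new do
  until term_flag
    # Move one IP from the queue to the work-in-progress set atomically.
    ip = queue_lock.synchronize do
      candidate = queue.first
      if candidate
        queue.delete(candidate)
        wip.add(candidate)
      end
      candidate
    end

    # Nothing to do right now; pause briefly and check again.
    if ip.nil?
      sleep(0.01)
      next
    end

    # Placeholder for the slow lookup a subclass would perform.
    result = "host-for-#{ip}"

    # Publish the result first, then mark the IP as no longer in progress.
    record_lock.synchronize { records[ip] = result }
    queue_lock.synchronize { wip.delete(ip) }
  end
end

# Wait until all requests have been processed, then stop the worker.
sleep(0.01) until queue_lock.synchronize { queue.empty? && wip.empty? }
term_flag = true
worker.join
```

Because the result is stored before the IP leaves the work-in-progress set, an empty queue and an empty work-in-progress set together imply that every requested record is available.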

Instance Attribute Details

#enabled ⇒ Object

Returns the value of attribute enabled.



# File 'lib/AsyncResolverWithCache.rb', line 22

def enabled
  @enabled
end

Instance Method Details

#cacheSize ⇒ Object

Returns the number of resolved records currently stored in the cache.



# File 'lib/AsyncResolverWithCache.rb', line 128

def cacheSize
  # synchronize releases the lock even if the block raises.
  @recordLock.synchronize { @records.size }
end

#expireInMinutes(ip, minutes) ⇒ Object

Adjusts the expiration time of the record for the given IP address so that the record expires ‘minutes’ minutes from now. Does nothing if there is no record for the address.



# File 'lib/AsyncResolverWithCache.rb', line 139

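def expireInMinutes(ip, minutes)
  return unless (rec = @records[ip])

  # resolve() purges a record once timeStamp + @maxAgeInSeconds lies in the
  # past, so back-date the time stamp to put that moment 'minutes' ahead.
  rec.timeStamp = Time.now - @maxAgeInSeconds + minutes * 60
end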
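
The purge step in #resolve drops a record once its time stamp is older than Time.now - @maxAgeInSeconds. For a record to expire a given number of minutes from now, its time stamp therefore has to be set to now - maxAge + minutes * 60. The following worked example checks that arithmetic with made-up values; the variable names are illustrative only.

```ruby
# Hypothetical values, for illustration only.
max_age_in_seconds = 24 * 60 * 60        # records normally live one day
minutes            = 5                   # requested remaining lifetime
now                = Time.at(1_700_000_000)

# Back-date the time stamp so that time_stamp + max_age is 5 minutes ahead.
time_stamp = now - max_age_in_seconds + minutes * 60

# Purge rule from resolve: drop the record once time_stamp < deadline.
purged_at = lambda { |t| time_stamp < t - max_age_in_seconds }

still_cached_after_4_min = !purged_at.call(now + 4 * 60)
purged_after_6_min       = purged_at.call(now + 6 * 60)
```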

#flushQueue ⇒ Object

Flushes the waiting queue.



# File 'lib/AsyncResolverWithCache.rb', line 121

def flushQueue
  # synchronize releases the lock even if the block raises.
  @queueLock.synchronize { @queue.clear }
end

#queueSize ⇒ Object

Returns the size of the waiting queue.



# File 'lib/AsyncResolverWithCache.rb', line 112

def queueSize
  # synchronize releases the lock even if the block raises.
  @queueLock.synchronize { @queue.size }
end

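#resolve(ip) ⇒ Object

Returns the cached record for the IP address if there is one. Otherwise the address is added to the waiting queue (unless the resolver is disabled or the queue is full) and nil is returned. Callers are expected to ask again later.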



# File 'lib/AsyncResolverWithCache.rb', line 74

def resolve(ip)
  @recordLock.synchronize do
    # Before we resolve new IPs we try to purge outdated ones.
    if Time.now > @nextPurge
      deadline = Time.now - @maxAgeInSeconds
      @records.delete_if { |_ip, r| r.timeStamp < deadline }
      # We purge every 10 minutes
      @nextPurge = Time.now + 60 * 10
    end

    # The record is already in the cache, so we can resolve it immediately.
    return @records[ip] if @records.has_key?(ip)
  end

  # The record is not yet known. In case the resolver is disabled, we don't
  # start any new queries anymore.
  return nil unless @enabled

  # There is no point in queuing up endless numbers of requests. They will
  # be re-requested in case the information is still needed. So we can
  # safely discard requests when the queue is filled enough. Otherwise add
  # the request to the waiting queue; the Set ignores duplicates.
  @queueLock.synchronize { @queue.add(ip) unless @queue.size >= 200 }

  # Return nil since we don't have a better answer yet.
  nil
end
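
From the caller's point of view, resolve answers nil on a cache miss and the record on a later call once the request has been processed. The sketch below illustrates that contract with a deliberately tiny, single-threaded stand-in. TinyResolver, drain, and queue_size are hypothetical names invented for this example and are not part of AsyncResolverWithCache.

```ruby
require 'set'

# Hypothetical miniature of the resolve contract; not the real class.
class TinyResolver
  MAX_QUEUE = 200   # same cap as used by resolve above

  def initialize
    @records = {}
    @queue = Set.new
  end

  # Cache hit: return the record. Miss: queue the key and return nil.
  def resolve(ip)
    return @records[ip] if @records.key?(ip)

    @queue.add(ip) unless @queue.size >= MAX_QUEUE
    nil
  end

  # Stand-in for the background threads: resolve everything queued.
  def drain
    @queue.each { |ip| @records[ip] = "name-of-#{ip}" }
    @queue.clear
  end

  def queue_size
    @queue.size
  end
end

resolver = TinyResolver.new
first    = resolver.resolve('192.0.2.1')   # miss, request is queued
resolver.resolve('192.0.2.1')              # duplicate, the Set keeps one entry
pending  = resolver.queue_size
resolver.drain                             # pretend the workers have run
second   = resolver.resolve('192.0.2.1')   # hit
```

Since a Set is used for the queue, repeated requests for the same address while it is still waiting do not consume additional queue slots.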

#stopAndSave ⇒ Object

Signals the resolver threads to terminate and saves the cached records to disk. It does not wait for the threads to finish.



# File 'lib/AsyncResolverWithCache.rb', line 60

def stopAndSave
  # Signal threads to terminate
  @termFlag = true

  # Wait until all threads have terminated
  # Update: Can't do this. It blocks the GUI for too long
  # @threads.each { |t| t.join }

  # Save cached records to disk
  @recordLock.synchronize { $globals.setSetting(@cacheName, @records) }
end
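
The commented-out join above was dropped because an unbounded wait blocked the GUI. One possible middle ground, shown here only as a sketch and not as what the class does, is Thread#join with a time limit: it returns nil if the thread has not finished within the limit. The worker loop and variable names below are hypothetical.

```ruby
# Hypothetical worker loop; the real resolver threads are not shown here.
term_flag = false
threads = Array.new(2) do
  Thread.new do
    # Stand-in for "process the queue until told to stop".
    sleep(0.01) until term_flag
  end
end

# Signal termination, then wait at most 0.5 seconds per thread.
# join(limit) returns the thread on success and nil on timeout.
term_flag = true
finished = threads.map { |t| !t.join(0.5).nil? }
```

The limit bounds how long the caller can be blocked, at the cost of possibly saving the cache while a slow lookup is still running.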