Class: AsyncResolverWithCache
- Inherits: Object
  - Object
    - AsyncResolverWithCache
- Defined in: lib/AsyncResolverWithCache.rb
Overview
This class implements a generic asynchronous resolver with a persistent cache. It assumes the cached information is keyed by IP address, but how that information is interpreted is up to the subclass. The resolver is multi-threaded and can store the cache in a file.
Direct Known Subclasses
Instance Attribute Summary
- #enabled ⇒ Object
  Returns the value of attribute enabled.
Instance Method Summary
- #cacheSize ⇒ Object
  Returns the number of stored (resolved) records in the database.
- #expireInMinutes(ip, minutes) ⇒ Object
  Adjusts the expiration time of the record corresponding to the IP address.
- #flushQueue ⇒ Object
  Flushes the waiting queue.
- #initialize(cacheName, maxAgeInSeconds, maxThreads = 1) ⇒ AsyncResolverWithCache (constructor)
  Returns a new instance of AsyncResolverWithCache.
- #queueSize ⇒ Object
  Returns the size of the waiting queue.
- #resolve(ip) ⇒ Object
  Returns the cached record for the IP address, or nil after queuing the address for resolution.
- #stopAndSave ⇒ Object
  Stop the resolver threads and save records to disk.
Constructor Details
#initialize(cacheName, maxAgeInSeconds, maxThreads = 1) ⇒ AsyncResolverWithCache
Returns a new instance of AsyncResolverWithCache.
    # File 'lib/AsyncResolverWithCache.rb', line 24

    def initialize(cacheName, maxAgeInSeconds, maxThreads = 1)
      @cacheName = cacheName
      @maxAgeInSeconds = maxAgeInSeconds
      @enabled = true
      # This buffer stores the resolved records. They are hashed by their
      # IP address.
      @records = {}
      # @records can be accessed by the front-end functions as well as the
      # back-end threads. So we need to lock to avoid access collisions.
      @recordLock = Mutex.new()

      # The queue stores incoming requests for IPs to be resolved.
      @queue = Set.new
      # To store IPs while the resolver threads resolve them.
      @wip = Set.new
      # @queue and @wip need to be protected against access collisions.
      @queueLock = Mutex.new()
      @pause = 1
      @nextPurge = Time.now

      # Restore already resolved records from persistent storage module.
      @records = $globals.getSetting(@cacheName)

      # This flag is set to true to terminate the resolver threads.
      @termFlag = false
      # Start the requested number of resolver threads to service incoming
      # requests.
      @threads = []
      1.upto(maxThreads) { |id| @threads << Thread.new() { processQueueT(id) } }
    end
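The constructor and #stopAndSave rely on a global $globals object that responds to getSetting(name) and setSetting(name, value); its implementation is not shown on this page. To exercise the class in isolation, a minimal in-memory stand-in might look like the sketch below. The class name InMemorySettings, the cache name 'dnsCache' and the empty-Hash default for unknown names are assumptions made for illustration, not part of the documented code.

```ruby
# Hypothetical stand-in for the persistent storage object behind $globals.
# Only the two method names are taken from the listing above; everything
# else is an assumption made for illustration.
class InMemorySettings
  def initialize
    @settings = {}
  end

  # Return the stored value, or an empty Hash for an unknown name so that a
  # resolver would always start with a usable record table (assumed behavior).
  def getSetting(name)
    @settings.fetch(name) { {} }
  end

  def setSetting(name, value)
    @settings[name] = value
  end
end

$globals = InMemorySettings.new
$globals.setSetting('dnsCache', { '192.0.2.1' => 'host.example' })
restored = $globals.getSetting('dnsCache')
missing = $globals.getSetting('unknownCache')
```

Because the constructor assigns whatever getSetting returns directly to @records, a stand-in that returned nil for an unknown name would leave the resolver without a record table.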
Instance Attribute Details
#enabled ⇒ Object
Returns the value of attribute enabled.
    # File 'lib/AsyncResolverWithCache.rb', line 22

    def enabled
      @enabled
    end
Instance Method Details
#cacheSize ⇒ Object
Returns the number of stored (resolved) records in the database.
    # File 'lib/AsyncResolverWithCache.rb', line 128

    def cacheSize
      @recordLock.lock()
      size = @records.size
      @recordLock.unlock()
      size
    end
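An explicit lock()/unlock() pair leaves the mutex held if the code in between raises. Ruby's Mutex#synchronize releases the lock when its block exits, including on exceptions. The following sketch shows the same size lookup in that form; RecordTable and its store method are hypothetical and exist only for illustration.

```ruby
# Hypothetical miniature of the record table, for illustration only.
class RecordTable
  def initialize
    @records = {}
    @recordLock = Mutex.new
  end

  def store(ip, record)
    @recordLock.synchronize { @records[ip] = record }
  end

  # Equivalent to lock / read size / unlock, but the mutex is released
  # even if the block raises.
  def cacheSize
    @recordLock.synchronize { @records.size }
  end
end

table = RecordTable.new
table.store('192.0.2.1', 'host-a')
table.store('192.0.2.2', 'host-b')
size = table.cacheSize
```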
#expireInMinutes(ip, minutes) ⇒ Object
This function can be used to adjust the expiration time of the record corresponding to the ip address. The record will expire ‘minutes’ from now.
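    # File 'lib/AsyncResolverWithCache.rb', line 139

    def expireInMinutes(ip, minutes)
      return unless rec = @records[ip]

      # #resolve purges a record once timeStamp < Time.now - @maxAgeInSeconds,
      # so this time stamp makes the record expire 'minutes' from now.
      rec.timeStamp = Time.now - @maxAgeInSeconds + minutes * 60
    end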
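As a worked check of the arithmetic: #resolve drops a record once its timeStamp is older than Time.now - maxAgeInSeconds, so a record expires at timeStamp + maxAgeInSeconds. For it to expire a given number of minutes from now, the time stamp has to be now - maxAgeInSeconds + minutes * 60. The sketch below verifies this with a fixed clock; both helper names are hypothetical.

```ruby
# Hypothetical helpers that mirror the purge rule in #resolve.
def expiry_time(time_stamp, max_age_in_seconds)
  time_stamp + max_age_in_seconds
end

def time_stamp_for_expiry_in(now, minutes, max_age_in_seconds)
  now - max_age_in_seconds + minutes * 60
end

now = Time.at(1_000_000)          # fixed clock for a reproducible example
max_age = 24 * 60 * 60            # records normally live for one day
stamp = time_stamp_for_expiry_in(now, 5, max_age)
expires_at = expiry_time(stamp, max_age)
seconds_left = expires_at - now   # 300.0, i.e. five minutes
```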
#flushQueue ⇒ Object
Flushes the waiting queue.
    # File 'lib/AsyncResolverWithCache.rb', line 121

    def flushQueue
      @queueLock.lock()
      @queue.clear
      @queueLock.unlock()
    end
#queueSize ⇒ Object
Returns the size of the waiting queue.
    # File 'lib/AsyncResolverWithCache.rb', line 112

    def queueSize
      @queueLock.lock()
      size = @queue.size
      @queueLock.unlock()
      size
    end
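The waiting queue is a Set, so a repeated request for the same IP does not grow it, and #resolve stops adding entries once 200 are waiting. A minimal sketch of that behavior, using a hypothetical BoundedRequestQueue class and a smaller limit for brevity:

```ruby
require 'set'

# Hypothetical, simplified model of the waiting queue used by #resolve.
class BoundedRequestQueue
  def initialize(limit)
    @limit = limit
    @queue = Set.new
    @queueLock = Mutex.new
  end

  # Add the IP unless the queue is full. Duplicates are ignored by Set.
  def request(ip)
    @queueLock.synchronize { @queue.add(ip) unless @queue.size >= @limit }
  end

  def queueSize
    @queueLock.synchronize { @queue.size }
  end

  def flushQueue
    @queueLock.synchronize { @queue.clear }
  end
end

queue = BoundedRequestQueue.new(3)
%w[192.0.2.1 192.0.2.1 192.0.2.2 192.0.2.3 192.0.2.4].each { |ip| queue.request(ip) }
size_when_full = queue.queueSize   # 3: one duplicate ignored, one request dropped
queue.flushQueue
size_after_flush = queue.queueSize # 0
```

Dropping requests at the limit is safe in this design because callers are expected to ask again later if they still need the answer.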
#resolve(ip) ⇒ Object
Returns the cached record for the IP address if one exists. Otherwise returns nil and, if the resolver is enabled and fewer than 200 requests are waiting, queues the address for resolution by the background threads. Before the lookup, outdated records are purged, at most once every 10 minutes.
    # File 'lib/AsyncResolverWithCache.rb', line 74

    def resolve(ip)
      @recordLock.lock()
      # Before we resolve new IPs we try to purge outdated ones.
      if Time.now > @nextPurge
        deadline = Time.now - @maxAgeInSeconds
        @records.delete_if { |dummy, r| r.timeStamp < deadline }
        # We purge every 10 minutes
        @nextPurge = Time.now + 60 * 10
      end

      if @records.has_key?(ip)
        # The record is already in the cache, so we can resolve it immediately.
        locRec = @records[ip]
        @recordLock.unlock()
        return locRec
      else
        # The record is not yet known.
        @recordLock.unlock()
        # In case the resolver is disabled, we don't start any new queries
        # anymore.
        return nil unless @enabled

        # There is no point in queuing up endless numbers of requests. They will
        # be re-requested in case the information is still needed. So we can
        # safely discard requests when the queue is filled enough.
        # Otherwise push request on waiting queue if it's not already in there.
        @queueLock.lock()
        @queue.add(ip) unless @queue.size >= 200
        @queueLock.unlock
        # Return nil since we don't have a better answer yet.
        return nil
      end
    end
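The purge step at the top of #resolve can be tried in isolation. In this sketch, Record is a hypothetical Struct with a name and a timeStamp field; the real record class is defined by the code that fills the cache and is not shown on this page.

```ruby
# Hypothetical record type; only the timeStamp accessor is taken from the
# listing above.
Record = Struct.new(:name, :timeStamp)

now = Time.at(1_000_000)        # fixed clock for a reproducible example
max_age_in_seconds = 3600

records = {
  '192.0.2.1' => Record.new('fresh.example', now - 60),
  '192.0.2.2' => Record.new('stale.example', now - 7200)
}

# Same rule as in #resolve: drop everything older than the deadline.
deadline = now - max_age_in_seconds
records.delete_if { |_ip, r| r.timeStamp < deadline }

remaining = records.keys   # ["192.0.2.1"]
```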
#stopAndSave ⇒ Object
Stop the resolver threads and save records to disk.
    # File 'lib/AsyncResolverWithCache.rb', line 60

    def stopAndSave
      # Signal threads to terminate
      @termFlag = true
      # Wait until all threads have terminated
      # Update: Can't do this. It blocks the GUI for too long
      # @threads.each { |t| t.join }

      # Save cached records to disk
      @recordLock.lock()
      $globals.setSetting(@cacheName, @records)
      @recordLock.unlock()
    end
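The join in #stopAndSave is commented out because it blocked the GUI for too long. One possible middle ground, shown here only as a sketch and not as what the class does, is Thread#join with a time limit: it returns nil if the thread has not finished within the limit, so the caller waits for a bounded time. The worker loop below is a hypothetical stand-in for the resolver threads, whose code is not shown on this page.

```ruby
term_flag = false

# Hypothetical workers: each polls the termination flag between short sleeps.
workers = Array.new(2) do
  Thread.new do
    sleep(0.01) until term_flag
    :stopped
  end
end

# Signal termination, then wait at most one second per thread.
term_flag = true
finished = workers.map { |t| t.join(1) }   # nil marks a thread that timed out
all_stopped = finished.none?(&:nil?)
```

A thread that is still busy when the limit passes simply keeps running, so the records saved afterwards may miss results it produces later.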