Class: Whispr
- Inherits: Object
- Hierarchy: Object, Whispr
- Defined in: lib/whispr.rb, lib/whispr/archive.rb, lib/whispr/version.rb
Defined Under Namespace
Modules: Error
Classes: Archive, ArchiveBoundaryExceeded, CorruptWhisprFile, InvalidAggregationMethod, InvalidConfiguration, InvalidTimeInterval, TimestampNotCovered, ValueError, WhisprError
Constant Summary
- LONG_FMT = "N"
- METADATA_FMT = "#{LONG_FMT*2}g#{LONG_FMT}"
- METADATA_SIZE = 16
- ARCHIVE_INFO_FMT = LONG_FMT * 3
- ARCHIVE_INFO_SIZE = 12
- POINT_FMT = "#{LONG_FMT}G"
- POINT_SIZE = 12
- CHUNK_SIZE = 16384
- AGGR_TYPES = [ :_, :average, :sum, :last, :max, :min ].freeze
- VERSION = '0.2.0'
Instance Attribute Summary
- #auto_flush ⇒ Object (also: #auto_flush?)
  Returns the value of attribute auto_flush.
- #fh ⇒ File, StringIO (readonly)
  File handle of the whisper file.
Class Method Summary
- .create(path, archiveList, opts = {}) ⇒ Object
  Create a whisper file on the file system and prepopulate it.
- .parse_retention_def(rdef) ⇒ Object
- .prepopulate(fh, archiveList, opts = {}) ⇒ Object
  Build the header and reserve space for the archives in the Whispr file.
- .unitMultipliers ⇒ Object
- .validArchiveList?(archiveList) ⇒ Boolean
  Is the provided archive list valid?
- .validate_opts(opts = {}) ⇒ Hash
  Set defaults for the options to .create and .prepopulate, and validate the supplied options.
- .validateArchiveList(archiveList) ⇒ NilClass, InvalidConfiguration
  Validate an archive list without raising an exception.
- .validateArchiveList!(archiveList) ⇒ nil
  Validate an archive list, raising InvalidConfiguration when a rule is violated.
Instance Method Summary
- #archives ⇒ Array
  Archives.
- #close ⇒ Object
- #closed? ⇒ Boolean
- #fetch(fromTime, untilTime = Time.new) ⇒ Object
  Retrieve values from a whisper file within the given time window.
- #header ⇒ Hash (also: #info)
- #initialize(file, auto_flush = true) ⇒ Whispr (constructor)
  A new instance of Whispr.
- #update(*points) ⇒ Object
  Update one or many points. Each element of the points list should be a two-element Array where the first element is a timestamp and the second element is a value.
Constructor Details
#initialize(file, auto_flush = true) ⇒ Whispr
Returns a new instance of Whispr.
    # File 'lib/whispr.rb', line 223
    def initialize(file, auto_flush = true)
      @fh = file.is_a?(File) || file.is_a?(StringIO) ? file : File.open(file, 'r+')
      @fh.binmode
      @auto_flush = auto_flush
    end
Instance Attribute Details
#auto_flush ⇒ Object Also known as: auto_flush?
Returns the value of attribute auto_flush.
    # File 'lib/whispr.rb', line 220
    def auto_flush
      @auto_flush
    end
#fh ⇒ File, StringIO (readonly)
Returns the file handle of the whisper file.
    # File 'lib/whispr.rb', line 218
    def fh
      @fh
    end
Class Method Details
.create(path, archiveList, opts = {}) ⇒ Object
Create a whisper file on the file system and prepopulate it.
    # File 'lib/whispr.rb', line 88
    def create(path, archiveList, opts = {})
      validate_opts(opts)
      validateArchiveList!(archiveList)
      raise InvalidConfiguration.new("File #{path} already exists!") if File.exists?(path) && !opts[:overwrite]

      # if file exists it will be truncated
      File.open(path, "wb") do |fh|
        fh.flock(File::LOCK_EX)
        prepopulate(fh, archiveList, opts)
      end

      new(path)
    end
.parse_retention_def(rdef) ⇒ Object
    # File 'lib/whispr.rb', line 55
    def parse_retention_def(rdef)
      raise ArgumentError.new("precision and points must be separated by a ':'") unless rdef && rdef.include?(":")
      (precision, points) = rdef.strip.split(':')
      if precision.to_i.to_s == precision
        precision = precision.to_i * unitMultipliers['s']
      else
        _, precision, unit = precision.split(/([\d]+)/)
        unit = 's' unless unit
        raise ValueError.new("Invalid precision specification unit #{unit}") unless unitMultipliers[unit[0]]
        precision = precision.to_i * unitMultipliers[unit[0]]
      end

      if points.to_i.to_s == points
        points = points.to_i
      else
        _, points, unit = points.split(/([\d]+)/)
        raise ValueError.new("Invalid retention specification unit #{unit}") unless unitMultipliers[unit[0]]
        points = points.to_i * unitMultipliers[unit[0]] / precision
      end

      [precision, points]
    end
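The unit arithmetic in the listing above can be traced with a small standalone sketch. It is a simplified restatement, not the library's method: `UNIT_SECONDS`, `to_seconds` and `retention_to_pair` are hypothetical names, and it raises ArgumentError where the library raises its own error classes.

```ruby
# Standalone sketch; UNIT_SECONDS, to_seconds and retention_to_pair are
# hypothetical names, not part of Whispr. The multipliers are copied from
# .unitMultipliers.
UNIT_SECONDS = {
  's' => 1, 'm' => 60, 'h' => 3600,
  'd' => 86_400, 'w' => 86_400 * 7, 'y' => 86_400 * 365
}.freeze

# Convert "<number><unit>" to seconds; a missing unit means seconds.
# Only the first letter of the unit is significant ("min" is read as "m").
def to_seconds(spec)
  match = spec.match(/\A(\d+)([a-zA-Z]*)\z/)
  raise ArgumentError, "cannot parse #{spec.inspect}" unless match
  unit = match[2].empty? ? 's' : match[2][0]
  multiplier = UNIT_SECONDS.fetch(unit) { raise ArgumentError, "unknown unit #{unit}" }
  match[1].to_i * multiplier
end

# Turn "<precision>:<retention>" into [seconds per point, number of points].
def retention_to_pair(rdef)
  precision_spec, points_spec = rdef.strip.split(':')
  precision = to_seconds(precision_spec)
  points = if points_spec =~ /\A\d+\z/
             points_spec.to_i                      # bare integer: already a point count
           else
             to_seconds(points_spec) / precision   # duration: divide by the precision
           end
  [precision, points]
end
```

Under these assumptions, `retention_to_pair("1m:1d")` gives `[60, 1440]` (86400 seconds of retention divided by 60 seconds per point), and `retention_to_pair("60:1440")` gives the same pair.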
.prepopulate(fh, archiveList, opts = {}) ⇒ Object
Build the header and reserve space for the archives in the Whispr file.
You probably don’t want to use this method; you probably want to use .create instead. Calls to prepopulate MUST be preceded by a call to validateArchiveList! with the archiveList argument.
    # File 'lib/whispr.rb', line 128
    def prepopulate(fh, archiveList, opts = {})
      opts = validate_opts(opts)
      aggregationType = AGGR_TYPES.index(opts[:aggregationMethod])
      oldest = archiveList.map{|spp, points| spp * points }.sort.last
      packedMetadata = [aggregationType, oldest, opts[:xff], archiveList.length].pack(METADATA_FMT)
      fh.write(packedMetadata)
      headerSize = METADATA_SIZE + (ARCHIVE_INFO_SIZE * archiveList.length)
      archiveOffsetPointer = headerSize
      archiveList.each do |spp, points|
        archiveInfo = [archiveOffsetPointer, spp, points].pack(ARCHIVE_INFO_FMT)
        fh.write(archiveInfo)
        archiveOffsetPointer += (points * POINT_SIZE)
      end

      if opts[:sparse]
        fh.seek(archiveOffsetPointer - headerSize - 1)
        fh.write("\0")
      else
        remaining = archiveOffsetPointer - headerSize
        zeroes = "\x00" * CHUNK_SIZE
        while remaining > CHUNK_SIZE
          fh.write(zeroes)
          remaining -= CHUNK_SIZE
        end
        fh.write(zeroes[0..remaining])
      end

      fh.flush
      fh.fsync rescue nil
      fh
    end
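The header layout that prepopulate writes follows from the format constants and `Array#pack`. The sketch below recomputes the header bytes and archive offsets in memory for a sample archive list. The format strings are copied from the Constant Summary; `header_layout`, its keyword defaults and the sample list are hypothetical illustrations, and no file is written.

```ruby
# Format strings copied from the Constant Summary; everything else here
# (method name, keyword defaults, sample archive list) is hypothetical.
LONG_FMT         = 'N'                            # 32-bit unsigned, big-endian
METADATA_FMT     = "#{LONG_FMT * 2}g#{LONG_FMT}"  # "NNgN"
ARCHIVE_INFO_FMT = LONG_FMT * 3                   # "NNN"
POINT_FMT        = "#{LONG_FMT}G"                 # timestamp + 64-bit float

# Compute the packed metadata and the byte offset of each archive.
def header_layout(archive_list, aggregation_index: 1, xff: 0.5)
  max_retention = archive_list.map { |spp, points| spp * points }.max
  metadata = [aggregation_index, max_retention, xff, archive_list.length].pack(METADATA_FMT)
  info_size  = [0, 0, 0].pack(ARCHIVE_INFO_FMT).bytesize  # 12, matches ARCHIVE_INFO_SIZE
  point_size = [0, 0.0].pack(POINT_FMT).bytesize          # 12, matches POINT_SIZE

  offset = metadata.bytesize + info_size * archive_list.length
  header_size = offset
  offsets = archive_list.map do |_spp, points|
    start = offset
    offset += points * point_size
    start
  end
  { metadata: metadata, header_size: header_size, offsets: offsets, data_end: offset }
end

# Two archives: 60 s precision for 1440 points, 300 s precision for 2016 points.
layout = header_layout([[60, 1440], [300, 2016]])
```

For this sample list the sketch yields a 40-byte header (16 bytes of metadata plus two 12-byte archive entries), archive offsets of 40 and 17320, and an end offset of 41512.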
.unitMultipliers ⇒ Object
    # File 'lib/whispr.rb', line 44
    def unitMultipliers
      @unitMultipliers ||= {
        's' => 1,
        'm' => 60,
        'h' => 3600,
        'd' => 86400,
        'w' => 86400 * 7,
        'y' => 86400 * 365
      }
    end
.validArchiveList?(archiveList) ⇒ Boolean
Is the provided archive list valid?
    # File 'lib/whispr.rb', line 162
    def validArchiveList?(archiveList)
      !(!!(validateArchiveList!(archiveList) rescue true))
    end
.validate_opts(opts = {}) ⇒ Hash
Set defaults for the options to .create and .prepopulate, and validate the supplied options.
    # File 'lib/whispr.rb', line 105
    def validate_opts(opts = {})
      opts = {:xff => 0.5, :aggregationMethod => :average, :sparse => false, :overwrite => false}.merge(opts)
      unless AGGR_TYPES[1..-1].include?(opts[:aggregationMethod])
        raise InvalidConfiguration.new("aggregationMethod must be one of #{AGGR_TYPES[1..-1]}")
      end
      opts
    end
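The defaults-then-validate pattern in the listing above can be tried in isolation. This is a standalone sketch: `DEFAULTS`, `ALLOWED` and `with_defaults` are hypothetical names, the values are copied from the listing, and it raises ArgumentError where the library raises InvalidConfiguration.

```ruby
# Standalone sketch; DEFAULTS, ALLOWED and with_defaults are hypothetical
# names, not part of Whispr. Values are copied from the listing above.
DEFAULTS = { xff: 0.5, aggregationMethod: :average, sparse: false, overwrite: false }.freeze
ALLOWED  = %i[average sum last max min].freeze

# Caller-supplied keys override the defaults; unknown aggregation methods raise.
def with_defaults(opts = {})
  merged = DEFAULTS.merge(opts)
  unless ALLOWED.include?(merged[:aggregationMethod])
    raise ArgumentError, "aggregationMethod must be one of #{ALLOWED}"
  end
  merged
end
```

With this sketch, `with_defaults(aggregationMethod: :max)` keeps `xff` at 0.5 and `sparse` at false while switching the aggregation method, and `with_defaults(aggregationMethod: :median)` raises.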
.validateArchiveList(archiveList) ⇒ NilClass, InvalidConfiguration
Validate an archive list without raising an exception.
    # File 'lib/whispr.rb', line 168
    def validateArchiveList(archiveList)
      validateArchiveList!(archiveList) rescue $!
    end
.validateArchiveList!(archiveList) ⇒ nil
Validate an archive list. An archive list must satisfy these rules:
1. Have at least one archive config. Example: [60, 86400]
2. No archive may be a duplicate of another.
3. Higher precision archives’ precision must evenly divide all lower precision archives’ precision.
4. Lower precision archives must cover larger time intervals than higher precision archives.
5. Each archive must have at least enough points to consolidate to the next archive.
    # File 'lib/whispr.rb', line 181
    def validateArchiveList!(archiveList)
      raise InvalidConfiguration.new("you must specify at least on archive configuration") if Array(archiveList).empty?
      archiveList = archiveList.sort{|a,b| a[0] <=> b[0] }
      archiveList[0..-2].each_with_index do |archive, i|
        nextArchive = archiveList[i+1]
        unless archive[0] < nextArchive[0]
          raise InvalidConfiguration.new("A Whipser database may not be configured " +
            "having two archives with the same precision " +
            "(archive#{i}: #{archive}, archive#{i+1}: #{nextArchive})")
        end
        unless nextArchive[0] % archive[0] == 0
          raise InvalidConfiguration.new("Higher precision archives' precision must " +
            "evenly divide all lower precision archives' precision " +
            "(archive#{i}: #{archive}, archive#{i+1}: #{nextArchive})")
        end

        retention = archive[0] * archive[1]
        nextRetention = nextArchive[0] * nextArchive[1]
        unless nextRetention > retention
          raise InvalidConfiguration.new("Lower precision archives must cover larger " +
            "time intervals than higher precision archives " +
            "(archive#{i}: #{archive[1]}, archive#{i + 1}:, #{nextArchive[1]})")
        end

        archivePoints = archive[1]
        pointsPerConsolidation = nextArchive[0] / archive[0]
        unless archivePoints >= pointsPerConsolidation
          raise InvalidConfiguration.new("Each archive must have at least enough points " +
            "to consolidate to the next archive (archive#{i+1} consolidates #{pointsPerConsolidation} of " +
            "archive#{i}'s points but it has only #{archivePoints} total points)")
        end
      end
      nil
    end
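To see which rule a given archive list trips, the pairwise checks can be restated as a function that returns a symbol instead of raising. This is a standalone sketch: `first_violation` and the symbol names are hypothetical, and each archive is assumed to be a `[seconds_per_point, points]` pair.

```ruby
# Standalone sketch; first_violation and its symbols are hypothetical, not
# part of Whispr. Returns nil when every adjacent pair passes all checks.
def first_violation(archive_list)
  return :empty if Array(archive_list).empty?

  # Sort by precision so each archive is compared with the next coarser one.
  sorted = archive_list.sort_by { |precision, _points| precision }
  sorted.each_cons(2) do |(precision, points), (next_precision, next_points)|
    return :duplicate_precision     unless precision < next_precision
    return :precision_not_divisible unless (next_precision % precision).zero?
    return :retention_not_larger    unless next_precision * next_points > precision * points
    return :too_few_points          unless points >= next_precision / precision
  end
  nil
end
```

For example, `[[60, 1440], [300, 2016]]` passes every check, while `[[60, 3], [300, 2016]]` returns `:too_few_points` because one 300-second point consolidates five 60-second points and the first archive holds only three.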
Instance Method Details
#archives ⇒ Array
Returns the archives described in the file header, each wrapped in an Archive object.
    # File 'lib/whispr.rb', line 237
    def archives
      @archives ||= info[:archives].map { |a| Archive.new(self, a) }
    end
#close ⇒ Object
    # File 'lib/whispr.rb', line 289
    def close
      @fh.close
    end
#closed? ⇒ Boolean
    # File 'lib/whispr.rb', line 285
    def closed?
      @fh.closed?
    end
#fetch(fromTime, untilTime = Time.new) ⇒ Object
Retrieve values from a whisper file within the given time window.
The most appropriate archive within the whisper file will be chosen. The return value is a two-element Array. The first element is a three-element Array containing the start time, end time and step. The second element is an N-element Array containing the value at each step period.
    # File 'lib/whispr.rb', line 251
    def fetch(fromTime, untilTime = Time.new)
      fromTime = fromTime.to_i
      untilTime = untilTime.to_i
      now = Time.now.to_i
      oldest = now - header[:maxRetention]
      fromTime = oldest if fromTime < oldest
      raise InvalidTimeInterval.new("Invalid time interval (untilTime=#{untilTime}, fromTime=#{fromTime})") unless fromTime < untilTime
      untilTime = now if untilTime > now || untilTime < fromTime

      diff = now - fromTime
      archive = archives.find{|a| a.retention >= diff }
      return archive.fetch(fromTime, untilTime)
    end
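A caller usually wants each value paired with its timestamp. The sketch below does that for a hand-written sample shaped like the return value described above. `pair_with_timestamps` is a hypothetical helper, the sample is not real data, and two things are assumptions rather than documented behavior: that the end time is exclusive, and that missing data appears as nil.

```ruby
# Hypothetical helper, not part of Whispr: pair each value with its timestamp.
# Assumes an exclusive end time and one value per step.
def pair_with_timestamps(result)
  (start_time, end_time, step), values = result
  timestamps = (start_time...end_time).step(step).to_a
  timestamps.zip(values)
end

# Hand-written sample: [[start, end, step], values]. nil marks a step with
# no data (an assumption about how gaps are reported).
sample = [[1_700_000_000, 1_700_000_300, 60], [1.0, nil, 3.5, 4.0, 2.0]]
pairs = pair_with_timestamps(sample)
```

With the sample above, `pairs` holds five `[timestamp, value]` entries, starting at `[1700000000, 1.0]` and ending at `[1700000240, 2.0]`.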
#header ⇒ Hash Also known as: info
    # File 'lib/whispr.rb', line 230
    def header
      @header ||= read_header
    end
#update(*points) ⇒ Object
Update one or many points. Each element of the points list should be a two-element Array where the first element is a timestamp and the second element is a value.
    # File 'lib/whispr.rb', line 268
    def update(*points)
      if points[0].is_a?(Array)
        # Cover the least exhaustive, and most likely, nested array check first
        points = points.length == 1 ? points[0] : points.flatten
      elsif points.any? { |p| p.is_a?(Array) }
        points = points.flatten
      end
      return if points.empty? || points.length % 2 != 0

      # TODO lock the file
      if points.length == 2
        update_one(points[1], points[0])
      else
        update_many(points)
      end
    end
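The argument handling at the top of #update accepts several call shapes. The sketch below repeats only that normalization step so the resulting flat list can be inspected. `normalize_points` is a hypothetical helper, and it returns nil where #update returns early.

```ruby
# Standalone sketch; normalize_points is a hypothetical helper that repeats
# the argument handling of #update and returns the flat
# [timestamp, value, timestamp, value, ...] list, or nil if it is unusable.
def normalize_points(*points)
  if points[0].is_a?(Array)
    # A single Array argument is unwrapped; several are flattened together.
    points = points.length == 1 ? points[0] : points.flatten
  elsif points.any? { |p| p.is_a?(Array) }
    points = points.flatten
  end
  return nil if points.empty? || points.length.odd?
  points
end

t = 1_700_000_000
single_flat   = normalize_points(t, 1.5)                  # timestamp and value as two arguments
single_pair   = normalize_points([t, 1.5])                # one [timestamp, value] pair
several_pairs = normalize_points([t, 1.5], [t + 60, 2.5]) # pairs as separate arguments
```

The first two calls both produce `[1700000000, 1.5]`, and the third produces a flat four-element list. Judging from the listing alone, a single Array that itself contains several pairs is unwrapped but not flattened by the first branch, so passing pairs as separate arguments looks like the safer call shape.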