Class: Whispr::Archive
- Inherits:
-
Object
- Object
- Whispr::Archive
- Includes:
- Enumerable
- Defined in:
- lib/whispr/archive.rb
Instance Attribute Summary collapse
-
#header ⇒ Hash
readonly
The archive header.
-
#offset ⇒ Fixnum
readonly
The start location in the whisper file of this Archive.
-
#points ⇒ Fixnum
readonly
The number of points in this archive.
-
#retention ⇒ Fixnum
readonly
Number of seconds worth of data retained by this archive.
-
#size ⇒ Fixnum
readonly
The total size of this archive (points * POINT_SIZE).
-
#spp ⇒ Fixnum
readonly
Seconds per point.
-
#whisper ⇒ Whispr
readonly
The Whisper that contains this Archive.
Instance Method Summary collapse
-
#each(&blk) ⇒ Object
Retrieve each point from the archive.
-
#eoa? ⇒ Boolean
Has the end of the archive been reached?.
-
#fetch(fromTime, untilTime) ⇒ Object
Retrieve values for a time period from an archive within a whisper file.
-
#initialize(whisper, header) ⇒ Archive
constructor
A new instance of Archive.
-
#next_point ⇒ Object
private
Retrieve the next point from the whisper file.
-
#slurp ⇒ Array
Retrieve all points for this archive from the whisper file.
- #to_enum ⇒ Object
- #update_many(points) ⇒ Object
Constructor Details
#initialize(whisper, header) ⇒ Archive
20 21 22 23 24 25 26 27 28 29 |
# File 'lib/whispr/archive.rb', line 20 def initialize(whisper, header) @whisper = whisper @header = header @offset = @header[:offset] @points = @header[:points] @size = @header[:size] @retention = @header[:retention] @spp = @header[:secondsPerPoint] @eoa = @size * @points + @offset end |
Instance Attribute Details
#header ⇒ Hash (readonly)
6 7 8 |
# File 'lib/whispr/archive.rb', line 6 def header @header end |
#offset ⇒ Fixnum (readonly)
8 9 10 |
# File 'lib/whispr/archive.rb', line 8 def offset @offset end |
#points ⇒ Fixnum (readonly)
10 11 12 |
# File 'lib/whispr/archive.rb', line 10 def points @points end |
#retention ⇒ Fixnum (readonly)
14 15 16 |
# File 'lib/whispr/archive.rb', line 14 def retention @retention end |
#size ⇒ Fixnum (readonly)
12 13 14 |
# File 'lib/whispr/archive.rb', line 12 def size @size end |
#spp ⇒ Fixnum (readonly)
16 17 18 |
# File 'lib/whispr/archive.rb', line 16 def spp @spp end |
#whisper ⇒ Whispr (readonly)
18 19 20 |
# File 'lib/whispr/archive.rb', line 18 def whisper @whisper end |
Instance Method Details
#each(&blk) ⇒ Object
Retrieve each point from the archive.
If a block is provided each point is read directly from the whisper file one at a time and yielded. If a block is not provided, all points are read from the file and returned as an enum.
Each point is represented as a three element Array. The first element is the index of the point. The second element is the timestamp of the point and the third element is the value of the point.
42 43 44 45 46 47 48 49 50 51 |
# File 'lib/whispr/archive.rb', line 42 def each(&blk) return slurp.to_enum unless block_given? o_pos = @whisper.fh.pos begin @whisper.fh.pos = @offset points.times {|i| yield(i, *next_point) } ensure @whisper.fh.pos = o_pos end end |
#eoa? ⇒ Boolean
Has the end of the archive been reached?
54 55 56 |
# File 'lib/whispr/archive.rb', line 54 def eoa? @whisper.fh.pos >= @eoa end |
#fetch(fromTime, untilTime) ⇒ Object
Retrieve values for a time period from an archive within a whisper file
The return value will be a two element Array. The first element will be a three element array containing the start time, end time and step. The second element will be a N element array containing each value at each step period.
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/whispr/archive.rb', line 94 def fetch(fromTime, untilTime) fromInterval = (fromTime - (fromTime % spp)) + spp untilInterval = (untilTime - (untilTime % spp)) + spp o_pos = @whisper.fh.pos begin @whisper.fh.seek(offset) baseInterval, baseValue = @whisper.fh.read(POINT_SIZE).unpack(POINT_FMT) if baseInterval == 0 step = spp points = (untilInterval - fromInterval) / step timeInfo = [fromInterval, untilInterval, step] return [timeInfo, points.times.map{}] end # Determine fromOffset timeDistance = fromInterval - baseInterval pointDistance = timeDistance / spp byteDistance = pointDistance * POINT_SIZE fromOffset = offset + (byteDistance % size) # Determine untilOffset timeDistance = untilInterval - baseInterval pointDistance = timeDistance / spp byteDistance = pointDistance * POINT_SIZE untilOffset = offset + (byteDistance % size) # Reall all the points in the interval @whisper.fh.seek(fromOffset) if fromOffset < untilOffset # we don't wrap around the archive series = @whisper.fh.read(untilOffset - fromOffset) else # we wrap around the archive, so we need two reads archiveEnd = offset + size series = @whisper.fh.read(archiveEnd - fromOffset) @whisper.fh.seek(offset) series += @whisper.fh.read(untilOffset - offset) end points = series.length / POINT_SIZE series = series.unpack(POINT_FMT * points) currentInterval = fromInterval step = spp valueList = points.times.map{} (0..series.length).step(2) do |i| pointTime = series[i] if pointTime == currentInterval pointValue = series[i+1] valueList[i/2] = pointValue end currentInterval += step end timeInfo = [fromInterval, untilInterval, step] ensure @whisper.fh.pos = o_pos end [timeInfo, valueList] end |
#next_point ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Retrieve the next point from the whisper file.
64 65 66 67 |
# File 'lib/whispr/archive.rb', line 64 def next_point return nil if @whisper.fh.pos >= @eoa || @whisper.fh.pos < @offset @whisper.fh.read(POINT_SIZE).unpack(POINT_FMT) end |
#slurp ⇒ Array
Retrieve all points for this archive from the whisper file.
Each point is represented as a three element Array. The first element is the index of the point. The second element is the timestamp of the point and the third element is the value of the point.
77 78 79 80 81 82 83 84 |
# File 'lib/whispr/archive.rb', line 77 def slurp o_pos = @whisper.fh.pos @whisper.fh.pos = @offset data = @whisper.fh.read(@size).unpack(POINT_FMT * @points) @points.times.map { |i| [i, data.shift, data.shift] } ensure @whisper.fh.pos = o_pos end |
#to_enum ⇒ Object
58 59 60 |
# File 'lib/whispr/archive.rb', line 58 def to_enum slurp.to_enum end |
#update_many(points) ⇒ Object
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
# File 'lib/whispr/archive.rb', line 154 def update_many(points) step = spp alignedPoints = points.map { |ts, v| [(ts - (ts % step)), v] } # Create a packed string for each contiguous sequence of points packedStrings = [] previousInterval = nil currentString = '' alignedPoints.each do |interval, value| next if interval == previousInterval if previousInterval.nil? || (interval == previousInterval + step) currentString += [interval, value].pack(POINT_FMT) else numberOfPoints = currentString.length / POINT_SIZE startInterval = previousInterval - (step * (numberOfPoints - 1)) packedStrings << [startInterval, currentString] currentString = [interval, value].pack(POINT_FMT) end previousInterval = interval end if !currentString.empty? numberOfPoints = currentString.length / POINT_SIZE startInterval = previousInterval - (step * (numberOfPoints - 1)) packedStrings << [startInterval, currentString] end # Read base point and determine where our writes will start @whisper.fh.seek(offset) baseInterval, baseValue = @whisper.fh.read(POINT_SIZE).unpack(POINT_FMT) baseInterval = packedStrings[0][0] if baseInterval == 0 packedStrings.each do |interval, packedString| timeDistance = interval - baseInterval pointDistance = timeDistance / step byteDistance = pointDistance * POINT_SIZE myOffset = offset + (byteDistance % size) @whisper.fh.seek(myOffset) archiveEnd = offset + size bytesBeyond = (myOffset + packedString.length) - archiveEnd if bytesBeyond > 0 @whisper.fh.write(packedString[0..-bytesBeyond]) if(@whisper.fh.pos != archiveEnd) raise ArchiveBoundaryExceeded.new("archiveEnd=#{archiveEnd} pos=#{@whisper.fh.pos} bytesBeyond=#{bytesBeyond} len(packedString)=#{packedString.length}") end @whisper.fh.seek(offset) @whisper.fh.write(packedString[-bytesBeyond..-1]) else @whisper.fh.write(packedString) end end # interval, packedString| # Now we propagate the updates to the lower-precision archives higher = self @whisper.archives.select{|a| a.spp > spp }.each do |lower| lowerIntervals = alignedPoints.map{|p| p[0] - (p[0] % lower.spp) } propagateFurther = false lowerIntervals.uniq.each do |interval| propagateFuther = @whisper.send(:propagate, interval, higher, lower) end break unless propagateFurther higher = lower end end |