Class: Whispr

Inherits:
Object
Defined in:
lib/whispr.rb,
lib/whispr/archive.rb,
lib/whispr/version.rb

Defined Under Namespace

Modules: Error

Classes: Archive, ArchiveBoundaryExceeded, CorruptWhisprFile, InvalidAggregationMethod, InvalidConfiguration, InvalidTimeInterval, TimestampNotCovered, ValueError, WhisprError

Constant Summary

LONG_FMT =
"N"
METADATA_FMT =
"#{LONG_FMT*2}g#{LONG_FMT}"
METADATA_SIZE =
16
ARCHIVE_INFO_FMT =
LONG_FMT * 3
ARCHIVE_INFO_SIZE =
12
POINT_FMT =
"#{LONG_FMT}G"
POINT_SIZE =
12
CHUNK_SIZE =
16384
AGGR_TYPES =
[
  :_,
  :average,
  :sum,
  :last,
  :max,
  :min
].freeze
VERSION =
'0.2.0'
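
To see how the format strings relate to the size constants, the sketch below packs sample values with Array#pack and prints the resulting byte sizes. The sample numbers (aggregation index 1, a 60-second archive with 1440 points, and so on) are made up for illustration; only the format strings are taken from the constants above.

```ruby
# Format strings copied from the constant summary above.
LONG_FMT         = "N"                            # 32-bit unsigned integer, big-endian
METADATA_FMT     = "#{LONG_FMT * 2}g#{LONG_FMT}"  # expands to "NNgN"
ARCHIVE_INFO_FMT = LONG_FMT * 3                   # expands to "NNN"
POINT_FMT        = "#{LONG_FMT}G"                 # expands to "NG"

# Illustrative values: [aggregation index, max retention, xff, archive count]
metadata = [1, 86_400, 0.5, 1].pack(METADATA_FMT)
# Illustrative values: [offset, seconds per point, number of points]
archive_info = [28, 60, 1440].pack(ARCHIVE_INFO_FMT)
# Illustrative values: [timestamp, value]
point = [1_700_000_000, 42.5].pack(POINT_FMT)

puts "metadata:     #{metadata.bytesize} bytes"      # compare with METADATA_SIZE
puts "archive info: #{archive_info.bytesize} bytes"  # compare with ARCHIVE_INFO_SIZE
puts "point:        #{point.bytesize} bytes"         # compare with POINT_SIZE

# Unpacking with the same format string recovers the original values.
p metadata.unpack(METADATA_FMT)
p point.unpack(POINT_FMT)
```

The "g" directive is a single-precision float (4 bytes) and "G" is a double-precision float (8 bytes), both big-endian, which is why a point takes 12 bytes while the four-field metadata block takes 16.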

Constructor Details

#initialize(file, auto_flush = true) ⇒ Whispr

Returns a new instance of Whispr.



# File 'lib/whispr.rb', line 223

def initialize(file, auto_flush = true)
  @fh = file.is_a?(File) || file.is_a?(StringIO) ? file : File.open(file, 'r+')
  @fh.binmode
  @auto_flush = auto_flush
end

Instance Attribute Details

#auto_flush ⇒ Object Also known as: auto_flush?

Returns the value of attribute auto_flush.



# File 'lib/whispr.rb', line 220

def auto_flush
  @auto_flush
end

#fh ⇒ File, StringIO (readonly)

Returns file handle of the whisper file.

Returns:

  • (File, StringIO)

    file handle of the whisper file



# File 'lib/whispr.rb', line 218

def fh
  @fh
end

Class Method Details

.create(path, archiveList, opts = {}) ⇒ Object

Create a whisper file on the file system and prepopulate it.

Parameters:

  • path (String)
  • archiveList (Array)

    each archive is an array with two elements: [secondsPerPoint,numberOfPoints]

  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • :xff (Float) — default: 0.5

    the fraction of data points in a propagation interval that must have known values for a propagation to occur

  • :aggregationMethod (Symbol) — default: average

    the function to use when propagating data; must be one of AGGR_TYPES

  • :overwrite (Boolean) — default: false
  • :sparse (Boolean) — default: false

Raises:

  • (InvalidConfiguration)

    if the archiveList is invalid, or if ‘path’ exists and :overwrite is not true

See Also:

  • Whispr.validateArchiveList


# File 'lib/whispr.rb', line 88

def create(path, archiveList, opts = {})
  validate_opts(opts)
  validateArchiveList!(archiveList)
  # File.exist? replaces File.exists?, which was removed in Ruby 3.2
  raise InvalidConfiguration.new("File #{path} already exists!") if File.exist?(path) && !opts[:overwrite]

  # if file exists it will be truncated
  File.open(path, "wb")  do |fh|
    fh.flock(File::LOCK_EX)
    prepopulate(fh, archiveList, opts)
  end

  new(path)
end
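
The :xff and :aggregationMethod options are easier to picture with numbers. The following is a minimal sketch of how one propagation interval might be consolidated. It is not the library's own code: `propagate` is a hypothetical helper, and treating "enough known values" as `known fraction >= xff` is an assumption based on the option description above.

```ruby
# Hypothetical helper: consolidate the values of one propagation interval.
# nil entries stand for points with no known value.
def propagate(values, method: :average, xff: 0.5)
  known = values.compact
  # Assumption: propagation happens only when the known fraction reaches xff.
  return nil if values.empty? || known.length.to_f / values.length < xff

  case method
  when :average then known.sum.to_f / known.length
  when :sum     then known.sum
  when :last    then known.last
  when :max     then known.max
  when :min     then known.min
  else raise ArgumentError, "unknown aggregation method #{method.inspect}"
  end
end

p propagate([1, nil, 3, nil])                       # half the points are known
p propagate([1, nil, nil, nil])                     # only a quarter are known
p propagate([1, 2, 3, nil], method: :sum)
p propagate([4, nil, 9], method: :max, xff: 0.6)
```

With the default xff of 0.5, an interval needs at least half of its points to be known before a consolidated value is written to the next archive; otherwise nothing is propagated.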

.parse_retention_def(rdef) ⇒ Object

Raises:

  • (ArgumentError)


# File 'lib/whispr.rb', line 55

def parse_retention_def(rdef)
  raise ArgumentError.new("precision and points must be separated by a ':'") unless rdef && rdef.include?(":")
  (precision, points) = rdef.strip.split(':')
  if precision.to_i.to_s == precision
    precision = precision.to_i * unitMultipliers['s']
  else
    _, precision, unit = precision.split(/([\d]+)/)
    unit = 's' unless unit
    raise ValueError.new("Invalid precision specification unit #{unit}") unless unitMultipliers[unit[0]]
    precision = precision.to_i * unitMultipliers[unit[0]]
  end

  if points.to_i.to_s == points
    points = points.to_i
  else
    _, points, unit = points.split(/([\d]+)/)
    raise ValueError.new("Invalid retention specification unit #{unit}") unless unitMultipliers[unit[0]]
    points = points.to_i * unitMultipliers[unit[0]] / precision
  end

  [precision, points]
end
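
The retention definition format is easiest to follow with a few inputs. The sketch below is a simplified, standalone re-statement of the arithmetic above, not the library method itself; `retention_sketch` and `UNIT_SECONDS` are hypothetical names, and error handling for malformed input is left out.

```ruby
# Seconds per unit, keyed by the first letter of the unit (as in unitMultipliers).
UNIT_SECONDS = {
  's' => 1, 'm' => 60, 'h' => 3600,
  'd' => 86_400, 'w' => 86_400 * 7, 'y' => 86_400 * 365
}.freeze

# Hypothetical helper: turn "precision:points" into [seconds_per_point, points].
def retention_sketch(rdef)
  precision_str, points_str = rdef.strip.split(':')

  # Precision: a bare number means seconds, otherwise number * unit.
  number, unit = precision_str.match(/\A(\d+)([a-z]*)\z/i).captures
  precision = number.to_i * UNIT_SECONDS.fetch(unit.empty? ? 's' : unit[0])

  # Points: a bare number is a point count, otherwise a duration that is
  # divided by the precision to get the number of points.
  number, unit = points_str.match(/\A(\d+)([a-z]*)\z/i).captures
  points = if unit.empty?
             number.to_i
           else
             number.to_i * UNIT_SECONDS.fetch(unit[0]) / precision
           end

  [precision, points]
end

p retention_sketch("60:1440")  # one point per 60 seconds, 1440 points
p retention_sketch("1m:1d")    # one point per minute, kept for one day
p retention_sketch("15s:7d")   # one point per 15 seconds, kept for seven days
```

Note that "60:1440" and "1m:1d" describe the same archive: the second form states the retention as a duration, and the point count is derived from it.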

.prepopulate(fh, archiveList, opts = {}) ⇒ Object

Build the header and reserve space for the archives in the Whispr file.

You probably don’t want to use this method; use .create instead. Calls to prepopulate MUST be preceded by a call to validateArchiveList! with the archiveList argument.

Parameters:

  • fh (File)

    file handle that will hold the archive

  • archiveList (Array)

    each archive is an array with two elements: [secondsPerPoint,numberOfPoints]

  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • :xff (Float)

    the fraction of data points in a propagation interval that must have known values for a propagation to occur

  • :aggregationMethod (Symbol)

    the function to use when propagating data; must be one of AGGR_TYPES

  • :overwrite (Boolean) — default: false

Raises:

  • (InvalidConfiguration)

    if the archiveList is invalid, or if ‘path’ exists and :overwrite is not true

See Also:

  • Whispr.validateArchiveList
  • Whispr.create


# File 'lib/whispr.rb', line 128

def prepopulate(fh, archiveList, opts = {})
  opts            = validate_opts(opts)
  aggregationType = AGGR_TYPES.index(opts[:aggregationMethod])
  oldest         = archiveList.map{|spp, points| spp * points }.sort.last
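  # Pack the four metadata fields using the format declared in METADATA_FMT
  packedMetadata = [aggregationType, oldest, opts[:xff], archiveList.length].pack(METADATA_FMT)
  fh.write(packedMetadata)
  headerSize            = METADATA_SIZE + (ARCHIVE_INFO_SIZE * archiveList.length)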
  archiveOffsetPointer = headerSize
  archiveList.each do |spp, points|
    archiveInfo = [archiveOffsetPointer, spp, points].pack(ARCHIVE_INFO_FMT)
    fh.write(archiveInfo)
    archiveOffsetPointer += (points * POINT_SIZE)
  end

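  if opts[:sparse]
    # seek is absolute, so the last byte of the file sits at archiveOffsetPointer - 1
    fh.seek(archiveOffsetPointer - 1)
    fh.write("\0")
  else
    remaining = archiveOffsetPointer - headerSize
    zeroes = "\x00" * CHUNK_SIZE
    while remaining > CHUNK_SIZE
      fh.write(zeroes)
      remaining -= CHUNK_SIZE
    end
    # write exactly the remaining bytes (an inclusive 0..remaining range writes one extra)
    fh.write(zeroes[0, remaining])
  end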

  fh.flush
  fh.fsync rescue nil
  fh
end
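
The header arithmetic in prepopulate determines where each archive starts in the file. As a rough illustration, the sketch below computes the header size, per-archive offsets, and total file size for an archive list. `archive_layout` is a hypothetical helper written for this example; the size constants are copied from the constant summary.

```ruby
# Sizes copied from the constant summary.
METADATA_SIZE     = 16
ARCHIVE_INFO_SIZE = 12
POINT_SIZE        = 12

# Hypothetical helper: where does each archive live in the file?
def archive_layout(archive_list)
  # The header is the metadata block plus one info record per archive.
  header_size = METADATA_SIZE + ARCHIVE_INFO_SIZE * archive_list.length
  offset = header_size
  offsets = archive_list.map do |_seconds_per_point, points|
    start = offset
    offset += points * POINT_SIZE  # each point occupies POINT_SIZE bytes
    start
  end
  { header_size: header_size, offsets: offsets, file_size: offset }
end

layout = archive_layout([[60, 1440], [300, 2016]])
p layout
```

For two archives the header is 16 + 2 * 12 = 40 bytes, so the first archive begins at byte 40 and the second begins 1440 * 12 bytes later.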

.unitMultipliers ⇒ Object



# File 'lib/whispr.rb', line 44

def unitMultipliers
  @unitMultipliers ||= {
    's' => 1,
    'm' => 60,
    'h' => 3600,
    'd' => 86400,
    'w' => 86400 * 7,
    'y' => 86400 * 365
  }
end

.validArchiveList?(archiveList) ⇒ Boolean

Is the provided archive list valid?

Returns:

  • (Boolean)

    true, false



# File 'lib/whispr.rb', line 162

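def validArchiveList?(archiveList)
  # validateArchiveList! returns nil when the list is valid; a rescued error
  # yields true, so the double negation maps valid => true, invalid => false
  !(!!(validateArchiveList!(archiveList) rescue true))
end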

.validate_opts(opts = {}) ⇒ Hash

Set defaults for the options to .create and .prepopulate, and validate the supplied options.

Parameters:

  • opts (Hash) (defaults to: {})

Returns:

  • (Hash)

    updated options



# File 'lib/whispr.rb', line 105

def validate_opts(opts = {})
  opts = {:xff => 0.5, :aggregationMethod => :average, :sparse => false, :overwrite => false}.merge(opts)
  unless AGGR_TYPES[1..-1].include?(opts[:aggregationMethod])
    raise InvalidConfiguration.new("aggregationMethod must be one of #{AGGR_TYPES[1..-1]}")
  end
  opts
end

.validateArchiveList(archiveList) ⇒ NilClass, InvalidConfiguration

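Validate an archive list without raising an exception.

Returns:

  • (NilClass, InvalidConfiguration)

    nil if the archive list is valid, otherwise the rescued exception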


# File 'lib/whispr.rb', line 168

def validateArchiveList(archiveList)
  validateArchiveList!(archiveList) rescue $!
end

.validateArchiveList!(archiveList) ⇒ nil

Validate an archive list. An archive list must:

  1. Have at least one archive config. Example: [60, 86400]

  2. No archive may be a duplicate of another.

  3. Higher precision archives’ precision must evenly divide all lower precision archives’ precision.

  4. Lower precision archives must cover larger time intervals than higher precision archives.

  5. Each archive must have at least enough points to consolidate to the next archive.

Returns:

  • (nil)

Raises:

  • (InvalidConfiguration)

    if the archive list violates any of the rules above



# File 'lib/whispr.rb', line 181

def validateArchiveList!(archiveList)
  raise InvalidConfiguration.new("you must specify at least one archive configuration") if Array(archiveList).empty?
  archiveList = archiveList.sort{|a,b| a[0] <=> b[0] }
  archiveList[0..-2].each_with_index do |archive, i|
    nextArchive = archiveList[i+1]
    unless archive[0] < nextArchive[0]
      raise InvalidConfiguration.new("A Whisper database may not be configured " +
        "having two archives with the same precision " +
        "(archive#{i}: #{archive}, archive#{i+1}: #{nextArchive})")
    end
    unless nextArchive[0] % archive[0] == 0
      raise InvalidConfiguration.new("Higher precision archives' precision must " +
        "evenly divide all lower precision archives' precision " +
        "(archive#{i}: #{archive}, archive#{i+1}: #{nextArchive})")
    end

    retention = archive[0] * archive[1]
    nextRetention = nextArchive[0] * nextArchive[1]
    unless nextRetention > retention
      raise InvalidConfiguration.new("Lower precision archives must cover larger " +
        "time intervals than higher precision archives " +
        "(archive#{i}: #{archive[1]}, archive#{i + 1}: #{nextArchive[1]})")
    end

    archivePoints = archive[1]
    pointsPerConsolidation = nextArchive[0] / archive[0]
    unless archivePoints >= pointsPerConsolidation
      raise InvalidConfiguration.new("Each archive must have at least enough points " +
        "to consolidate to the next archive (archive#{i+1} consolidates #{pointsPerConsolidation} of " +
        "archive#{i}'s points but it has only #{archivePoints} total points)")
    end
  end
  nil
end
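
The rules listed above are easier to check against concrete lists. The sketch below is a compact, standalone re-statement of those checks that reports which rule fails first. `first_violation` and its symbol names are hypothetical and exist only for this example; the library itself raises InvalidConfiguration instead.

```ruby
# Hypothetical helper: return a symbol naming the first rule an archive list
# breaks, or nil when every rule holds. Each archive is
# [seconds_per_point, number_of_points].
def first_violation(archive_list)
  return :empty if Array(archive_list).empty?

  sorted = archive_list.sort_by { |seconds_per_point, _points| seconds_per_point }
  sorted.each_cons(2) do |(spp, points), (next_spp, next_points)|
    return :duplicate_precision     unless spp < next_spp
    return :precision_not_divisible unless (next_spp % spp).zero?
    return :retention_not_larger    unless next_spp * next_points > spp * points
    return :too_few_points          unless points >= next_spp / spp
  end
  nil
end

p first_violation([[60, 1440], [300, 2016]])  # valid list
p first_violation([])
p first_violation([[60, 10], [60, 20]])       # same precision twice
p first_violation([[60, 1440], [100, 2000]])  # 60 does not divide 100
p first_violation([[60, 1440], [300, 100]])   # second archive covers less time
p first_violation([[1, 2], [60, 10]])         # 2 points cannot fill a 60-point step
```

The checks run pairwise over the list sorted by precision, so each archive is only compared with the next coarser one.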

Instance Method Details

#archives ⇒ Array

Returns Archives.

Returns:

  • (Array)

    Archives



# File 'lib/whispr.rb', line 237

def archives
  @archives ||= info[:archives].map { |a| Archive.new(self, a) }
end

#close ⇒ Object



# File 'lib/whispr.rb', line 289

def close
  @fh.close
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/whispr.rb', line 285

def closed?
  @fh.closed?
end

#fetch(fromTime, untilTime = Time.new) ⇒ Object

Retrieve values from a whisper file within the given time window.

The most appropriate archive within the whisper file will be chosen. The return value is a two-element Array. The first element is a three-element Array containing the start time, end time, and step. The second element is an N-element Array containing the value at each step.



# File 'lib/whispr.rb', line 251

def fetch(fromTime, untilTime = Time.new)
  fromTime  = fromTime.to_i
  untilTime = untilTime.to_i
  now       = Time.now.to_i
  oldest    = now - header[:maxRetention]
  fromTime  = oldest if fromTime < oldest
  raise InvalidTimeInterval.new("Invalid time interval (untilTime=#{untilTime}, fromTime=#{fromTime})") unless fromTime < untilTime
  untilTime = now if untilTime > now || untilTime < fromTime

  diff    = now - fromTime
  archive = archives.find{|a| a.retention >= diff }
  return archive.fetch(fromTime, untilTime)
end
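
Two parts of fetch benefit from a concrete illustration: how an archive is chosen and how the return value can be taken apart. The sketch below is standalone and makes several assumptions. `ArchiveSketch` is a hypothetical stand-in for Whispr::Archive with retention taken to be seconds per point times number of points, archives are assumed to be ordered from highest to lowest precision, and the first returned value is assumed to correspond to the start time.

```ruby
# Hypothetical stand-in for Whispr::Archive.
ArchiveSketch = Struct.new(:seconds_per_point, :points) do
  def retention
    seconds_per_point * points
  end
end

# Mirrors the selection in #fetch: the first archive whose retention covers
# the distance between now and from_time.
def pick_archive(archives, from_time, now)
  diff = now - from_time
  archives.find { |archive| archive.retention >= diff }
end

# Expand a fetch-style result into [timestamp, value] pairs.
def expand_result(result)
  (start_time, _end_time, step), values = result
  values.each_with_index.map { |value, i| [start_time + i * step, value] }
end

archives = [ArchiveSketch.new(60, 1440), ArchiveSketch.new(300, 2016)]
now = 1_000_000

p pick_archive(archives, now - 3600, now).seconds_per_point        # last hour
p pick_archive(archives, now - 2 * 86_400, now).seconds_per_point  # last two days

# A made-up result in the documented shape: [[start, end, step], values]
p expand_result([[120, 300, 60], [1.0, nil, 3.0]])
```

A request for the last hour fits inside the one-day, 60-second archive, while a request reaching two days back falls through to the coarser 300-second archive.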

#header ⇒ Hash Also known as: info

Returns:

  • (Hash)


# File 'lib/whispr.rb', line 230

def header
  @header ||= read_header
end

#update(*points) ⇒ Object

Update one or many points. Each element of the points list should be a two-element Array where the first element is a timestamp and the second element is a value.



# File 'lib/whispr.rb', line 268

def update(*points)
  if points[0].is_a?(Array)
    # Cover the least exhaustive, and most likely, nested array check first
    points = points.length == 1 ? points[0] : points.flatten
  elsif points.any? { |p| p.is_a?(Array) }
    points = points.flatten
  end
  return if points.empty? || points.length % 2 != 0

  # TODO lock the file
  if points.length == 2
    update_one(points[1], points[0])
  else
    update_many(points)
  end
end
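
The accepted call shapes for update can be illustrated with a small standalone sketch of the argument handling. `normalize_points` is a hypothetical helper, not part of Whispr; it flattens its arguments and pairs them up rather than reproducing the library's exact branching, and like update it ignores input with an odd number of elements.

```ruby
# Hypothetical helper: reduce the different call shapes to a list of
# [timestamp, value] pairs.
def normalize_points(*points)
  flat = points.flatten
  # Like #update, ignore empty input or input that cannot be paired up.
  return [] if flat.empty? || flat.length.odd?

  flat.each_slice(2).to_a
end

p normalize_points(1_700_000_000, 1.5)                           # bare timestamp and value
p normalize_points([1_700_000_000, 1.5])                         # one pair
p normalize_points([1_700_000_000, 1.5], [1_700_000_060, 2.5])   # several pairs
p normalize_points(1, 2, 3)                                      # odd count, ignored
```

In the method above, a single pair is then passed to update_one with the value first and the timestamp second, while several pairs go to update_many as one flat list.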