Class: PulseMeter::Sensor::Timeline Abstract

Inherits:
Base
  • Object
show all
Includes:
Mixins::Utils
Defined in:
lib/pulse-meter/sensor/timeline.rb

Overview

This class is abstract.

Represents timelined sensor: series of values, one value for each consequent time interval.

Constant Summary collapse

MAX_TIMESPAN_POINTS =
1000
DEFAULTS =

Default values for some sensor parameters

{
  :raw_data_ttl => 3600,
  :reduce_delay => 60,
}

Constants included from Mixins::Dumper

Mixins::Dumper::DUMP_REDIS_KEY

Instance Attribute Summary collapse

Attributes inherited from Base

#name, #redis

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Mixins::Utils

#assert_positive_integer!, #assert_ranged_float!, #camelize, #camelize_keys, #constantize, #symbolize_keys, #titleize, #uniqid

Methods inherited from Base

#annotate, #annotation

Methods included from Mixins::Dumper

included

Constructor Details

#initialize(name, options) ⇒ Timeline

Initializes sensor with given name and parameters

Parameters:

  • name (String)

    sensor name

  • options (Hash)

    a customizable set of options

Options Hash (options):

  • :interval (Fixnum)

    Rotation interval

  • :ttl (Fixnum)

    How long summarized data will be stored before expiration

  • :raw_data_ttl (Fixnum)

    How long unsummarized raw data will be stored before expiration

  • :reduce_delay (Fixnum)

    Delay between end of interval and summarization



34
35
36
37
38
39
40
# File 'lib/pulse-meter/sensor/timeline.rb', line 34

def initialize(name, options)
  @interval = assert_positive_integer!(options, :interval)
  @ttl = assert_positive_integer!(options, :ttl)
  @raw_data_ttl = assert_positive_integer!(options, :raw_data_ttl, DEFAULTS[:raw_data_ttl])
  @reduce_delay = assert_positive_integer!(options, :reduce_delay, DEFAULTS[:reduce_delay])
  super
end

Instance Attribute Details

#intervalFixnum (readonly)

Returns Rotation interval.

Returns:

  • (Fixnum)

    Rotation interval



20
21
22
# File 'lib/pulse-meter/sensor/timeline.rb', line 20

def interval
  @interval
end

#raw_data_ttlFixnum (readonly)

Returns How long unsummarized raw data will be stored before expiration.

Returns:

  • (Fixnum)

    How long unsummarized raw data will be stored before expiration



20
# File 'lib/pulse-meter/sensor/timeline.rb', line 20

attr_reader :interval, :ttl, :raw_data_ttl, :reduce_delay

#reduce_delayObject (readonly)

Returns the value of attribute reduce_delay.



20
# File 'lib/pulse-meter/sensor/timeline.rb', line 20

attr_reader :interval, :ttl, :raw_data_ttl, :reduce_delay

#ttlFixnum (readonly)

Returns How long summarized data will be stored before expiration.

Returns:

  • (Fixnum)

    How long summarized data will be stored before expiration



20
# File 'lib/pulse-meter/sensor/timeline.rb', line 20

attr_reader :interval, :ttl, :raw_data_ttl, :reduce_delay

Class Method Details

.reduce_all_rawObject



102
103
104
105
106
# File 'lib/pulse-meter/sensor/timeline.rb', line 102

def self.reduce_all_raw
  list_objects.each do |sensor|
    sensor.reduce_all_raw if sensor.respond_to? :reduce_all_raw
  end
end

Instance Method Details

#aggregate_event(key, value) ⇒ Object

This method is abstract.

Registeres event for current interval identified by key

Parameters:

  • key (Fixnum)

    interval id

  • value (Object)

    value to be aggregated



227
228
229
230
# File 'lib/pulse-meter/sensor/timeline.rb', line 227

def aggregate_event(key, value)
  # simple
  redis.set(key, value)
end

#cleanupObject

Clean up all sensor metadata and data



43
44
45
46
47
48
49
# File 'lib/pulse-meter/sensor/timeline.rb', line 43

def cleanup
  keys = redis.keys(raw_data_key('*')) + redis.keys(data_key('*'))
  multi do
    keys.each{|key| redis.del(key)}
  end
  super
end

#current_interval_idFixnum

Returns current interval id

Returns:

  • (Fixnum)


220
221
222
# File 'lib/pulse-meter/sensor/timeline.rb', line 220

def current_interval_id
  get_interval_id(Time.now)
end

#current_raw_data_keyObject

Returns Redis key by which raw data for current interval is stored



196
197
198
# File 'lib/pulse-meter/sensor/timeline.rb', line 196

def current_raw_data_key
  raw_data_key(current_interval_id)
end

#data_key(id) ⇒ Object

Returns Redis key by which summarized data for given interval is stored

Parameters:

  • id (Fixnum)

    interval id



208
209
210
# File 'lib/pulse-meter/sensor/timeline.rb', line 208

def data_key(id)
  "pulse_meter:data:#{name}:#{id}"
end

#drop_within(from, till) ⇒ Object

Drops sensor data within given time

Parameters:

  • from (Time)

    lower bound

  • till (Time)

    upper bound

Raises:

  • ArgumentError if argumets are not valid time objects



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/pulse-meter/sensor/timeline.rb', line 178

def drop_within(from, till)
  raise ArgumentError unless from.kind_of?(Time) && till.kind_of?(Time)
  start_time, end_time = from.to_i, till.to_i
  current_interval_id = get_interval_id(start_time) + interval
  keys = []
  while current_interval_id < end_time
    keys << data_key(current_interval_id)
    keys << raw_data_key(current_interval_id)
    current_interval_id += interval
  end
  if keys.empty?
    0
  else
    redis.del(*keys)
  end
end

#event(value = nil) ⇒ Object

Processes event

Parameters:

  • value (defaults to: nil)

    event value



53
54
55
56
57
58
59
# File 'lib/pulse-meter/sensor/timeline.rb', line 53

def event(value = nil)
  multi do
    current_key = current_raw_data_key
    aggregate_event(current_key, value)
    redis.expire(current_key, raw_data_ttl)
  end
end

#event_at(time, value = nil) ⇒ Object

Processes event from the past

Parameters:

  • time (Time)

    event time

  • value (defaults to: nil)

    event value



64
65
66
67
68
69
70
71
# File 'lib/pulse-meter/sensor/timeline.rb', line 64

def event_at(time, value = nil)
  multi do
    interval_id = get_interval_id(time)
    key = raw_data_key(interval_id)
    aggregate_event(key, value)
    redis.expire(key, raw_data_ttl)
  end
end

#get_interval_id(time) ⇒ Object

Returns interval id where given time is

Parameters:

  • time (Time)


214
215
216
# File 'lib/pulse-meter/sensor/timeline.rb', line 214

def get_interval_id(time)
  (time.to_i / interval) * interval
end

#get_raw_value(interval_id) ⇒ SensorData

Returns sensor data for given interval making in-memory summarization

and returns calculated value

Parameters:

  • interval_id (Fixnum)

Returns:



168
169
170
171
172
# File 'lib/pulse-meter/sensor/timeline.rb', line 168

def get_raw_value(interval_id)
  interval_raw_data_key = raw_data_key(interval_id)
  return SensorData.new(Time.at(interval_id), summarize(interval_raw_data_key)) if redis.exists(interval_raw_data_key)
  SensorData.new(Time.at(interval_id), nil)
end

#optimized_inteval(start_time, end_time) ⇒ Fixnum

Makes interval optimization so that the requested timespan contains less than MAX_TIMESPAN_POINTS values

Parameters:

  • start_time (Fixnum)

    unix timestamp of timespan start

  • end_time (Fixnum)

    unix timestamp of timespan start

Returns:

  • (Fixnum)

    optimized interval in seconds.



155
156
157
158
159
160
161
162
# File 'lib/pulse-meter/sensor/timeline.rb', line 155

def optimized_inteval(start_time, end_time)
  res_interval = interval
  timespan = end_time - start_time
  while timespan / res_interval > MAX_TIMESPAN_POINTS - 1
    res_interval *= 2
  end
  res_interval
end

#raw_data_key(id) ⇒ Object

Returns Redis key by which raw data for given interval is stored

Parameters:

  • id (Fixnum)

    interval id



202
203
204
# File 'lib/pulse-meter/sensor/timeline.rb', line 202

def raw_data_key(id)
  "pulse_meter:raw:#{name}:#{id}"
end

#reduce(interval_id) ⇒ Object

Note:

Interval id is just unixtime of its lower bound. Ruduction is a process of ‘compressing’ all interval’s raw data to a single value. When reduction is done summarized data is saved to Redis separately with expiration time taken from sensor configuration.

Reduces data in given interval.

Parameters:

  • interval_id (Fixnum)


80
81
82
83
84
85
86
87
88
89
90
# File 'lib/pulse-meter/sensor/timeline.rb', line 80

def reduce(interval_id)
  interval_raw_data_key = raw_data_key(interval_id)
  return unless redis.exists(interval_raw_data_key)
  value = summarize(interval_raw_data_key)
  interval_data_key = data_key(interval_id)
  multi do
    redis.del(interval_raw_data_key)
    redis.set(interval_data_key, value)
    redis.expire(interval_data_key, ttl)
  end
end

#reduce_all_rawObject

Reduces data in all raw interval



93
94
95
96
97
98
99
100
# File 'lib/pulse-meter/sensor/timeline.rb', line 93

def reduce_all_raw
  min_time = Time.now - reduce_delay - interval
  redis.keys(raw_data_key('*')).each do |key|
    interval_id = key.split(':').last
    next if Time.at(interval_id.to_i) > min_time
    reduce(interval_id)
  end
end

#summarize(key) ⇒ Object

This method is abstract.

Summarizes all event within interval to a single value

Parameters:

  • key (Fixnum)

    interval_id



234
235
236
237
# File 'lib/pulse-meter/sensor/timeline.rb', line 234

def summarize(key)
  # simple
  redis.get(key)
end

#timeline(time_ago) ⇒ Array<SensorData>

Returts sensor data within some last seconds

Parameters:

  • time_ago (Fixnum)

    interval length in seconds

Returns:

Raises:

  • ArgumentError if argumets are not valid time objects



112
113
114
115
116
# File 'lib/pulse-meter/sensor/timeline.rb', line 112

def timeline(time_ago)
  raise ArgumentError unless time_ago.respond_to?(:to_i) && time_ago.to_i > 0
  now = Time.now
  timeline_within(now - time_ago.to_i, now)
end

#timeline_within(from, till) ⇒ Array<SensorData>

Returts sensor data within given time

Parameters:

  • from (Time)

    lower bound

  • till (Time)

    upper bound

Returns:

Raises:

  • ArgumentError if argumets are not valid time objects



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/pulse-meter/sensor/timeline.rb', line 123

def timeline_within(from, till)
  raise ArgumentError unless from.kind_of?(Time) && till.kind_of?(Time)
  start_time, end_time = from.to_i, till.to_i
  actual_interval = optimized_inteval(start_time, end_time)
  current_interval_id = get_interval_id(start_time) + actual_interval
  keys = []
  ids = []
  while current_interval_id < end_time
    ids << current_interval_id
    keys << data_key(current_interval_id)
    current_interval_id += actual_interval
  end
  values = if keys.empty?
    []
  else
    redis.mget(*keys)
  end
  res = []
  ids.zip(values) do |(id, val)|
    res << if val.nil?
      get_raw_value(id)
    else
      SensorData.new(Time.at(id), val)
    end
  end
  res
end