Class: PulseMeter::Sensor::Timeline Abstract
- Includes:
- Mixins::Utils
- Defined in:
- lib/pulse-meter/sensor/timeline.rb
Overview
Represents timelined sensor: series of values, one value for each consequent time interval.
Direct Known Subclasses
PulseMeter::Sensor::Timelined::Average, PulseMeter::Sensor::Timelined::Counter, PulseMeter::Sensor::Timelined::HashedCounter, PulseMeter::Sensor::Timelined::Max, PulseMeter::Sensor::Timelined::Min, PulseMeter::Sensor::Timelined::Percentile, PulseMeter::Sensor::Timelined::UniqCounter
Constant Summary collapse
- MAX_TIMESPAN_POINTS =
1000
- DEFAULTS =
Default values for some sensor parameters
{ :raw_data_ttl => 3600, :reduce_delay => 60, }
Constants included from Mixins::Dumper
Mixins::Dumper::DUMP_REDIS_KEY
Instance Attribute Summary collapse
-
#interval ⇒ Fixnum
readonly
Rotation interval.
-
#raw_data_ttl ⇒ Fixnum
readonly
How long unsummarized raw data will be stored before expiration.
-
#reduce_delay ⇒ Object
readonly
Returns the value of attribute reduce_delay.
-
#ttl ⇒ Fixnum
readonly
How long summarized data will be stored before expiration.
Attributes inherited from Base
Class Method Summary collapse
Instance Method Summary collapse
- #aggregate_event(key, value) ⇒ Object abstract
-
#cleanup ⇒ Object
Clean up all sensor metadata and data.
-
#current_interval_id ⇒ Fixnum
Returns current interval id.
-
#current_raw_data_key ⇒ Object
Returns Redis key by which raw data for current interval is stored.
-
#data_key(id) ⇒ Object
Returns Redis key by which summarized data for given interval is stored.
-
#drop_within(from, till) ⇒ Object
Drops sensor data within given time.
-
#event(value = nil) ⇒ Object
Processes event.
-
#event_at(time, value = nil) ⇒ Object
Processes event from the past.
-
#get_interval_id(time) ⇒ Object
Returns interval id where given time is.
-
#get_raw_value(interval_id) ⇒ SensorData
Returns sensor data for given interval making in-memory summarization and returns calculated value.
-
#initialize(name, options) ⇒ Timeline
constructor
Initializes sensor with given name and parameters.
-
#optimized_inteval(start_time, end_time) ⇒ Fixnum
Makes interval optimization so that the requested timespan contains less than MAX_TIMESPAN_POINTS values.
-
#raw_data_key(id) ⇒ Object
Returns Redis key by which raw data for given interval is stored.
-
#reduce(interval_id) ⇒ Object
Reduces data in given interval.
-
#reduce_all_raw ⇒ Object
Reduces data in all raw interval.
- #summarize(key) ⇒ Object abstract
-
#timeline(time_ago) ⇒ Array<SensorData>
Returts sensor data within some last seconds.
-
#timeline_within(from, till) ⇒ Array<SensorData>
Returts sensor data within given time.
Methods included from Mixins::Utils
#assert_positive_integer!, #assert_ranged_float!, #camelize, #camelize_keys, #constantize, #symbolize_keys, #titleize, #uniqid
Methods inherited from Base
Methods included from Mixins::Dumper
Constructor Details
#initialize(name, options) ⇒ Timeline
Initializes sensor with given name and parameters
34 35 36 37 38 39 40 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 34 def initialize(name, ) @interval = assert_positive_integer!(, :interval) @ttl = assert_positive_integer!(, :ttl) @raw_data_ttl = assert_positive_integer!(, :raw_data_ttl, DEFAULTS[:raw_data_ttl]) @reduce_delay = assert_positive_integer!(, :reduce_delay, DEFAULTS[:reduce_delay]) super end |
Instance Attribute Details
#interval ⇒ Fixnum (readonly)
Returns Rotation interval.
20 21 22 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 20 def interval @interval end |
#raw_data_ttl ⇒ Fixnum (readonly)
Returns How long unsummarized raw data will be stored before expiration.
20 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 20 attr_reader :interval, :ttl, :raw_data_ttl, :reduce_delay |
#reduce_delay ⇒ Object (readonly)
Returns the value of attribute reduce_delay.
20 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 20 attr_reader :interval, :ttl, :raw_data_ttl, :reduce_delay |
#ttl ⇒ Fixnum (readonly)
Returns How long summarized data will be stored before expiration.
20 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 20 attr_reader :interval, :ttl, :raw_data_ttl, :reduce_delay |
Class Method Details
.reduce_all_raw ⇒ Object
102 103 104 105 106 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 102 def self.reduce_all_raw list_objects.each do |sensor| sensor.reduce_all_raw if sensor.respond_to? :reduce_all_raw end end |
Instance Method Details
#aggregate_event(key, value) ⇒ Object
Registeres event for current interval identified by key
227 228 229 230 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 227 def aggregate_event(key, value) # simple redis.set(key, value) end |
#cleanup ⇒ Object
Clean up all sensor metadata and data
43 44 45 46 47 48 49 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 43 def cleanup keys = redis.keys(raw_data_key('*')) + redis.keys(data_key('*')) multi do keys.each{|key| redis.del(key)} end super end |
#current_interval_id ⇒ Fixnum
Returns current interval id
220 221 222 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 220 def current_interval_id get_interval_id(Time.now) end |
#current_raw_data_key ⇒ Object
Returns Redis key by which raw data for current interval is stored
196 197 198 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 196 def current_raw_data_key raw_data_key(current_interval_id) end |
#data_key(id) ⇒ Object
Returns Redis key by which summarized data for given interval is stored
208 209 210 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 208 def data_key(id) "pulse_meter:data:#{name}:#{id}" end |
#drop_within(from, till) ⇒ Object
Drops sensor data within given time
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 178 def drop_within(from, till) raise ArgumentError unless from.kind_of?(Time) && till.kind_of?(Time) start_time, end_time = from.to_i, till.to_i current_interval_id = get_interval_id(start_time) + interval keys = [] while current_interval_id < end_time keys << data_key(current_interval_id) keys << raw_data_key(current_interval_id) current_interval_id += interval end if keys.empty? 0 else redis.del(*keys) end end |
#event(value = nil) ⇒ Object
Processes event
53 54 55 56 57 58 59 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 53 def event(value = nil) multi do current_key = current_raw_data_key aggregate_event(current_key, value) redis.expire(current_key, raw_data_ttl) end end |
#event_at(time, value = nil) ⇒ Object
Processes event from the past
64 65 66 67 68 69 70 71 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 64 def event_at(time, value = nil) multi do interval_id = get_interval_id(time) key = raw_data_key(interval_id) aggregate_event(key, value) redis.expire(key, raw_data_ttl) end end |
#get_interval_id(time) ⇒ Object
Returns interval id where given time is
214 215 216 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 214 def get_interval_id(time) (time.to_i / interval) * interval end |
#get_raw_value(interval_id) ⇒ SensorData
Returns sensor data for given interval making in-memory summarization
and returns calculated value
168 169 170 171 172 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 168 def get_raw_value(interval_id) interval_raw_data_key = raw_data_key(interval_id) return SensorData.new(Time.at(interval_id), summarize(interval_raw_data_key)) if redis.exists(interval_raw_data_key) SensorData.new(Time.at(interval_id), nil) end |
#optimized_inteval(start_time, end_time) ⇒ Fixnum
Makes interval optimization so that the requested timespan contains less than MAX_TIMESPAN_POINTS values
155 156 157 158 159 160 161 162 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 155 def optimized_inteval(start_time, end_time) res_interval = interval timespan = end_time - start_time while timespan / res_interval > MAX_TIMESPAN_POINTS - 1 res_interval *= 2 end res_interval end |
#raw_data_key(id) ⇒ Object
Returns Redis key by which raw data for given interval is stored
202 203 204 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 202 def raw_data_key(id) "pulse_meter:raw:#{name}:#{id}" end |
#reduce(interval_id) ⇒ Object
Interval id is just unixtime of its lower bound. Ruduction is a process of ‘compressing’ all interval’s raw data to a single value. When reduction is done summarized data is saved to Redis separately with expiration time taken from sensor configuration.
Reduces data in given interval.
80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 80 def reduce(interval_id) interval_raw_data_key = raw_data_key(interval_id) return unless redis.exists(interval_raw_data_key) value = summarize(interval_raw_data_key) interval_data_key = data_key(interval_id) multi do redis.del(interval_raw_data_key) redis.set(interval_data_key, value) redis.expire(interval_data_key, ttl) end end |
#reduce_all_raw ⇒ Object
Reduces data in all raw interval
93 94 95 96 97 98 99 100 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 93 def reduce_all_raw min_time = Time.now - reduce_delay - interval redis.keys(raw_data_key('*')).each do |key| interval_id = key.split(':').last next if Time.at(interval_id.to_i) > min_time reduce(interval_id) end end |
#summarize(key) ⇒ Object
Summarizes all event within interval to a single value
234 235 236 237 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 234 def summarize(key) # simple redis.get(key) end |
#timeline(time_ago) ⇒ Array<SensorData>
Returts sensor data within some last seconds
112 113 114 115 116 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 112 def timeline(time_ago) raise ArgumentError unless time_ago.respond_to?(:to_i) && time_ago.to_i > 0 now = Time.now timeline_within(now - time_ago.to_i, now) end |
#timeline_within(from, till) ⇒ Array<SensorData>
Returts sensor data within given time
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 123 def timeline_within(from, till) raise ArgumentError unless from.kind_of?(Time) && till.kind_of?(Time) start_time, end_time = from.to_i, till.to_i actual_interval = optimized_inteval(start_time, end_time) current_interval_id = get_interval_id(start_time) + actual_interval keys = [] ids = [] while current_interval_id < end_time ids << current_interval_id keys << data_key(current_interval_id) current_interval_id += actual_interval end values = if keys.empty? [] else redis.mget(*keys) end res = [] ids.zip(values) do |(id, val)| res << if val.nil? get_raw_value(id) else SensorData.new(Time.at(id), val) end end res end |