Class: PulseMeter::Sensor::Timeline Abstract

Inherits:
Base
  • Object
show all
Includes:
Mixins::Utils, TimelineReduce
Defined in:
lib/pulse_meter/sensor/timeline.rb

Overview

This class is abstract.

Represents a timelined sensor: a series of values, one value for each consecutive time interval.

Constant Summary collapse

MAX_TIMESPAN_POINTS =
1000
DEFAULTS =

Default values for some sensor parameters

{
  :raw_data_ttl => 3600,
  :reduce_delay => 60,
}

Constants included from TimelineReduce

PulseMeter::Sensor::TimelineReduce::MAX_INTERVALS

Constants included from Mixins::Dumper

Mixins::Dumper::DUMP_REDIS_KEY

Instance Attribute Summary collapse

Attributes inherited from Base

#name, #redis

Instance Method Summary collapse

Methods included from TimelineReduce

#collect_ids_to_reduce, included, #reduce, #reduce_all_raw

Methods included from Mixins::Utils

#assert_array!, #assert_positive_integer!, #assert_ranged_float!, #camelize, #camelize_keys, #constantize, #each_subset, #parse_time, #subsets_of, #symbolize_keys, #titleize, #underscore, #uniqid

Methods inherited from Base

#annotate, #annotation, #command_aggregator, #event

Methods included from Mixins::Dumper

included

Constructor Details

#initialize(name, options) ⇒ Timeline

Initializes sensor with given name and parameters

Parameters:

  • name (String)

    sensor name

  • options (Hash)

    a customizable set of options

Options Hash (options):

  • :interval (Fixnum)

    Rotation interval

  • :ttl (Fixnum)

    How long summarized data will be stored before expiration

  • :raw_data_ttl (Fixnum)

    How long unsummarized raw data will be stored before expiration

  • :reduce_delay (Fixnum)

    Delay between end of interval and summarization

  • :timezone (String)

    TimeZone to which sensor data is related



39
40
41
42
43
44
45
46
# File 'lib/pulse_meter/sensor/timeline.rb', line 39

# Initializes sensor with given name and parameters.
#
# @param name [String] sensor name
# @param options [Hash] sensor settings
# @option options [Fixnum] :interval rotation interval
# @option options [Fixnum] :ttl how long summarized data is kept
# @option options [Fixnum] :raw_data_ttl how long raw data is kept
# @option options [Fixnum] :reduce_delay delay between end of interval and summarization
# @option options [String] :timezone timezone the sensor data relates to
def initialize(name, options)
  # Mandatory settings: the validator is called without a fallback value.
  @interval = assert_positive_integer!(options, :interval)
  @ttl = assert_positive_integer!(options, :ttl)

  # Optional settings: the matching DEFAULTS entry is handed to the validator
  # (presumably used when the option is absent -- confirm in Mixins::Utils).
  fallback_raw_data_ttl = DEFAULTS[:raw_data_ttl]
  fallback_reduce_delay = DEFAULTS[:reduce_delay]
  @raw_data_ttl = assert_positive_integer!(options, :raw_data_ttl, fallback_raw_data_ttl)
  @reduce_delay = assert_positive_integer!(options, :reduce_delay, fallback_reduce_delay)

  # Stored as given; no validation is applied here.
  @timezone = options[:timezone]

  super(name, options)
end

Instance Attribute Details

#interval ⇒ Fixnum (readonly)

Returns Rotation interval.

Returns:

  • (Fixnum)

    Rotation interval



24
25
26
# File 'lib/pulse_meter/sensor/timeline.rb', line 24

# @return [Fixnum] rotation interval, as stored by #initialize
def interval
  @interval
end

#raw_data_ttl ⇒ Fixnum (readonly)

Returns How long unsummarized raw data will be stored before expiration.

Returns:

  • (Fixnum)

    How long unsummarized raw data will be stored before expiration



24
# File 'lib/pulse_meter/sensor/timeline.rb', line 24

# Read-only accessors for the settings stored by #initialize.
attr_reader :interval, :ttl, :raw_data_ttl, :reduce_delay, :timezone

#reduce_delay ⇒ Fixnum (readonly)

Returns Delay between end of interval and summarization.

Returns:

  • (Fixnum)

    Delay between end of interval and summarization



24
# File 'lib/pulse_meter/sensor/timeline.rb', line 24

# Read-only accessors for the settings stored by #initialize.
attr_reader :interval, :ttl, :raw_data_ttl, :reduce_delay, :timezone

#timezone ⇒ Object (readonly)

Returns the value of attribute timezone.



24
# File 'lib/pulse_meter/sensor/timeline.rb', line 24

# Read-only accessors for the settings stored by #initialize.
attr_reader :interval, :ttl, :raw_data_ttl, :reduce_delay, :timezone

#ttl ⇒ Fixnum (readonly)

Returns How long summarized data will be stored before expiration.

Returns:

  • (Fixnum)

    How long summarized data will be stored before expiration



24
# File 'lib/pulse_meter/sensor/timeline.rb', line 24

# Read-only accessors for the settings stored by #initialize.
attr_reader :interval, :ttl, :raw_data_ttl, :reduce_delay, :timezone

Instance Method Details

#aggregate_event(key, value) ⇒ Object

This method is abstract.

Registers an event for the current interval identified by key

Parameters:

  • key (String)

    Redis key under which raw data for the interval is stored

  • value (Object)

    value to be aggregated



159
160
161
162
# File 'lib/pulse_meter/sensor/timeline.rb', line 159

# Registers an event under the given key.
#
# @abstract
# @param key [String] Redis key the event is recorded under
# @param value [Object] value to be aggregated
# @return [Object] whatever the Redis client returns from the SET call
def aggregate_event(key, value)
  # Baseline behaviour: a plain SET of +value+ under +key+.
  storage = redis
  storage.set(key, value)
end

#cleanup ⇒ Object

Clean up all sensor metadata and data



49
50
51
52
53
54
55
# File 'lib/pulse_meter/sensor/timeline.rb', line 49

# Cleans up all sensor data, then delegates to the parent implementation.
#
# Keys are looked up with the '*' wildcard in place of the interval id,
# first for raw data and then for summarized data.
def cleanup
  raw_keys = redis.keys(raw_data_key('*'))
  summarized_keys = redis.keys(data_key('*'))
  stale_keys = raw_keys + summarized_keys

  # Deletions are issued inside a single +multi+ block.
  multi do
    stale_keys.each { |stale_key| redis.del(stale_key) }
  end

  super
end

#current_interval_id ⇒ Fixnum

Returns current interval id

Returns:

  • (Fixnum)


152
153
154
# File 'lib/pulse_meter/sensor/timeline.rb', line 152

# Returns the id of the interval that contains the present moment.
#
# @return [Fixnum] current interval id
def current_interval_id
  # The wall-clock time goes through time_to_redis before the id is computed.
  redis_now = time_to_redis(Time.now)
  get_interval_id(redis_now)
end

#current_raw_data_key ⇒ Object

Returns Redis key by which raw data for current interval is stored



128
129
130
# File 'lib/pulse_meter/sensor/timeline.rb', line 128

# Returns the Redis key under which raw data for the current interval is stored.
#
# @return [String] raw data key for the current interval
def current_raw_data_key
  active_id = current_interval_id
  raw_data_key(active_id)
end

#data_key(id) ⇒ Object

Returns Redis key by which summarized data for given interval is stored

Parameters:

  • id (Fixnum)

    interval id



140
141
142
# File 'lib/pulse_meter/sensor/timeline.rb', line 140

# Builds the Redis key under which summarized data for an interval is stored.
# The key embeds the sensor +name+ and the given +id+.
#
# @param id [Fixnum, String] interval id (#cleanup passes the '*' wildcard)
# @return [String] key of the form "pulse_meter:data:<name>:<id>"
def data_key(id)
  "pulse_meter:data:#{name}:#{id}"
end

#deflate_safe(value) ⇒ Object

This method is abstract.

Deflates data taken from redis as string preserving nil values

Parameters:

  • value (String)

    raw data



173
174
175
176
177
# File 'lib/pulse_meter/sensor/timeline.rb', line 173

# Deflates data taken from Redis, preserving nil values.
#
# @abstract
# @param value [String, nil] raw data
# @return [Object, nil] result of +deflate+, or nil when +value+ is nil
#   or when deflating raises a StandardError
def deflate_safe(value)
  # nil is passed through without calling +deflate+.
  return nil if value.nil?

  deflate(value)
rescue
  # Any StandardError raised while deflating is deliberately mapped to nil.
  nil
end

#drop_within(from, till) ⇒ Object

Drops sensor data within given time

Parameters:

  • from (Time)

    lower bound

  • till (Time)

    upper bound

Raises:

  • ArgumentError if arguments are not valid time objects



114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/pulse_meter/sensor/timeline.rb', line 114

# Drops sensor data (summarized and raw) within the given time span.
#
# @param from [Time] lower bound
# @param till [Time] upper bound
# @return [Object] 0 when no interval falls inside the span, otherwise
#   whatever the Redis client returns from the DEL call
# @raise [ArgumentError] if either argument is not a Time
def drop_within(from, till)
  raise ArgumentError unless [from, till].all? { |bound| bound.kind_of?(Time) }

  lower_bound = time_to_redis(from.to_i)
  upper_bound = time_to_redis(till.to_i)

  doomed_keys = []
  # Start with the interval after the one that contains the lower bound.
  interval_id = get_interval_id(lower_bound) + interval
  until interval_id >= upper_bound
    doomed_keys.push(data_key(interval_id), raw_data_key(interval_id))
    interval_id += interval
  end

  # DEL is only issued when there is at least one key to remove.
  return 0 if doomed_keys.empty?

  redis.del(*doomed_keys)
end

#event_at(time, value = nil) ⇒ Object

Processes event from the past

Parameters:

  • time (Time)

    event time

  • value (defaults to: nil)

    event value



60
61
62
63
64
65
66
67
68
69
70
# File 'lib/pulse_meter/sensor/timeline.rb', line 60

# Processes an event from the past.
#
# @param time [Time] event time
# @param value [Object] event value (defaults to nil)
# @return [Boolean] true when the event was recorded, false when any
#   StandardError was raised along the way
def event_at(time, value = nil)
  multi do
    key = raw_data_key(get_interval_id(time_to_redis(time)))
    aggregate_event(key, value)
    # The raw data key is given an expiry of raw_data_ttl each time it is written.
    command_aggregator.expire(key, raw_data_ttl)
  end
  true
rescue StandardError
  # Best effort: failures are reported through the return value, not raised.
  false
end

#get_interval_id(time) ⇒ Object

Returns interval id where given time is

Parameters:

  • time (Time)


146
147
148
# File 'lib/pulse_meter/sensor/timeline.rb', line 146

# Returns the id of the interval that contains the given time.
#
# @param time [#to_i] point in time
# @return [Fixnum] timestamp truncated down to a multiple of +interval+
def get_interval_id(time)
  timestamp = time.to_i
  # Integer division discards the offset inside the interval.
  whole_intervals = timestamp / interval
  whole_intervals * interval
end

#get_raw_value(interval_id) ⇒ SensorData

Returns sensor data for the given interval, summarizing its raw data in memory and returning the calculated value

Parameters:

  • interval_id (Fixnum)

Returns:



101
102
103
104
105
106
107
108
# File 'lib/pulse_meter/sensor/timeline.rb', line 101

# Returns sensor data for the given interval, summarizing its raw data in memory.
#
# @param interval_id [Fixnum] interval id
# @return [SensorData] data whose value is the summarized raw data, or nil
#   when no raw data key exists for the interval
def get_raw_value(interval_id)
  interval_raw_data_key = raw_data_key(interval_id)

  raw_data_present = redis.exists(interval_raw_data_key)
  # Some Redis client versions answer EXISTS with an Integer count instead of
  # a boolean. 0 is truthy in Ruby, so the count must be compared explicitly.
  raw_data_present = raw_data_present > 0 if raw_data_present.is_a?(Integer)

  value = raw_data_present ? summarize(interval_raw_data_key) : nil
  sensor_data(interval_id, value)
end

#raw_data_key(id) ⇒ Object

Returns Redis key by which raw data for given interval is stored

Parameters:

  • id (Fixnum)

    interval id



134
135
136
# File 'lib/pulse_meter/sensor/timeline.rb', line 134

# Builds the Redis key under which raw data for an interval is stored.
# The key embeds the sensor +name+ and the given +id+.
#
# @param id [Fixnum, String] interval id (#cleanup passes the '*' wildcard)
# @return [String] key of the form "pulse_meter:raw:<name>:<id>"
def raw_data_key(id)
  "pulse_meter:raw:#{name}:#{id}"
end

#summarize(key) ⇒ Object

This method is abstract.

Summarizes all events within an interval to a single value

Parameters:

  • key (String)

    Redis key under which raw data for the interval is stored



166
167
168
169
# File 'lib/pulse_meter/sensor/timeline.rb', line 166

# Summarizes all events stored under the given key to a single value.
#
# @abstract
# @param key [String] Redis key holding the raw data of an interval
# @return [Object] whatever the Redis client returns from the GET call
def summarize(key)
  # Baseline behaviour: the stored value is read back as-is.
  storage = redis
  storage.get(key)
end

#timeline(time_ago) ⇒ Array<SensorData>

Returns sensor data within the last time_ago seconds

Parameters:

  • time_ago (Fixnum)

    interval length in seconds

Returns:

Raises:

  • ArgumentError if time_ago cannot be converted to a positive integer



76
77
78
79
80
# File 'lib/pulse_meter/sensor/timeline.rb', line 76

# Returns sensor data for the last +time_ago+ seconds.
#
# @param time_ago [#to_i] span length in seconds
# @return [Array<SensorData>] result of #timeline_within for the span ending now
# @raise [ArgumentError] if +time_ago+ does not respond to +to_i+ or
#   converts to a non-positive number
def timeline(time_ago)
  raise ArgumentError unless time_ago.respond_to?(:to_i)

  seconds = time_ago.to_i
  raise ArgumentError unless seconds > 0

  # A single timestamp is used for both bounds so the span is exactly +seconds+ long.
  upper_bound = Time.now
  timeline_within(upper_bound - seconds, upper_bound)
end

#timeline_within(from, till, skip_optimization = false) ⇒ Array<SensorData>

Returns sensor data within the given time span

Parameters:

  • from (Time)

    lower bound

  • till (Time)

    upper bound

  • skip_optimization (Boolean) (defaults to: false)

    must be set to true to skip interval optimization

Returns:

Raises:

  • ArgumentError if arguments are not valid time objects



88
89
90
91
92
93
94
95
# File 'lib/pulse_meter/sensor/timeline.rb', line 88

# Returns sensor data within the given time span.
#
# @param from [Time] lower bound
# @param till [Time] upper bound
# @param skip_optimization [Boolean] forwarded to +optimized_interval+;
#   set to true to skip interval optimization
# @return [Array<SensorData>] result of +zip_with_raw_data+ for the fetched ids and values
# @raise [ArgumentError] if either bound is not a Time
def timeline_within(from, till, skip_optimization = false)
  raise ArgumentError unless [from, till].all? { |bound| bound.kind_of?(Time) }

  start_time = time_to_redis(from.to_i)
  end_time = time_to_redis(till.to_i)
  step = optimized_interval(start_time, end_time, skip_optimization)

  # Start with the interval after the one that contains the lower bound.
  first_interval_id = get_interval_id(start_time) + step
  ids, values = fetch_reduced_interval_data(first_interval_id, step, end_time)
  zip_with_raw_data(ids, values)
end