Class: DataOperations::Aggregate

Inherits:
Object
  • Object
show all
Defined in:
lib/dataoperations-aggregate.rb

Constant Summary collapse

DEFAULT_TIME_FORMAT =
'%Y-%m-%dT%H:%M:%S.%L%:z'.freeze
DEFAULT_TIME_FIELD =
'timestamp'.freeze
DEFAULT_OUTPUT_TIME_FORMAT =
'%Y-%m-%dT%H:%M:%S.%L%z'.freeze
DEFAULT_INTERVALS =
[10].freeze
DEFAULT_FLUSH_INTERVAL =
5
DEFAULT_PROCESSING_MODE =
:batch
DEFAULT_TIME_STARTED_MODE =
:first_message
DEFAULT_FIELD_NO_DATA_VALUE =
'no_data'.freeze
DEFAULT_AGGREGATIONS =
%w[sum min max mean median variance standard_deviation].freeze
VALID_AGGREGATIONS =
%w[sum min max mean median variance standard_deviation].freeze
DEFAULT_HASH_TIME_FORMAT =
'%Y-%m-%dT%H'.freeze
DEFAULT_INERVAL_SECONDS =
3600

Instance Method Summary collapse

Constructor Details

#initialize(aggregator: {}, time_format: DEFAULT_TIME_FORMAT, time_field: DEFAULT_TIME_FIELD, output_time_format: DEFAULT_OUTPUT_TIME_FORMAT, intervals: DEFAULT_INTERVALS, flush_interval: DEFAULT_FLUSH_INTERVAL, keep_interval: DEFAULT_KEEP_INTERVAL, field_no_data_value: DEFAULT_FIELD_NO_DATA_VALUE, processing_mode: DEFAULT_PROCESSING_MODE, time_started_mode: DEFAULT_TIME_STARTED_MODE, aggregator_name: nil, log: Logger.new(STDOUT), aggregation_names:, group_field_names:, aggregate_field_names:) ⇒ Aggregate

Returns a new instance of Aggregate.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/dataoperations-aggregate.rb', line 19

# Builds an aggregator and validates its configuration.
#
# @param aggregator [Hash] initial aggregation state; stored as-is in @aggregator
# @param time_format [String] strptime format used to parse the event time field
# @param time_field [String] name of the record key that holds the event time
# @param output_time_format [String] stored in @output_time_format (consumed outside this method)
# @param intervals [Array<Integer>] aggregation intervals; duplicates are removed
#   and the list is sorted ascending, every entry must be a multiple of the smallest
# @param flush_interval [Object] stored in @flush_interval (consumed outside this method)
# @param keep_interval [Object] stored in @keep_interval (consumed outside this method)
# @param field_no_data_value [Object] placeholder used for group fields missing from a record
# @param processing_mode [Symbol] stored in @processing_mode (consumed outside this method)
# @param time_started_mode [Symbol] stored in @time_started_mode (consumed outside this method)
# @param aggregator_name [Object, nil] stored in @aggregator_name
# @param log [Object] logger stored in @log
# @param aggregation_names [Array<String>] operations to compute; each must be in VALID_AGGREGATIONS
# @param group_field_names [Array] record keys that identify a group
# @param aggregate_field_names [Array] record keys whose values are aggregated
# @raise [RuntimeError] if any of the three *_names options is not an Array, if an
#   aggregation name is unknown, or if an interval is not a multiple of the smallest one
def initialize(aggregator: {},
               time_format: DEFAULT_TIME_FORMAT,
               time_field: DEFAULT_TIME_FIELD,
               output_time_format: DEFAULT_OUTPUT_TIME_FORMAT,
               intervals: DEFAULT_INTERVALS,
               flush_interval: DEFAULT_FLUSH_INTERVAL,
               keep_interval: DEFAULT_KEEP_INTERVAL,
               field_no_data_value: DEFAULT_FIELD_NO_DATA_VALUE,
               processing_mode: DEFAULT_PROCESSING_MODE,
               time_started_mode: DEFAULT_TIME_STARTED_MODE,
               aggregator_name: nil,
               log: Logger.new(STDOUT),
               aggregation_names:,
               group_field_names:,
               aggregate_field_names:
            )
  @aggregator = aggregator
  @time_format = time_format
  @time_field = time_field
  @output_time_format = output_time_format
  @intervals = intervals.uniq.sort
  @flush_interval = flush_interval
  @keep_interval = keep_interval
  @field_no_data_value = field_no_data_value
  @processing_mode = processing_mode
  @time_started_mode = time_started_mode
  @aggregator_name = aggregator_name

  # Each option is checked against itself; nil fails the Array test as well,
  # so "not specified" and "wrong type" share one message per option.
  {
    'aggregation_names' => aggregation_names,
    'group_field_names' => group_field_names,
    'aggregate_field_names' => aggregate_field_names
  }.each do |option_name, option_value|
    next if option_value.is_a?(Array)

    raise "Configuration error, #{option_name} must be specified and Array"
  end

  @log = log

  @hash_time_format = DEFAULT_HASH_TIME_FORMAT
  @interval_seconds = DEFAULT_INERVAL_SECONDS

  @aggregation_names = aggregation_names
  @group_field_names = group_field_names
  @aggregate_field_names = aggregate_field_names

  @aggregation_names.each do |operation|
    next if VALID_AGGREGATIONS.include?(operation)

    raise 'aggregations must set any combination of sum,min,max,mean,median,variance,standard_deviation'
  end

  # @intervals is sorted, so its first entry is the smallest (base) interval.
  base_interval = @intervals.first
  @intervals.each do |interval|
    next if (interval % base_interval).zero?

    raise "interval: #{interval} must be multiple of first interval: #{base_interval}"
  end

  @aggregator_mutex = Mutex.new
  # TODO: validate the contents (not just the type) of group_field_names and
  # aggregate_field_names.
end

Instance Method Details

#add_events(record) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/dataoperations-aggregate.rb', line 89

# Adds one event to the in-memory aggregator (@aggregator).
#
# The event is filed under two keys:
# * a group key built from the configured group field names and the record's
#   values for them ("name:value" pairs joined with "_"), and
# * a time bucket: the event timestamp (epoch seconds) rounded down to a
#   multiple of the smallest configured interval.
#
# The timestamp is parsed from record[@time_field] with @time_format. When the
# field is absent, or its value cannot be parsed, the current time is used.
# For every configured aggregate field the record's value is appended to the
# bucket's value list; values that are not Integer or Float are recorded as 0.
#
# The whole lookup/insert/update runs under @aggregator_mutex so that
# concurrent callers cannot overwrite each other's newly created group or
# bucket, and so the structure is not modified while aggregate_events walks it.
#
# @param record [Hash] the event; must respond to key? and []
# @return [Array] the configured aggregate field names (the value of the final
#   each call; not a meaningful result)
def add_events(record)
  timestamp = nil
  if record.key?(@time_field)
    begin
      timestamp = DateTime.strptime(record[@time_field], @time_format).to_time.to_i
    rescue ArgumentError, TypeError => e
      # strptime raises (it never returns nil) on a malformed or non-String
      # value, so the fallback below has to be reached through a rescue.
      if @log.respond_to?(:warn)
        @log.warn("Unparsable #{@time_field} #{record[@time_field].inspect} (#{e.message}), using current time")
      end
    end
  end
  timestamp ||= Time.now.to_i

  smallest_interval = @intervals.first
  aggregator_hash_key = (timestamp / smallest_interval) * smallest_interval

  # The key stays nil when no group fields are configured.
  group_key_parts = @group_field_names.map { |field_name| "#{field_name}:#{record[field_name]}" }
  hash_group_key = group_key_parts.empty? ? nil : group_key_parts.join('_')

  @aggregator_mutex.synchronize do
    unless @aggregator.key?(hash_group_key)
      group_detail = {}
      @group_field_names.each do |field_name|
        group_detail[field_name] = record.key?(field_name) ? record[field_name] : @field_no_data_value
      end

      # One empty slot per configured interval.
      interval_detail = {}
      @intervals.each { |interval| interval_detail[interval.to_s] = {} }

      @aggregator[hash_group_key] = {
        'group_fields' => group_detail,
        'aggregate_fields' => {},
        'intervals' => interval_detail
      }
    end

    buckets = @aggregator[hash_group_key]['aggregate_fields']
    if buckets.key?(aggregator_hash_key)
      bucket = buckets[aggregator_hash_key]
      bucket['processed'] += 1
    else
      # :time_started is deliberately a Symbol key, as in the original layout.
      bucket = { :time_started => Time.now.to_i, 'processed' => 1 }
      buckets[aggregator_hash_key] = bucket
    end

    @aggregate_field_names.each do |field_name|
      value = record[field_name]
      numeric = value.is_a?(Integer) || value.is_a?(Float)
      (bucket[field_name] ||= []) << (numeric ? value : 0)
    end
  end
end

#aggregate_data ⇒ Object



149
150
151
# File 'lib/dataoperations-aggregate.rb', line 149

# Returns the aggregator state held in @aggregator.
#
# The object itself is returned, not a copy.
# NOTE(review): this read does not take @aggregator_mutex, unlike the writes
# in add_events — confirm callers do not mutate or iterate it concurrently.
#
# @return [Object] the value of @aggregator
def aggregate_data
  @aggregator
end

#aggregate_events ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/dataoperations-aggregate.rb', line 153

# Runs the aggregation helpers over every group currently held in
# @aggregator and collects their output.
#
# For each group, aggregate_first_interval is called once, then
# aggregate_subsequents_intervals is called for every key of the group's
# 'intervals' hash except the first. All calls share one result hash and one
# "current time" (epoch seconds) taken when the lock is acquired. The whole
# pass runs under @aggregator_mutex.
#
# @return [Hash, nil] the result hash filled by the helpers, or nil when it
#   is still empty afterwards
def aggregate_events
  results = {}

  @aggregator_mutex.synchronize do
    now = Time.now.to_i

    @aggregator.each_value do |group|
      aggregate_first_interval(results, now, group)

      # Every interval after the first one is handled by the second helper.
      remaining_intervals = group['intervals'].keys[1..-1]
      remaining_intervals.each do |interval_key|
        aggregate_subsequents_intervals(results, now, group, interval_key)
      end
    end
  end

  results.empty? ? nil : results
end

#log_level(log_level) ⇒ Object



85
86
87
# File 'lib/dataoperations-aggregate.rb', line 85

# Sets the level of the logger held in @log.
#
# @param log_level [Object] value assigned through @log.level=
#   (presumably a Logger severity such as Logger::INFO — confirm against the
#   logger passed to the constructor)
# @return [Object] the log_level that was assigned
def log_level(log_level)
  @log.level = log_level
end