Class: DartReducerWorkerThread

Inherits:
Object
Defined in:
lib/cosmos/dart/lib/dart_reducer_worker_thread.rb

Overview

Thread which performs data reduction in the DART database.

Constant Summary

HOLD_OFF_TIME =

This constant controls how much spread there must be in the data before doing a reduction. Since the minimum reduction period is 1 minute, we wait until there are at least two minutes of spread. This matters less for the higher-order reductions, but it also ensures that there is a spread in the data points.

2.minutes
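
In #work this constant gates reduction on the time spread between the first and last rows which are ready to reduce. A minimal sketch of the equivalent check in plain Ruby (the timestamps are hypothetical; 2.minutes comes from ActiveSupport):

require 'active_support/core_ext/numeric/time'

HOLD_OFF_TIME = 2.minutes
first_row_time = Time.utc(2024, 1, 1, 12, 0, 0) # hypothetical first ready row
last_row_time  = Time.utc(2024, 1, 1, 12, 3, 0) # hypothetical last ready row
(last_row_time - first_row_time) > HOLD_OFF_TIME # => true, enough spread to reduce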

Instance Method Summary

Constructor Details

#initialize(master_queue, locked_tables, mutex, instance_num, status) ⇒ DartReducerWorkerThread

Create a new thread and start it

Parameters:

  • master_queue (Queue)

    Queue which the new thread will be added to

  • locked_tables (Array<Array<Symbol, Integer, Integer>>)

    Array of all the tables which are currently being processed. Each entry's first element is the table type and must be :MINUTE, :HOUR, or :DAY; the second and third elements are the PacketConfig ID and table index (see the sketch after this parameter list).

  • mutex (Mutex)

    Mutex used to synchronize access to the locked_tables

  • instance_num (Integer)

    Simple counter to trace the thread instance

  • status (DartReducerStatus)

    Status structure
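
To illustrate the locked_tables convention described above, a minimal sketch with hypothetical IDs (each entry is [job_type, packet_config_id, table_index], and all access goes through the shared mutex):

mutex = Mutex.new
locked_tables = [
  [:MINUTE, 1, 0], # hypothetical: minute reduction of PacketConfig 1, table 0
  [:HOUR, 1, 0]    # hypothetical: hour reduction of the same table
]

mutex.synchronize do
  locked_tables.include?([:MINUTE, 1, 0]) # => true, this table is in work
end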



# File 'lib/cosmos/dart/lib/dart_reducer_worker_thread.rb', line 32

def initialize(master_queue, locked_tables, mutex, instance_num, status)
  @instance_num = instance_num
  @running = true
  @master_queue = master_queue
  @locked_tables = locked_tables
  @mutex = mutex
  @thread_queue = Queue.new
  @status = status
  # Start the thread which will wait on @thread_queue.pop
  @thread = Thread.new { work() }
  # Add the local @thread_queue to the @master_queue so jobs can be added
  @master_queue << @thread_queue
end
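
For context, a minimal sketch of how a caller might stand up a pool of these workers; the pool size of 5 is an illustrative assumption, not taken from the DART source:

master_queue = Queue.new
locked_tables = []
mutex = Mutex.new
status = DartReducerStatus.new

workers = 5.times.map do |instance_num|
  DartReducerWorkerThread.new(master_queue, locked_tables, mutex, instance_num, status)
end

Each worker immediately pushes its private @thread_queue onto master_queue, so a producer can pop an idle worker's queue and hand it a job.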

Instance Method Details

#create_reduced_row(sample_rows, base_model, reduction_model, base_model_time_column, mappings, job_type) ⇒ Object



# File 'lib/cosmos/dart/lib/dart_reducer_worker_thread.rb', line 115

def create_reduced_row(sample_rows, base_model, reduction_model, base_model_time_column, mappings, job_type)
  new_row = reduction_model.new
  new_row.start_time = sample_rows[0].send(base_model_time_column)
  new_row.num_samples = sample_rows.length
  new_row.meta_id = sample_rows[0].meta_id
  new_row.packet_log_id = sample_rows[0].packet_log_id
  # Process each of the ItemToDecomTableMapping to get the item to be reduced
  mappings.each do |mapping|
    item_name = "i#{mapping.item_index}"
    min_item_name = "i#{mapping.item_index}min"
    max_item_name = "i#{mapping.item_index}max"
    avg_item_name = "i#{mapping.item_index}avg"
    stddev_item_name = "i#{mapping.item_index}stddev"
    min_value = nil
    max_value = nil
    total_samples = 0 # s0
    avg_value = 0.0 # s1
    s2 = 0.0
    stddev_value = 0.0
    min_nan_found = false
    max_nan_found = false
    avg_nan_found = false
    stddev_nan_found = false
    # Process each of the rows in the base model which is the decommutation table
    # or a lesser reduction table (the minute or hour table).
    sample_rows.each do |row_to_reduce|
      # If we're processing minute data we're reading from the base decommutation
      # table, thus there are only raw values to read
      if job_type == :MINUTE
        value = row_to_reduce.read_attribute(item_name)
        min_sample = value
        max_sample = value
        avg_sample = value
        if value.nil?
          handle_error("#{item_name} is nil in #{row_to_reduce.class}:#{row_to_reduce.id}")
          next
        end
      else # :HOUR or :DAY
        # We're processing hour or day data so we're reducing previously reduced data
        # thus there are min, max, and average values to read
        min_sample = row_to_reduce.read_attribute(min_item_name)
        max_sample = row_to_reduce.read_attribute(max_item_name)
        avg_sample = row_to_reduce.read_attribute(avg_item_name)
        stddev_sample = row_to_reduce.read_attribute(stddev_item_name)
        if min_sample.nil?
          handle_error("#{min_item_name} is nil in #{row_to_reduce.class}:#{row_to_reduce.id}")
          next
        end
        if max_sample.nil?
          handle_error("#{max_item_name} is nil in #{row_to_reduce.class}:#{row_to_reduce.id}")
          next
        end
        if avg_sample.nil?
          handle_error("#{avg_item_name} is nil in #{row_to_reduce.class}:#{row_to_reduce.id}")
          next
        end
        if stddev_sample.nil?
          handle_error("#{stddev_item_name} is nil in #{row_to_reduce.class}:#{row_to_reduce.id}")
          next
        end
      end

      if nan_value?(min_sample)
        min_nan_found = true
      else
        if !min_value or min_sample < min_value
          min_value = min_sample
        end
      end

      if nan_value?(max_sample)
        max_nan_found = true
      else
        if !max_value or max_sample > max_value
          max_value = max_sample
        end
      end

      if nan_value?(avg_sample)
        avg_nan_found = true
      else
        # MINUTE data is reducing the decommutated values
        if job_type == :MINUTE
          total_samples += 1 # s0
          avg_value += avg_sample # s1
          s2 += (avg_sample * avg_sample)
        else # :HOUR or :DAY
          # Aggregated Stddev
          # See https://math.stackexchange.com/questions/1547141/aggregating-standard-deviation-to-a-summary-point
          total_samples += row_to_reduce.num_samples # s0
          avg_value += (avg_sample * row_to_reduce.num_samples) # s1
          s2 += row_to_reduce.num_samples * (avg_sample * avg_sample + stddev_sample * stddev_sample)
        end
      end
    end
    if total_samples != 0
      # Aggregated Stddev
      # See https://math.stackexchange.com/questions/1547141/aggregating-standard-deviation-to-a-summary-point
      avg_value = avg_value.to_f / total_samples
      # Note: For very large numbers with very small deviations this sqrt can fail.  If so then just set the stddev to 0.
      begin
        stddev_value = Math.sqrt((s2 / total_samples) - (avg_value * avg_value))
      rescue Exception
        stddev_value = 0.0
      end
    end
    min_value = Float::NAN if min_nan_found and !min_value
    max_value = Float::NAN if max_nan_found and !max_value
    if avg_nan_found and total_samples == 0
      avg_value = Float::NAN
      stddev_value = Float::NAN
    end
    new_row.write_attribute(min_item_name, min_value)
    new_row.write_attribute(max_item_name, max_value)
    new_row.write_attribute(avg_item_name, avg_value)
    new_row.write_attribute(stddev_item_name, stddev_value)
  end
  base_model.where(id: sample_rows.map(&:id)).update_all(:reduced_state => DartCommon::REDUCED)
  new_row.save! # Create the reduced data row in the database
  base_model.where(id: sample_rows.map(&:id)).update_all(:reduced_id => new_row.id)
  new_row.reduced_state = DartCommon::READY_TO_REDUCE
  new_row.save!
  @status.count += 1

  Cosmos::Logger.debug("Created #{new_row.class}:#{new_row.id} with #{mappings.length} items from #{new_row.num_samples} samples")    
end
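
The s0/s1/s2 accumulation above implements the aggregated standard deviation identity stddev = sqrt(s2/s0 - (s1/s0)^2). A minimal, self-contained sketch (plain Ruby, hypothetical sample values) verifying that aggregating per-group summaries reproduces the flat population statistics:

groups = [[1.0, 2.0, 3.0], [10.0, 10.0], [4.0]] # hypothetical minute groups

# Flat (population) statistics over all samples
all = groups.flatten
flat_mean = all.sum / all.length
flat_stddev = Math.sqrt(all.sum { |x| x * x } / all.length - flat_mean**2)

# Per-group summaries, as stored in a lesser reduction table
summaries = groups.map do |g|
  mean = g.sum / g.length
  stddev = Math.sqrt(g.sum { |x| x * x } / g.length - mean**2)
  [g.length, mean, stddev] # [num_samples, avg, stddev]
end

# Aggregate with the same accumulators used in create_reduced_row
s0 = summaries.sum { |n, _, _| n }                         # total samples
s1 = summaries.sum { |n, mean, _| n * mean }               # weighted sum
s2 = summaries.sum { |n, mean, sd| n * (mean**2 + sd**2) } # weighted sum of squares
agg_mean = s1 / s0
agg_stddev = Math.sqrt(s2 / s0 - agg_mean**2)

agg_mean == flat_mean # => true
# agg_stddev matches flat_stddev (within floating point error)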

#graceful_kill ⇒ Object

Call shutdown to gracefully shut down the worker thread



# File 'lib/cosmos/dart/lib/dart_reducer_worker_thread.rb', line 255

def graceful_kill
  shutdown()
end

#join ⇒ Object

Kill the worker thread



# File 'lib/cosmos/dart/lib/dart_reducer_worker_thread.rb', line 250

def join
  Cosmos.kill_thread(self, @thread)
end

#shutdown ⇒ Object

Shutdown the worker thread



# File 'lib/cosmos/dart/lib/dart_reducer_worker_thread.rb', line 243

def shutdown
  @running = false
  # Push the queue to allow the thread to run and shutdown
  @thread_queue << nil
end
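
The nil push is a queue sentinel: #work blocks in @thread_queue.pop, so pushing any value wakes it, and the falsy job_type trips the break unless job_type guard. The same pattern in a self-contained sketch:

queue = Queue.new
worker = Thread.new do
  while (job = queue.pop) # nil sentinel is falsy and ends the loop
    puts "processing #{job}"
  end
end

queue << "job 1"
queue << nil # graceful shutdown: wakes the blocked pop, loop exits
worker.join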

#work ⇒ Object

Pop a job off the queue and find items which are able to be reduced. Calculate the min, max, average, and standard deviation over the reduction period (minute, hour, or day) and save them to the reduction table tXXX_YYY_Z, where XXX is the PacketConfig ID, YYY is the table index, and Z is ‘m’, ‘h’, or ‘d’ (minute, hour, day).
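
For example (hypothetical IDs), minute reductions for PacketConfig 1, table index 0 are written to t1_0_m:

packet_config_id = 1 # hypothetical
table_index = 0      # hypothetical
suffix = 'm'         # 'm' (minute), 'h' (hour), or 'd' (day)
"t#{packet_config_id}_#{table_index}_#{suffix}" # => "t1_0_m"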



# File 'lib/cosmos/dart/lib/dart_reducer_worker_thread.rb', line 51

def work
  while @running # Set to false in shutdown()
    job_type, packet_config_id, table_index, base_model, reduction_model = @thread_queue.pop
    break unless job_type # shutdown was called

    # Find all the items which are able to be reduced (reduced = true)
    mappings = ItemToDecomTableMapping.where("packet_config_id = ? and table_index = ? and reduced = true",
      packet_config_id, table_index)
    # The only way to not have any mappings is if the packet contains no items
    # which can be reduced (no integer or float values). This would be extremely rare.
    if mappings.length == 0
      Cosmos::Logger.debug("No Mappings for JobType #{job_type}: #{packet_config_id}: #{table_index}")
      complete_job(job_type, packet_config_id, table_index)
      next
    end

    time_delta, base_model_time_column, time_method = job_attributes(job_type)
    rows = []
    done = false
    # Find all the rows in the decommutation table which are ready to reduce
    row_ids = base_model.where("reduced_state = #{DartCommon::READY_TO_REDUCE}").order("meta_id ASC, #{base_model_time_column} ASC").pluck(:id)
    if row_ids.length > 0
      first_row = base_model.find(row_ids[0])
      last_row = base_model.find(row_ids[-1])
      first_query_row_time = first_row.send(base_model_time_column)
      last_query_row_time = last_row.send(base_model_time_column)
      # Require at least a 2 minute spread to ensure a full minute of context is available
      if (last_query_row_time - first_query_row_time) > HOLD_OFF_TIME
        row_ids.in_groups_of(1000, false).each do |group_row_ids|
          break if done
          query_rows = base_model.order("meta_id ASC, #{base_model_time_column} ASC").where(id: group_row_ids)
          query_rows.each do |row|
            rows << row
            first_row_time = rows[0].send(base_model_time_column)
            last_row_time = rows[-1].send(base_model_time_column)
            
            # Break if we are near the end of a minute
            if (last_query_row_time - last_row_time) < 1.minute
              done = true
              break
            end
            
            # Ensure we have conditions to process the reduction data
            next unless (last_row_time - first_row_time) > time_delta || # Enough samples or
                # The time attribute (min, hour, day) has changed or
                first_row_time.send(time_method) != last_row_time.send(time_method) ||
                rows[0].meta_id != rows[-1].meta_id # New meta data
            
            # Sample from the start to the second to last row because the last row
            # is where we detected a change. The last row will be part of a new sample set.
            sample_rows = rows[0..-2]
            create_reduced_row(sample_rows, base_model, reduction_model, base_model_time_column, mappings, job_type)
            rows = rows[-1..-1] # Start a new sample with the last item in the previous sample
          end
        end
      end
    end
    complete_job(job_type, packet_config_id, table_index)
  end # while @running
  Cosmos::Logger.info("Reducer Thread #{@instance_num} Shutdown")
rescue Exception => error
  handle_error("Reducer Thread Unexpectedly Died: #{error.formatted}")
end
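
On the producer side (outside this class), a scheduler pops an idle worker's queue from the master queue and pushes a job tuple matching the destructuring at the top of #work. A hedged sketch; base_model and reduction_model stand in for DART's ActiveRecord table models:

thread_queue = master_queue.pop # blocks until a worker is available
thread_queue << [:MINUTE, packet_config_id, table_index, base_model, reduction_model]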