Class: DartReducerWorkerThread
- Defined in:
- lib/cosmos/dart/lib/dart_reducer_worker_thread.rb
Overview
Thread which performs data reduction in the DART database.
Constant Summary collapse
- HOLD_OFF_TIME =
This constant controls how much spread there must be in the data before doing a reduction. Since our minimum reduction is 1 minute, we will wait until we have at least two minutes of spread. Not as important for higher order reductions but also ensures that there is a spread in the data points.
2.minutes
Instance Method Summary collapse
- #create_reduced_row(sample_rows, base_model, reduction_model, base_model_time_column, mappings, job_type) ⇒ Object
-
#graceful_kill ⇒ Object
Call shutdown to gracefully shut down the worker thread.
-
#initialize(master_queue, locked_tables, mutex, instance_num, status) ⇒ DartReducerWorkerThread
constructor
Create a new thread and start it.
-
#join ⇒ Object
Kill the worker thread.
-
#shutdown ⇒ Object
Shutdown the worker thread.
-
#work ⇒ Object
Pop a job off the queue and find items which are able to be reduced.
Constructor Details
#initialize(master_queue, locked_tables, mutex, instance_num, status) ⇒ DartReducerWorkerThread
Create a new thread and start it
32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/cosmos/dart/lib/dart_reducer_worker_thread.rb', line 32 def initialize(master_queue, locked_tables, mutex, instance_num, status) @instance_num = instance_num @running = true @master_queue = master_queue @locked_tables = locked_tables @mutex = mutex @thread_queue = Queue.new @status = status # Start the thread which will wait on @thread_queue.pop @thread = Thread.new { work() } # Add the local @thread_queue to the @master_queue so jobs can be added @master_queue << @thread_queue end |
Instance Method Details
#create_reduced_row(sample_rows, base_model, reduction_model, base_model_time_column, mappings, job_type) ⇒ Object
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 |
# File 'lib/cosmos/dart/lib/dart_reducer_worker_thread.rb', line 115 def create_reduced_row(sample_rows, base_model, reduction_model, base_model_time_column, mappings, job_type) new_row = reduction_model.new new_row.start_time = sample_rows[0].send(base_model_time_column) new_row.num_samples = sample_rows.length new_row. = sample_rows[0]. new_row.packet_log_id = sample_rows[0].packet_log_id # Process each of the ItemToDecomTableMapping to get the item to be reduced mappings.each do |mapping| item_name = "i#{mapping.item_index}" min_item_name = "i#{mapping.item_index}min" max_item_name = "i#{mapping.item_index}max" avg_item_name = "i#{mapping.item_index}avg" stddev_item_name = "i#{mapping.item_index}stddev" min_value = nil max_value = nil total_samples = 0 # s0 avg_value = 0.0 # s1 s2 = 0.0 stddev_value = 0.0 min_nan_found = false max_nan_found = false avg_nan_found = false stddev_nan_found = false # Process each of the rows in the base model which is the decommutation table # or a lesser reduction table (the minute or hour table). sample_rows.each do |row_to_reduce| # If we processing minute data we're reading from the base decommutation table # thus there is only raw values to read if job_type == :MINUTE value = row_to_reduce.read_attribute(item_name) min_sample = value max_sample = value avg_sample = value if value.nil? handle_error("#{item_name} is nil in #{row_to_reduce.class}:#{row_to_reduce.id}") next end else # :HOUR or :DAY # We're processing hour or day data so we're reducing previously reduced data # thus there are min, max, and average values to read min_sample = row_to_reduce.read_attribute(min_item_name) max_sample = row_to_reduce.read_attribute(max_item_name) avg_sample = row_to_reduce.read_attribute(avg_item_name) stddev_sample = row_to_reduce.read_attribute(stddev_item_name) if min_sample.nil? handle_error("#{min_item_name} is nil in #{row_to_reduce.class}:#{row_to_reduce.id}") next end if max_sample.nil? 
handle_error("#{max_item_name} is nil in #{row_to_reduce.class}:#{row_to_reduce.id}") next end if avg_sample.nil? handle_error("#{avg_item_name} is nil in #{row_to_reduce.class}:#{row_to_reduce.id}") next end if stddev_sample.nil? handle_error("#{stddev_item_name} is nil in #{row_to_reduce.class}:#{row_to_reduce.id}") next end end if nan_value?(min_sample) min_nan_found = true else if !min_value or min_sample < min_value min_value = min_sample end end if nan_value?(max_sample) max_nan_found = true else if !max_value or max_sample > max_value max_value = max_sample end end if nan_value?(avg_sample) avg_nan_found = true else # MINUTE data is reducing the decommutated values if job_type == :MINUTE total_samples += 1 # s0 avg_value += avg_sample # s1 s2 += (avg_sample * avg_sample) else # :HOUR or :DAY # Aggregated Stddev # See https://math.stackexchange.com/questions/1547141/aggregating-standard-deviation-to-a-summary-point total_samples += row_to_reduce.num_samples # s0 avg_value += (avg_sample * row_to_reduce.num_samples) # s1 s2 += row_to_reduce.num_samples * (avg_sample * avg_sample + stddev_sample * stddev_sample) end end end if total_samples != 0 # Aggregated Stddev # See https://math.stackexchange.com/questions/1547141/aggregating-standard-deviation-to-a-summary-point avg_value = avg_value.to_f / total_samples # Note: For very large numbers with very small deviations this sqrt can fail. If so then just set the stddev to 0. 
begin stddev_value = sqrt((s2 / total_samples) - (avg_value * avg_value)) rescue Exception stddev_value = 0.0 end end min_value = Float::NAN if min_nan_found and !min_value max_value = Float::NAN if max_nan_found and !max_value if avg_nan_found and total_samples == 0 avg_value = Float::NAN stddev_value = Float::NAN end new_row.write_attribute(min_item_name, min_value) new_row.write_attribute(max_item_name, max_value) new_row.write_attribute(avg_item_name, avg_value) new_row.write_attribute(stddev_item_name, stddev_value) end base_model.where(id: sample_rows.map(&:id)).update_all(:reduced_state => DartCommon::REDUCED) new_row.save! # Create the reduced data row in the database base_model.where(id: sample_rows.map(&:id)).update_all(:reduced_id => new_row.id) new_row.reduced_state = DartCommon::READY_TO_REDUCE new_row.save! @status.count += 1 Cosmos::Logger.debug("Created #{new_row.class}:#{new_row.id} with #{mappings.length} items from #{new_row.num_samples} samples") end |
#graceful_kill ⇒ Object
Call shutdown to gracefully shut down the worker thread
255 256 257 |
# File 'lib/cosmos/dart/lib/dart_reducer_worker_thread.rb', line 255 def graceful_kill shutdown() end |
#join ⇒ Object
Kill the worker thread
250 251 252 |
# File 'lib/cosmos/dart/lib/dart_reducer_worker_thread.rb', line 250 def join Cosmos.kill_thread(self, @thread) end |
#shutdown ⇒ Object
Shutdown the worker thread
243 244 245 246 247 |
# File 'lib/cosmos/dart/lib/dart_reducer_worker_thread.rb', line 243 def shutdown @running = false # Push the queue to allow the thread to run and shutdown @thread_queue << nil end |
#work ⇒ Object
Pop a job off the queue and find items which are able to be reduced. Calculate the min, max, and average value over the reduction period (min, hour, or day) and save to the reduction table tXXX_YYY_Z where XXX is the PacketConfig ID, YYY is the table index, and Z is ‘m’, ‘h’, or ‘d’ (minute, hour, day).
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/cosmos/dart/lib/dart_reducer_worker_thread.rb', line 51 def work while @running # Set to false in shutdown() job_type, packet_config_id, table_index, base_model, reduction_model = @thread_queue.pop break unless job_type # shutdown was called # Find all the items which are able to be reduced (reduced = true) mappings = ItemToDecomTableMapping.where("packet_config_id = ? and table_index = ? and reduced = true", packet_config_id, table_index) # The only way to not have any mappings is if the packet contains no items # which can be reduced (no integer or float values). This would be extremely rare. if mappings.length == 0 Cosmos::Logger.debug("No Mappings for JobType #{job_type}: #{packet_config_id}: #{table_index}") complete_job(job_type, packet_config_id, table_index) next end time_delta, base_model_time_column, time_method = job_attributes(job_type) rows = [] done = false # Find all the rows in the decommutation table which are ready to reduce row_ids = base_model.where("reduced_state = #{DartCommon::READY_TO_REDUCE}").order("meta_id ASC, #{base_model_time_column} ASC").pluck(:id) if row_ids.length > 0 first_row = base_model.find(row_ids[0]) last_row = base_model.find(row_ids[-1]) first_query_row_time = first_row.send(base_model_time_column) last_query_row_time = last_row.send(base_model_time_column) # Require at least a 2 minute spread to ensure a full minute of context is available if (last_query_row_time - first_query_row_time) > HOLD_OFF_TIME row_ids.in_groups_of(1000, false).each do |group_row_ids| break if done query_rows = base_model.order("meta_id ASC, #{base_model_time_column} ASC").where(id: group_row_ids) query_rows.each do |row| rows << row first_row_time = rows[0].send(base_model_time_column) last_row_time = rows[-1].send(base_model_time_column) # Break if we are near the end of a minute if (last_query_row_time - last_row_time) < 1.minute done = true break end # Ensure we have conditions to process the reduction data next unless 
(last_row_time - first_row_time) > time_delta || # Enough samples or # The time attribute (min, hour, day) has changed or first_row_time.send(time_method) != last_row_time.send(time_method) || rows[0]. != rows[-1]. # New meta data # Sample from the start to the second to last row because the last row # is where we detected a change. The last row will be part of a new sample set. sample_rows = rows[0..-2] create_reduced_row(sample_rows, base_model, reduction_model, base_model_time_column, mappings, job_type) rows = rows[-1..-1] # Start a new sample with the last item in the previous sample end end end end complete_job(job_type, packet_config_id, table_index) end # while @running Cosmos::Logger.info("Reducer Thread #{@instance_num} Shutdown") rescue Exception => error handle_error("Reducer Thread Unexpectedly Died: #{error.formatted}") end |