Class: DartDecomQuery

Inherits:
Object show all
Includes:
DartCommon
Defined in:
lib/cosmos/dart/lib/dart_decom_query.rb

Overview

JsonDRb server which responds to queries for decommutated and reduced data from the database.

Constant Summary

Constants included from DartCommon

DartCommon::INITIALIZING, DartCommon::MAX_COLUMNS_PER_TABLE, DartCommon::MAX_STRING_BIT_SIZE, DartCommon::MAX_STRING_BYTE_SIZE, DartCommon::PARSING_REGEX, DartCommon::READY_TO_REDUCE, DartCommon::REDUCED, DartCommon::REDUCED_TYPES

Constants included from DartConstants

DartConstants::MAX_DECOM_RESULTS

Instance Method Summary collapse

Methods included from DartCommon

#comparison_cast, #decommutate_item?, #each_decom_and_reduced_table, #find_packet_log_entry, #get_decom_table_model, #get_table_model, handle_argv, #lookup_target_and_packet_id, #process_meta_filters, #query_decom_reduced, #read_packet_from_ple, #separate_raw_con?, #setup_packet_config, #switch_and_get_system_config, #sync_targets_and_packets

Instance Method Details

#clear_errorsObject



270
271
272
273
274
275
276
277
278
279
280
281
282
283
# File 'lib/cosmos/dart/lib/dart_decom_query.rb', line 270

# Resets the decommutation and reduction error bookkeeping stored on the
# first Status record: both error counts are set to zero, both messages
# are emptied, and both message times are set to the time of this call.
# The record is then persisted with save!.
#
# @return [nil]
def clear_errors
  now = Time.now
  Cosmos::Logger.info("#{now.formatted}: clear_errors")

  record = Status.first
  # The decom and reduction columns follow the same naming scheme, so reset
  # each group through its attribute writers (decom first, then reduction).
  %w[decom reduction].each do |prefix|
    record.public_send("#{prefix}_error_count=", 0)
    record.public_send("#{prefix}_message=", '')
    record.public_send("#{prefix}_message_time=", now)
  end
  record.save!

  nil
end

#dart_statusObject

Returns status on the DART Database



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# File 'lib/cosmos/dart/lib/dart_decom_query.rb', line 186

# Returns status on the DART Database.
#
# The result hash is keyed by symbols and is built from:
# * PacketLogEntry queries (last id, counts by decom_state, first/last time)
# * the first Status record (decom and reduction counts, messages, times)
# * the summed file sizes under the DART_DATA and DART_LOGS system paths
# * the database size reported by pg_database_size (-1 if that query fails)
# * the number of seconds this call took (:DART_STATUS_SECONDS)
#
# @return [Hash<Symbol, Object>] Status values as described above
def dart_status
  start_time = Time.now
  Cosmos::Logger.info("#{start_time.formatted}: dart_status")
  result = {}
  status = Status.first

  # Ingest Status
  # ---------------
  # Last PacketLogEntry Id (-1 when there are no entries)
  last_ple = PacketLogEntry.select("id").last
  result[:LAST_PLE_ID] = last_ple ? last_ple.id : -1
  # Num PacketLogEntries Needing Decom state = 0
  result[:PLE_STATE_NEED_DECOM] = PacketLogEntry.where("decom_state = 0").count
  # Num PacketLogEntries Errored - state >= 3
  result[:PLE_STATE_ERROR] = PacketLogEntry.where("decom_state >= 3").count
  # First Time in Database (0/0 when there are no entries)
  sort_first_ple = PacketLogEntry.order("time ASC").select("time").first
  if sort_first_ple
    result[:PLE_FIRST_TIME_S] = sort_first_ple.time.tv_sec
    result[:PLE_FIRST_TIME_US] = sort_first_ple.time.tv_usec
  else
    result[:PLE_FIRST_TIME_S] = 0
    result[:PLE_FIRST_TIME_US] = 0
  end
  # Last Time in Database (0/0 when there are no entries)
  sort_last_ple = PacketLogEntry.order("time DESC").select("time").first
  if sort_last_ple
    result[:PLE_LAST_TIME_S] = sort_last_ple.time.tv_sec
    result[:PLE_LAST_TIME_US] = sort_last_ple.time.tv_usec
  else
    result[:PLE_LAST_TIME_S] = 0
    result[:PLE_LAST_TIME_US] = 0
  end

  # Decom Status
  # ---------------
  result[:DECOM_COUNT] = status.decom_count
  result[:DECOM_ERROR_COUNT] = status.decom_error_count
  result[:DECOM_MESSAGE] = status.decom_message
  result[:DECOM_MESSAGE_TIME_S] = status.decom_message_time.tv_sec
  result[:DECOM_MESSAGE_TIME_US] = status.decom_message_time.tv_usec

  # Reduction Status
  # ---------------
  result[:REDUCTION_COUNT] = status.reduction_count
  result[:REDUCTION_ERROR_COUNT] = status.reduction_error_count
  result[:REDUCTION_MESSAGE] = status.reduction_message
  result[:REDUCTION_MESSAGE_TIME_S] = status.reduction_message_time.tv_sec
  result[:REDUCTION_MESSAGE_TIME_US] = status.reduction_message_time.tv_usec

  # Storage
  # ---------------
  Cosmos.set_working_dir do
    # The fold is seeded with 0 so that a folder with no entries reports
    # 0 bytes; without the seed, inject(:+) on an empty list yields nil.
    # Size of outputs/dart/data folder
    result[:DART_DATA_BYTES] = Dir.glob(File.join(Cosmos::System.paths['DART_DATA'], '**', '*')).map { |f| File.size(f) }.inject(0, :+)
    # Size of outputs/dart/logs folder
    result[:DART_LOGS_BYTES] = Dir.glob(File.join(Cosmos::System.paths['DART_LOGS'], '**', '*')).map { |f| File.size(f) }.inject(0, :+)
  end
  # Size of SQL Database; any failure of the size query is reported as -1
  begin
    result[:DART_DATABASE_BYTES] = ActiveRecord::Base.connection.execute("select pg_database_size('#{ActiveRecord::Base.connection_config[:database]}');")[0]['pg_database_size']
  rescue StandardError
    result[:DART_DATABASE_BYTES] = -1
  end

  result[:DART_STATUS_SECONDS] = Time.now - start_time

  result
end

#item_names(target_name, packet_name, is_tlm = true) ⇒ Array<String>

Gets the list of item names for a given packet

Parameters:

  • target_name

    Target name

  • packet_name

    Packet name

  • is_tlm (defaults to: true)

    true for a telemetry packet, false for a command packet

Returns:



169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/cosmos/dart/lib/dart_decom_query.rb', line 169

# Gets the list of item names for a given packet.
#
# @param target_name [String] Target name
# @param packet_name [String] Packet name
# @param is_tlm [Boolean] Value matched against the packet's is_tlm column
# @return [Array<String>] Names of the items belonging to the packet
# @raise [RuntimeError] if no matching target or packet exists
def item_names(target_name, packet_name, is_tlm = true)
  Cosmos::Logger.info("#{Time.now.formatted}: item_names")

  target = Target.where("name = ?", target_name).first
  raise "Target #{target_name} not found" if target.nil?

  packet = Packet.where("target_id = ? and name = ? and is_tlm = ?", target.id, packet_name, is_tlm).first
  raise "Packet #{target_name} #{packet_name} not found" if packet.nil?

  # Only the name column is selected; collect it into a plain array.
  Item.where("packet_id = ?", packet.id).select("name").map { |row| row.name }
end

#query(request) ⇒ Array<Array<String, Integer, Integer, Integer, Integer>>

Returns data from the decommutated database tables including the reduced data tables.

Parameters:

  • request (Hash)

    Request for data. The hash must contain the following items:

    start_time_sec => Start time in UTC seconds
    start_time_usec => Microseconds to add to start time
    end_time_sec => End time in UTC seconds
    end_time_usec => Microseconds to add to end time
    item => [target name, packet name, item name] Names are all strings
    reduction => "NONE", "MINUTE", "HOUR", "DAY" for how to reduce the data
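    value_type => "RAW", "RAW_MAX", "RAW_MIN", "RAW_AVG", "RAW_STDDEV",
      "CONVERTED", "CONVERTED_MAX", "CONVERTED_MIN", "CONVERTED_AVG", "CONVERTED_STDDEV"
      ("RAW" and "CONVERTED" require "NONE" reduction; the other values require a reduction other than "NONE")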
    

    The request can also contain the following optional items:

    meta_ids => Optional IDs related to the meta data you want to filter by. This requires
      making a separate request for the particular meta data in question and recording
      the returned meta_ids for use in a subsequent request.
    limit => Maximum number of data items to return. A missing or non-positive value, or a value greater than
      DartConstants::MAX_DECOM_RESULTS, is replaced by DartConstants::MAX_DECOM_RESULTS.
    offset => Offset into the data stream. Use this to get more than DartConstants::MAX_DECOM_RESULTS results
      by making multiple requests with multiples of the DartConstants::MAX_DECOM_RESULTS value.
    cmd_tlm => Whether the item is a command or telemetry. Default is telemetry.
    

Returns:

  • (Array<Array<String, Integer, Integer, Integer, Integer>>)

    Array of arrays containing the item name, item seconds, item microseconds, samples (always 1 for NONE reduction, varies for other reduction values), and meta_id.



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/cosmos/dart/lib/dart_decom_query.rb', line 53

# Returns data from the decommutated database tables including the reduced
# data tables. The request is validated and normalized here and then handed
# to query_decom_reduced, whose result is returned unchanged.
#
# @param request [Hash] Request for data, keyed by strings:
#   * 'start_time_sec' / 'start_time_usec' - start time; only applied when
#     both are given
#   * 'end_time_sec' / 'end_time_usec' - end time; only applied when both
#     are given
#   * 'item' - three element list: [target name, packet name, item name]
#   * 'reduction' - "NONE" (default when absent), "MINUTE", "HOUR" or "DAY"
#   * 'value_type' - "RAW" or "CONVERTED" (only with NONE reduction), or
#     "RAW_x" / "CONVERTED_x" where x is MAX, MIN, AVG or STDDEV (only with
#     a reduction other than NONE)
#   * 'cmd_tlm' - "CMD" or "TLM"; telemetry when absent
#   * 'meta_ids' - optional list of meta ids to filter by
#   * 'meta_filters' - optional filters, resolved to meta ids through
#     process_meta_filters; only consulted when 'meta_ids' is empty
#   * 'limit' - maximum number of results; clamped to MAX_DECOM_RESULTS
#   * 'offset' - offset into the results; negative values become 0
# @return [Object] Whatever query_decom_reduced returns for the request
# @raise [Exception] Any error raised while processing is re-raised with
#   the same class and backtrace and a message prefixed by "Query Error: "
def query(request)
  request_start_time = Time.now
  Cosmos::Logger.info("#{request_start_time.formatted}: query: #{request}")

  begin
    start_time_sec = request['start_time_sec']
    start_time_usec = request['start_time_usec']
    end_time_sec = request['end_time_sec']
    end_time_usec = request['end_time_usec']

    # A time bound is only built when both of its parts were supplied.
    start_time = nil
    end_time = nil
    begin
      start_time = Time.at(start_time_sec, start_time_usec) if start_time_sec && start_time_usec
    rescue
      raise "Invalid start time: #{start_time_sec}, #{start_time_usec}"
    end
    begin
      end_time = Time.at(end_time_sec, end_time_usec) if end_time_sec && end_time_usec
    rescue
      raise "Invalid end time: #{end_time_sec}, #{end_time_usec}"
    end

    item = request['item']
    raise "Item \"#{item}\" invalid" if !item || item.length != 3

    reduction = request['reduction'].to_s.upcase
    case reduction
    when "", "NONE"
      reduction = :NONE
      reduction_modifier = ""
    when "MINUTE"
      reduction = :MINUTE
      reduction_modifier = "_m"
    when "HOUR"
      reduction = :HOUR
      reduction_modifier = "_h"
    when "DAY"
      reduction = :DAY
      reduction_modifier = "_d"
    else
      raise "Unknown reduction: #{reduction}"
    end

    # RAW* requests must select the RAW mapping and CONVERTED* requests the
    # CONVERTED mapping. The part after the underscore (max/min/avg/stddev)
    # is passed on as the item name modifier.
    requested_value_type = request['value_type'].to_s.upcase
    case requested_value_type
    when 'RAW'
      value_type = ItemToDecomTableMapping::RAW
      item_name_modifier = ""
      raise "RAW value_type is only valid with NONE reduction" if reduction != :NONE
    when 'RAW_MAX', 'RAW_MIN', 'RAW_AVG', 'RAW_STDDEV'
      value_type = ItemToDecomTableMapping::RAW
      item_name_modifier = requested_value_type.split('_')[1].downcase
      raise "#{requested_value_type} value_type is not valid with NONE reduction" if reduction == :NONE
    when 'CONVERTED'
      value_type = ItemToDecomTableMapping::CONVERTED
      item_name_modifier = ""
      raise "CONVERTED value_type is only valid with NONE reduction" if reduction != :NONE
    when 'CONVERTED_MAX', 'CONVERTED_MIN', 'CONVERTED_AVG', 'CONVERTED_STDDEV'
      value_type = ItemToDecomTableMapping::CONVERTED
      item_name_modifier = requested_value_type.split('_')[1].downcase
      raise "#{requested_value_type} value_type is not valid with NONE reduction" if reduction == :NONE
    else
      raise "Unknown value_type: #{requested_value_type}"
    end

    cmd_tlm = request['cmd_tlm']
    if cmd_tlm
      case cmd_tlm.to_s.upcase
      when 'CMD'
        is_tlm = false
      when 'TLM'
        is_tlm = true
      else
        raise "Unknown cmd_tlm: #{cmd_tlm}"
      end
    else
      is_tlm = true
    end

    # Explicit meta_ids take precedence; meta_filters are only resolved
    # when no meta_ids were given.
    meta_ids = request['meta_ids'] || []
    unless meta_ids.length > 0
      meta_filters = request['meta_filters'] || []
      if meta_filters.length > 0
        meta_ids = process_meta_filters(meta_filters, is_tlm, end_time)
      end
    end

    limit = request['limit'].to_i
    limit = MAX_DECOM_RESULTS if limit <= 0 || limit > MAX_DECOM_RESULTS

    offset = request['offset'].to_i
    offset = 0 if offset < 0

    return query_decom_reduced(
      item[0], item[1], item[2],
      value_type, is_tlm,
      start_time, end_time,
      reduction, reduction_modifier,
      item_name_modifier, limit, offset, meta_ids)

  rescue Exception => error
    # Re-raise the same exception class and backtrace with a prefixed message.
    msg = "Query Error: #{error.message}"
    raise error, msg, error.backtrace
  end
end