Class: SQLiteSweep::Aggregator

Inherits:
Object
Defined in:
lib/sqlitesweep/aggregator.rb

Overview

Thread-safe accumulator for query results. Supports three modes:

:sum     - Running total of the first column value from each database.
:average - Running total divided by the number of databases queried.
:list    - Delegates to a ResultFile that streams rows to a JSONL file.

For :sum and :average, the first column value of every row in each query result is converted with to_f and added to a running total. Your query should therefore return a single numeric value (e.g. SELECT count(*) FROM …); if it returns several rows, all of them are summed, and non-numeric values count as 0.

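#add and #record_error synchronize on an internal mutex and are safe to call from multiple worker threads. #value, #count, and #error_count read without taking the lock, so call them once the workers have finished.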
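The locking pattern described above can be illustrated with a small stand-in. This is a minimal sketch, not the library class: SketchAggregator and SketchResult are hypothetical names that mirror only the :sum behaviour, so the example runs without the gem installed.

```ruby
# Hypothetical stand-ins; they mirror only the documented :sum behaviour.
SketchResult = Struct.new(:rows, :source, keyword_init: true)

class SketchAggregator
  attr_reader :count

  def initialize
    @mutex = Mutex.new
    @total = 0.0
    @count = 0
  end

  # Adds the first column of each row while holding the lock.
  def add(result)
    @mutex.synchronize do
      result.rows.each { |row| @total += row.values.first.to_f }
      @count += 1
    end
  end

  def total
    @mutex.synchronize { @total }
  end
end

agg = SketchAggregator.new

# Eight worker threads, each reporting 100 single-row results of value 1.
workers = 8.times.map do |i|
  Thread.new do
    100.times do
      agg.add(SketchResult.new(rows: [{ "count" => 1 }], source: "db#{i}"))
    end
  end
end
workers.each(&:join)

puts agg.total  # 800.0
puts agg.count  # 800
```

Reading the totals only after every thread has been joined avoids observing a partially updated state.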

Examples:

agg = Aggregator.new(:sum)
agg.add(Result.new(rows: [{"count" => 10}], source: "db1"))
agg.add(Result.new(rows: [{"count" => 20}], source: "db2"))
agg.value  # => "30"
agg.count  # => 2

Constructor Details

#initialize(action, result_file: nil) ⇒ Aggregator

Returns a new instance of Aggregator.

Parameters:

  • action (Symbol)

    One of :sum, :average, or :list.

  • result_file (ResultFile, nil) (defaults to: nil)

    Required for the :list action; if it is nil, #add writes nothing and #value returns nil. Receives streamed results so they don’t accumulate in memory.
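The Aggregator source on this page calls only #write(result) and #path on result_file. As a rough illustration of an object with that shape, here is a sketch of a JSONL writer. SketchResultFile, SketchRows, and the "_source" key are assumptions made for the example; the real ResultFile may be organized quite differently.

```ruby
require "json"
require "tempfile"

# Hypothetical stand-in for ResultFile. Only #write and #path are taken
# from the Aggregator source; everything else is an assumption.
class SketchResultFile
  attr_reader :path

  def initialize(path)
    @path = path
    @io = File.open(path, "a")
  end

  # Appends one JSON object per row (JSONL), tagged with its source.
  def write(result)
    result.rows.each do |row|
      @io.puts(JSON.generate(row.merge("_source" => result.source)))
    end
    @io.flush
  end

  def close
    @io.close
  end
end

# Hypothetical result object exposing #rows and #source.
SketchRows = Struct.new(:rows, :source, keyword_init: true)

tmp = Tempfile.new(["sweep", ".jsonl"])
file = SketchResultFile.new(tmp.path)
file.write(SketchRows.new(rows: [{ "id" => 1 }, { "id" => 2 }], source: "db1"))
file.close

lines = File.readlines(tmp.path, chomp: true)
puts lines.length  # 2
```

Writing each row as soon as it arrives keeps memory use flat regardless of how many databases are swept.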



# File 'lib/sqlitesweep/aggregator.rb', line 31

def initialize(action, result_file: nil)
  @action = action
  @result_file = result_file
  @mutex = Mutex.new
  @total = 0.0
  @count = 0
  @error_count = 0
end

Instance Attribute Details

#count ⇒ Integer (readonly)

Returns the number of databases successfully queried.

Returns:

  • (Integer)

    Number of databases successfully queried.



# File 'lib/sqlitesweep/aggregator.rb', line 23

def count
  @count
end

#error_count ⇒ Integer (readonly)

Returns the number of databases that produced errors.

Returns:

  • (Integer)

    Number of databases that produced errors.



# File 'lib/sqlitesweep/aggregator.rb', line 26

def error_count
  @error_count
end

Instance Method Details

#add(result) ⇒ Object

Records a successful query result.

Parameters:

  • result (Result)

    The query result to aggregate.



# File 'lib/sqlitesweep/aggregator.rb', line 43

def add(result)
  @mutex.synchronize do
    case @action
    when :sum, :average
      result.rows.each do |row|
        value = row.values.first
        @total += value.to_f
      end
    when :list
      @result_file&.write(result)
    end
    @count += 1
  end
end
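
The :sum/:average branch above adds the first column of every row and relies on to_f for conversion. The following sketch replays that loop on plain hashes (the sample rows are made up for illustration) to show how non-numeric values behave.

```ruby
# Same per-row logic as the :sum/:average branch, applied to plain hashes.
rows = [{ "n" => 3 }, { "n" => "4.5" }, { "n" => nil }, { "n" => "abc" }]

total = 0.0
rows.each do |row|
  value = row.values.first
  total += value.to_f  # nil.to_f and "abc".to_f both evaluate to 0.0
end

puts total  # 7.5
```

Because @count is incremented once per #add call rather than once per row, :average divides this row total by the number of results added, not by the number of rows.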

#record_error ⇒ Object

Records a failed query (increments error count).



# File 'lib/sqlitesweep/aggregator.rb', line 59

def record_error
  @mutex.synchronize { @error_count += 1 }
end

#value ⇒ String

Returns the final aggregated value as a string.

Returns:

  • (String)

    For :sum, the total. For :average, the mean ("0" if no results were added). For :list, the path to the JSONL result file, or nil if no result_file was given.



# File 'lib/sqlitesweep/aggregator.rb', line 67

def value
  case @action
  when :sum
    format_number(@total)
  when :average
    @count > 0 ? format_number(@total / @count) : "0"
  when :list
    @result_file&.path
  end
end
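
The count guard in the :average branch matters because dividing a Float by zero in Ruby yields NaN rather than raising. The sketch below illustrates this with a hypothetical formatter: format_number is not shown on this page, so sketch_format_number is only an assumption that whole floats print without a fractional part, as the "30" in the class example suggests.

```ruby
# Hypothetical formatter; the real format_number is not shown on this page.
# Assumption: whole floats print without a fractional part.
def sketch_format_number(number)
  number == number.to_i ? number.to_i.to_s : number.to_s
end

# Mirrors the :average branch of #value, including the zero-count guard.
def sketch_average(total, count)
  count > 0 ? sketch_format_number(total / count) : "0"
end

puts sketch_average(30.0, 2)  # 15
puts sketch_average(5.0, 2)   # 2.5
puts sketch_average(0.0, 0)   # 0
puts((0.0 / 0).nan?)          # true
```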