Class: SQLiteSweep::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/sqlitesweep/runner.rb

Overview

Main orchestrator. Wires all components together and runs the sweep.

Flow:

1. Reads database URIs from SourceStream (which runs the -s command)
2. Local URIs are submitted directly to the WorkerPool for Query::Local
3. Remote URIs are sent to HostBatcher, which groups them by host and
   flushes batches to the WorkerPool for Query::Remote
4. Results feed into the Aggregator (thread-safe sum/avg/list)
5. Display refreshes a live status line on stderr
6. Final aggregated result is printed to stdout

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Runner

Returns a new instance of Runner.



14
15
16
17
# File 'lib/sqlitesweep/runner.rb', line 14

def initialize(config)
  @config = config
  @shutting_down = false
end

Instance Method Details

#runObject

Executes the full sweep. Blocks until all databases are queried and the final result is printed to stdout.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/sqlitesweep/runner.rb', line 21

def run
  result_file = ResultFile.new if @config.action == :list
  aggregator = Aggregator.new(@config.action, result_file: result_file)
  display = Display.new(aggregator, live: @config.live)
  pool = WorkerPool.new(@config.concurrency)
  local_query = Query::Local.new(@config)
  ssh_manager = SSH::ConnectionManager.new(@config)
  batcher = HostBatcher.new(@config, pool, aggregator, display, ssh_manager)

  setup_signal_handlers(pool, ssh_manager, display, batcher)

  source = SourceStream.new(@config.source)
  start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  display.start(start_time)

  # Main loop: read URIs and dispatch to the appropriate query path
  source.each do |uri|
    break if @shutting_down

    if uri.local?
      pool.submit do
        begin
          result = local_query.execute(uri)
          aggregator.add(result)
        rescue QueryError => e
          aggregator.record_error
          $stderr.puts "\n#{e.message}" unless @config.live
        end
        display.refresh
      end
    else
      batcher.add(uri)
    end
  end

  # Drain remaining batches that haven't hit batch_size yet
  batcher.flush_all
  pool.shutdown

  display.finish
  result_file&.close

  output = aggregator.value
  puts output
rescue SourceError => e
  $stderr.puts "\nError: #{e.message}"
  exit 1
ensure
  ssh_manager&.shutdown
end