Class: SQLiteSweep::Runner

Inherits: Object
Defined in: lib/sqlitesweep/runner.rb
Overview
Main orchestrator. Wires all components together and runs the sweep.
Flow:
1. Reads database URIs from SourceStream (which runs the -s command)
2. Local URIs are submitted directly to the WorkerPool for Query::Local
3. Remote URIs are sent to HostBatcher, which groups them by host and
flushes batches to the WorkerPool for Query::Remote
4. Results feed into the Aggregator (thread-safe sum/avg/list)
5. Display refreshes a live status line on stderr
6. Final aggregated result is printed to stdout
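The local-versus-remote split in steps 2 and 3 can be sketched roughly as below. The `Entry` struct and the grouping hash are illustrative stand-ins, not the real URI and `HostBatcher` classes, and the `local?` heuristic is an assumption:

```ruby
require "uri"

# Stand-in for the real URI objects yielded by SourceStream.
Entry = Struct.new(:raw) do
  def local?
    # Assumed heuristic: plain paths and file:// URIs are local.
    raw.start_with?("file://") || !raw.include?("://")
  end

  def host
    URI(raw).host
  end
end

# Local entries would go straight to pool.submit; remote entries
# are grouped by host, as HostBatcher does before flushing batches.
def dispatch(entries)
  direct  = []
  batches = Hash.new { |h, k| h[k] = [] }
  entries.each do |e|
    e.local? ? direct << e.raw : batches[e.host] << e.raw
  end
  [direct, batches]
end

direct, batches = dispatch(
  [Entry.new("/var/db/a.sqlite"),
   Entry.new("ssh://db1.example/a.sqlite"),
   Entry.new("ssh://db1.example/b.sqlite")]
)
# direct       => ["/var/db/a.sqlite"]
# batches.keys => ["db1.example"]
```

Grouping remote URIs per host lets one SSH connection serve many databases instead of opening a connection per URI.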
Instance Method Summary
- #initialize(config) ⇒ Runner (constructor)
  A new instance of Runner.
- #run ⇒ Object
  Executes the full sweep.
Constructor Details
#initialize(config) ⇒ Runner
Returns a new instance of Runner.
    # File 'lib/sqlitesweep/runner.rb', line 14

    def initialize(config)
      @config = config
      @shutting_down = false
    end
Instance Method Details
#run ⇒ Object
Executes the full sweep. Blocks until all databases are queried and the final result is printed to stdout.
    # File 'lib/sqlitesweep/runner.rb', line 21

    def run
      result_file = ResultFile.new if @config.action == :list
      aggregator = Aggregator.new(@config.action, result_file: result_file)
      display = Display.new(aggregator, live: @config.live)
      pool = WorkerPool.new(@config.concurrency)
      local_query = Query::Local.new(@config)
      ssh_manager = SSH::ConnectionManager.new(@config)
      batcher = HostBatcher.new(@config, pool, aggregator, display, ssh_manager)

      setup_signal_handlers(pool, ssh_manager, display, batcher)

      source = SourceStream.new(@config.source)
      start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      display.start(start_time)

      # Main loop: read URIs and dispatch to the appropriate query path
      source.each do |uri|
        break if @shutting_down

        if uri.local?
          pool.submit do
            begin
              result = local_query.execute(uri)
              aggregator.add(result)
            rescue QueryError => e
              aggregator.record_error
              $stderr.puts "\n#{e.message}" unless @config.live
            end
            display.refresh
          end
        else
          batcher.add(uri)
        end
      end

      # Drain remaining batches that haven't hit batch_size yet
      batcher.flush_all
      pool.shutdown

      display.finish
      result_file&.close

      output = aggregator.value
      puts output
    rescue SourceError => e
      $stderr.puts "\nError: #{e.message}"
      exit 1
    ensure
      ssh_manager&.shutdown
    end
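The Aggregator in step 4 is described as a thread-safe sum/avg/list accumulator. A minimal sketch, assuming those three action symbols and a mutex around a shared array (the real class may track errors and stream list output differently):

```ruby
# MiniAggregator is a hypothetical stand-in for SQLiteSweep's Aggregator:
# worker threads call #add concurrently; #value computes the final result.
class MiniAggregator
  def initialize(action)
    @action = action        # assumed to be :sum, :avg, or :list
    @mutex  = Mutex.new     # guards @values against concurrent workers
    @values = []
    @errors = 0
  end

  def add(result)
    @mutex.synchronize { @values << result }
  end

  def record_error
    @mutex.synchronize { @errors += 1 }
  end

  def value
    @mutex.synchronize do
      case @action
      when :sum  then @values.sum
      when :avg  then @values.empty? ? 0 : @values.sum.fdiv(@values.size)
      when :list then @values.join("\n")
      end
    end
  end
end

agg = MiniAggregator.new(:avg)
[1, 2, 3].map { |n| Thread.new { agg.add(n) } }.each(&:join)
# agg.value => 2.0
```

Taking the mutex in `#value` as well as `#add` ensures the final read in step 6 sees every result the worker threads produced.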