Class: SQLiteSweep::HostBatcher
- Inherits: Object
- Defined in: lib/sqlitesweep/host_batcher.rb
Overview
Groups remote database URIs by host and flushes them as batches.
Instead of making one SSH round-trip per database, HostBatcher accumulates URIs for the same host and submits them as a single batch to the worker pool. Each batch becomes one SSH command that queries multiple databases sequentially on the remote host.
Batches are flushed when either:
- The batch reaches --batch-size (default 4)
- A timeout fires (200ms) to avoid stalling on slow source streams
Local URIs should NOT be sent here — they bypass batching entirely and go straight to the worker pool (see Runner).
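The size-or-timeout rule above can be illustrated with a minimal, self-contained sketch. This is not the SQLiteSweep implementation: MiniBatcher, its keyword arguments, and the block callback are hypothetical names chosen for illustration, and the real class submits to a worker pool rather than calling a block.

```ruby
# Hypothetical sketch of size-or-timeout batching; MiniBatcher is not part
# of SQLiteSweep. Items are grouped by host and handed to a callback.
class MiniBatcher
  def initialize(batch_size:, flush_timeout:, &on_flush)
    @batch_size = batch_size
    @flush_timeout = flush_timeout
    @on_flush = on_flush
    @mutex = Mutex.new
    @batches = {} # host => [item, ...]
    @timers = {}  # host => Thread
  end

  # Buffers an item. Flushes immediately when the batch is full; otherwise
  # makes sure a timer is running for this host.
  def add(host, item)
    full = nil
    @mutex.synchronize do
      (@batches[host] ||= []) << item
      if @batches[host].length >= @batch_size
        full = @batches.delete(host)
        @timers.delete(host)&.kill
      else
        @timers[host] ||= start_timer(host)
      end
    end
    # The callback runs outside the lock so it cannot block other adders.
    @on_flush.call(host, full) if full
  end

  private

  # Starts a thread that flushes whatever is buffered for the host once the
  # timeout elapses.
  def start_timer(host)
    Thread.new do
      sleep @flush_timeout
      stale = @mutex.synchronize do
        @timers.delete(host)
        @batches.delete(host)
      end
      @on_flush.call(host, stale) if stale
    end
  end
end
```

With a batch size of 2, two items for the same host are flushed together as soon as the second arrives, while a single item for another host is flushed on its own once the timeout elapses.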
Constant Summary
- FLUSH_TIMEOUT = 0.2
  How long to wait before flushing an incomplete batch (seconds). Prevents URIs from sitting in the buffer when the source stream is slow.
Instance Method Summary
- #add(uri) ⇒ Object
  Adds a remote URI to the appropriate host batch.
- #cancel ⇒ Object
  Cancels all pending batches and timers.
- #flush_all ⇒ Object
  Flushes all pending batches regardless of size.
- #initialize(config, pool, aggregator, display, ssh_manager) ⇒ HostBatcher (constructor)
  A new instance of HostBatcher.
Constructor Details
#initialize(config, pool, aggregator, display, ssh_manager) ⇒ HostBatcher
Returns a new instance of HostBatcher.
    # File 'lib/sqlitesweep/host_batcher.rb', line 26
    def initialize(config, pool, aggregator, display, ssh_manager)
      @config = config
      @pool = pool
      @aggregator = aggregator
      @display = display
      @ssh_manager = ssh_manager
      @batch_size = config.batch_size
      @mutex = Mutex.new
      @batches = {}  # host_key => [uri, ...]
      @timers = {}   # host_key => Thread
      @cancelled = false
    end
Instance Method Details
#add(uri) ⇒ Object
Adds a remote URI to the appropriate host batch. May trigger an immediate flush if the batch is full.
    # File 'lib/sqlitesweep/host_batcher.rb', line 43
    def add(uri)
      return if @cancelled

      host_key = uri.host_key
      flush_batch = nil

      @mutex.synchronize do
        @batches[host_key] ||= []
        @batches[host_key] << uri

        if @batches[host_key].length >= @batch_size
          flush_batch = @batches.delete(host_key)
          cancel_timer(host_key)
        else
          start_timer(host_key) unless @timers[host_key]
        end
      end

      submit_batch(flush_batch) if flush_batch
    end
#cancel ⇒ Object
Cancels all pending batches and timers. Used during shutdown.
    # File 'lib/sqlitesweep/host_batcher.rb', line 79
    def cancel
      @mutex.synchronize do
        @cancelled = true
        @timers.each_value(&:kill)
        @timers.clear
        @batches.clear
      end
    end
#flush_all ⇒ Object
Flushes all pending batches regardless of size. Called at the end of a sweep to drain any partial batches that haven’t reached batch_size.
    # File 'lib/sqlitesweep/host_batcher.rb', line 66
    def flush_all
      batches_to_flush = nil
      @mutex.synchronize do
        @timers.each_value(&:kill)
        @timers.clear
        batches_to_flush = @batches.dup
        @batches.clear
      end

      batches_to_flush.each_value { |batch| submit_batch(batch) }
    end
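Both #add and #flush_all take what they need from the buffer while holding the mutex and call submit_batch only after releasing it. The following sketch shows that snapshot-then-submit pattern in isolation. DrainableBuffer and its methods are hypothetical names, not part of SQLiteSweep, and the reason given for the pattern is an assumption: it would keep a slow or re-entrant submission from holding the lock.

```ruby
# Hypothetical sketch of the snapshot-then-submit pattern; DrainableBuffer
# is not part of SQLiteSweep.
class DrainableBuffer
  def initialize
    @mutex = Mutex.new
    @batches = {} # key => [item, ...]
  end

  def add(key, item)
    @mutex.synchronize { (@batches[key] ||= []) << item }
  end

  # Copies and clears the buffer under the lock, then yields each batch
  # after the lock has been released.
  def drain
    snapshot = @mutex.synchronize do
      copy = @batches.dup
      @batches.clear
      copy
    end
    snapshot.each { |key, batch| yield key, batch }
  end

  # Number of keys currently buffered.
  def size
    @mutex.synchronize { @batches.size }
  end
end
```

Because Ruby's Mutex is not re-entrant, a block passed to drain could not call add if it ran inside synchronize. Yielding after the lock is released avoids that, and anything added during the drain simply waits for the next one.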