Class: SQLiteSweep::HostBatcher

Inherits:
Object
Defined in:
lib/sqlitesweep/host_batcher.rb

Overview

Groups remote database URIs by host and flushes them as batches.

Instead of making one SSH round-trip per database, HostBatcher accumulates URIs for the same host and submits them as a single batch to the worker pool. Each batch becomes one SSH command that queries multiple databases sequentially on the remote host.
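The sketch below shows the grouping idea offline, on a finite list. It assumes a hypothetical `RemoteURI` struct whose only relevant field is `host_key`, which is the key `#add` groups by. HostBatcher itself builds batches incrementally as URIs stream in, and its timeout can flush a batch early, so real batch boundaries may differ from this idealized split.

```ruby
# Hypothetical stand-in for a remote database URI; only host_key matters here.
RemoteURI = Struct.new(:host_key, :path)

# Group URIs by host, then cut each host's list into chunks of at most batch_size.
# Each chunk corresponds to one SSH command on that host.
def plan_batches(uris, batch_size)
  uris.group_by(&:host_key).flat_map do |host_key, host_uris|
    host_uris.each_slice(batch_size).map { |slice| [host_key, slice] }
  end
end

uris = (1..6).map { |i| RemoteURI.new("web1", "/data/app#{i}.sqlite3") } +
       (1..2).map { |i| RemoteURI.new("web2", "/data/app#{i}.sqlite3") }

plan_batches(uris, 4).each do |host_key, batch|
  puts "#{host_key}: #{batch.map(&:path).join(', ')}"
end
```

With a batch size of 4, these eight URIs become three batches (two for web1, one for web2), so three SSH round-trips instead of eight.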

Batches are flushed when either:

- The batch reaches --batch-size (default 4)
- A timeout (FLUSH_TIMEOUT, 200 ms) fires for a partial batch, so URIs do not stall when the source stream is slow
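One plausible shape for the timeout trigger is a timer thread per host, started when the host's first URI arrives. That matches the `start_timer(host_key) unless @timers[host_key]` call in `#add`, but `start_timer` is private and its body is not shown on this page. The helper `start_flush_timer` below, its arguments, and the shortened 0.05 s delay are hypothetical.

```ruby
# Hypothetical timeout path (not SQLiteSweep's private start_timer): after
# `timeout` seconds, take whatever is still buffered for host_key and flush it.
def start_flush_timer(mutex, batches, timers, host_key, timeout, &flush)
  Thread.new do
    sleep timeout
    batch = mutex.synchronize do
      timers.delete(host_key)   # let the next URI for this host start a fresh timer
      batches.delete(host_key)  # nil if a size-triggered flush already took the batch
    end
    flush.call(batch) if batch
  end
end

mutex = Mutex.new
batches = { "web1" => ["/data/a.sqlite3"] } # partial batch: 1 URI, batch size 4
timers = {}
flushed = []

# Start at most one timer per host, under the lock.
timer = mutex.synchronize do
  timers["web1"] ||= start_flush_timer(mutex, batches, timers, "web1", 0.05) { |b| flushed << b }
end

timer.join  # wait for the deadline to pass and the flush to run
p flushed   # => [["/data/a.sqlite3"]]
```

Because the timer re-reads the buffer under the lock when it wakes, a batch that was already flushed by size yields `nil` and nothing is submitted twice.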

Local URIs should NOT be sent here — they bypass batching entirely and go straight to the worker pool (see Runner).
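From the caller's side, this rule amounts to a two-way branch. The sketch assumes a hypothetical `local?` predicate (here, a nil `host_key`) and uses lambdas in place of the worker pool and `HostBatcher#add`. Runner's real code is not shown on this page and may decide locality differently.

```ruby
# Hypothetical URI type: a nil host_key marks a database on the local machine.
DbURI = Struct.new(:host_key, :path) do
  def local?
    host_key.nil?
  end
end

# Local URIs skip batching; remote URIs are handed to the batcher for grouping.
def dispatch(uri, submit_local:, add_remote:)
  if uri.local?
    submit_local.call(uri)
  else
    add_remote.call(uri)
  end
end

local_jobs = []
remote_jobs = []
[DbURI.new(nil, "/tmp/local.sqlite3"), DbURI.new("web1", "/data/app.sqlite3")].each do |uri|
  dispatch(uri,
           submit_local: ->(u) { local_jobs << u.path },
           add_remote: ->(u) { remote_jobs << u.path })
end
p local_jobs   # => ["/tmp/local.sqlite3"]
p remote_jobs  # => ["/data/app.sqlite3"]
```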

Examples:

batcher = HostBatcher.new(config, pool, aggregator, display, ssh_manager)
batcher.add(uri)      # accumulates by host
batcher.flush_all     # drains any remaining partial batches

Constant Summary

FLUSH_TIMEOUT = 0.2

How long to wait before flushing an incomplete batch, in seconds. Prevents URIs from sitting in the buffer when the source stream is slow.

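Instance Method Summary

#initialize(config, pool, aggregator, display, ssh_manager) ⇒ HostBatcher (constructor)
Returns a new instance of HostBatcher.

#add(uri) ⇒ Object
Adds a remote URI to the appropriate host batch.

#cancel ⇒ Object
Cancels all pending batches and timers.

#flush_all ⇒ Object
Flushes all pending batches regardless of size.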

Constructor Details

#initialize(config, pool, aggregator, display, ssh_manager) ⇒ HostBatcher

Returns a new instance of HostBatcher.



# File 'lib/sqlitesweep/host_batcher.rb', line 26

def initialize(config, pool, aggregator, display, ssh_manager)
  @config = config
  @pool = pool
  @aggregator = aggregator
  @display = display
  @ssh_manager = ssh_manager
  @batch_size = config.batch_size
  @mutex = Mutex.new
  @batches = {} # host_key => [uri, ...]
  @timers = {}  # host_key => Thread
  @cancelled = false
end

Instance Method Details

#add(uri) ⇒ Object

Adds a remote URI to the appropriate host batch. May trigger an immediate flush if the batch is full.

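Parameters:

uri: the remote database URI to queue. It must respond to #host_key, whose value determines which host batch it joins.
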
# File 'lib/sqlitesweep/host_batcher.rb', line 43

def add(uri)
  return if @cancelled

  host_key = uri.host_key
  flush_batch = nil

  @mutex.synchronize do
    @batches[host_key] ||= []
    @batches[host_key] << uri

    if @batches[host_key].length >= @batch_size
      flush_batch = @batches.delete(host_key)
      cancel_timer(host_key)
    else
      start_timer(host_key) unless @timers[host_key]
    end
  end

  submit_batch(flush_batch) if flush_batch
end
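`#add` decides what to flush while holding `@mutex`, but calls `submit_batch` only after releasing it, so a slow submission does not block other threads that are adding URIs. The reduced sketch below isolates that size-triggered path with timers omitted; `SizeBatcher`, its `submit` callback, and the `pending` helper are illustrative names, not part of SQLiteSweep.

```ruby
# Hypothetical, timer-free reduction of HostBatcher#add's size-triggered path.
class SizeBatcher
  def initialize(batch_size, &submit)
    @batch_size = batch_size
    @submit = submit
    @mutex = Mutex.new
    @batches = {} # host_key => [uri, ...]
  end

  def add(host_key, uri)
    full = nil
    @mutex.synchronize do
      (@batches[host_key] ||= []) << uri
      # Detach a full batch while locked so no other thread can append to it.
      full = @batches.delete(host_key) if @batches[host_key].length >= @batch_size
    end
    # Submit outside the lock: submission may be slow (e.g. queueing SSH work).
    @submit.call(host_key, full) if full
  end

  # Snapshot of what is still buffered for a host (for illustration only).
  def pending(host_key)
    @mutex.synchronize { (@batches[host_key] || []).dup }
  end
end

submitted = []
batcher = SizeBatcher.new(2) { |host, batch| submitted << [host, batch] }
%w[a b c].each { |db| batcher.add("web1", db) }
p submitted                # => [["web1", ["a", "b"]]]
p batcher.pending("web1")  # => ["c"]
```

Deleting the full batch from the Hash, rather than copying it, is what makes it safe to hand off: once the lock is released, no other caller can reach that array.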

#cancel ⇒ Object

Cancels all pending batches and timers. Used during shutdown.



# File 'lib/sqlitesweep/host_batcher.rb', line 79

def cancel
  @mutex.synchronize do
    @cancelled = true
    @timers.each_value(&:kill)
    @timers.clear
    @batches.clear
  end
end
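`#cancel` relies on `Thread#kill` to stop timer threads that are still sleeping, so their pending flushes never run. The small demonstration below is independent of HostBatcher; the 0.05 s delay stands in for FLUSH_TIMEOUT.

```ruby
flushed = []
mutex = Mutex.new

# A timer thread that would flush after a short delay...
timer = Thread.new do
  sleep 0.05
  mutex.synchronize { flushed << :late_flush }
end

# ...killed before the delay elapses, as #cancel does for each entry in @timers.
mutex.synchronize { timer.kill }
timer.join         # returns once the killed thread has exited; no exception
p timer.alive?     # => false
p flushed          # => []
```

Since `#cancel` also clears `@batches` under the same lock, URIs still buffered at shutdown are discarded rather than submitted.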

#flush_all ⇒ Object

Flushes all pending batches regardless of size. Called at the end of a sweep to drain any partial batches that haven’t reached batch_size.



# File 'lib/sqlitesweep/host_batcher.rb', line 66

def flush_all
  batches_to_flush = nil
  @mutex.synchronize do
    @timers.each_value(&:kill)
    @timers.clear
    batches_to_flush = @batches.dup
    @batches.clear
  end

  batches_to_flush.each_value { |batch| submit_batch(batch) }
end
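After killing the timers, `#flush_all` takes a snapshot under the lock (`@batches.dup`), empties the live Hash, and submits the snapshot's batches only after releasing the lock. A minimal standalone version of that snapshot-and-drain pattern, with a plain Hash and a hypothetical block in place of `submit_batch`:

```ruby
# Take everything buffered, leave the buffer empty, then process outside the lock.
def drain(mutex, batches)
  snapshot = mutex.synchronize do
    copy = batches.dup
    batches.clear
    copy
  end
  snapshot.each_value { |batch| yield batch }
end

mutex = Mutex.new
batches = { "web1" => ["a.sqlite3"], "web2" => ["b.sqlite3", "c.sqlite3"] }
submitted = []
drain(mutex, batches) { |batch| submitted << batch }
p submitted  # => [["a.sqlite3"], ["b.sqlite3", "c.sqlite3"]]
p batches    # => {}
```

`dup` is shallow, so the arrays are shared rather than copied. That is safe here because `clear` only drops the Hash's references, and once the lock is released nothing else can reach those arrays through the buffer.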