Class: SQLiteSweep::WorkerPool

Inherits:
Object
Defined in:
lib/sqlitesweep/worker_pool.rb

Overview

Thread pool wrapper around Concurrent::FixedThreadPool.

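Uses a fixed number of threads for IO-bound SSH and SQLite work. The :caller_runs fallback policy is intended to provide natural back-pressure: when the pool cannot accept a task, the submitting thread runs the work itself instead of queuing it. Note that Concurrent::FixedThreadPool applies the fallback policy only when its task queue is full or the pool is no longer running, and that its queue is unbounded by default (max_queue: 0), so a max_queue limit is required for the back-pressure to engage while workers are busy.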

Examples:

pool = WorkerPool.new(8)
pool.submit { query_database(uri) }
pool.shutdown  # waits up to 60 seconds (the default timeout) for submitted work to finish
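
The caller-runs idea described in the overview can be sketched with the standard library alone. This is a minimal illustration, not SQLiteSweep's or concurrent-ruby's implementation: MiniPool and its max_queue parameter are hypothetical names, and the bounded queue is an assumption made so that rejection actually occurs.

```ruby
# Stdlib-only sketch of a caller-runs pool. MiniPool is a hypothetical
# stand-in, not the SQLiteSweep or concurrent-ruby implementation.
class MiniPool
  def initialize(size, max_queue:)
    @queue = SizedQueue.new(max_queue)
    @workers = Array.new(size) do
      Thread.new do
        # Queue#pop returns nil once the queue is closed and drained.
        while (job = @queue.pop)
          job.call
        end
      end
    end
  end

  # Returns :queued when a worker will run the block, or :caller_ran when
  # the queue was full and the submitting thread ran the block itself.
  def submit(&block)
    @queue.push(block, true) # non-blocking push; raises ThreadError if full
    :queued
  rescue ThreadError
    block.call
    :caller_ran
  end

  def shutdown
    @queue.close
    @workers.each(&:join)
  end
end

started = Queue.new
gate = Queue.new
ran_on = nil

pool = MiniPool.new(1, max_queue: 1)
first = pool.submit { started << true; gate.pop } # occupies the only worker
started.pop                                       # wait until the worker is busy
second = pool.submit { :noop }                    # fills the one queue slot
third = pool.submit { ran_on = Thread.current }   # queue full: caller runs it
gate << true                                      # release the worker
pool.shutdown
```

Here the third submission is executed by the submitting thread, which is what slows a producer down once the pool is saturated.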

Instance Method Summary

Constructor Details

#initialize(size) ⇒ WorkerPool

Returns a new instance of WorkerPool.

Parameters:

  • size (Integer)

    Number of worker threads.



# File 'lib/sqlitesweep/worker_pool.rb', line 18

def initialize(size)
  @pool = Concurrent::FixedThreadPool.new(size, fallback_policy: :caller_runs)
end

Instance Method Details

#kill ⇒ Object

Immediately kills all worker threads. Used for forced shutdown (e.g. Ctrl+C).



# File 'lib/sqlitesweep/worker_pool.rb', line 41

def kill
  @pool.kill
end

#shutdown(timeout = 60) ⇒ Object

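Gracefully shuts down the pool: stops accepting new work and waits up to timeout seconds for all submitted work to finish. Returns the result of wait_for_termination: true if the pool terminated within the timeout, false otherwise.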

Parameters:

  • timeout (Integer) (defaults to: 60)

    Max seconds to wait for completion.



# File 'lib/sqlitesweep/worker_pool.rb', line 35

def shutdown(timeout = 60)
  @pool.shutdown
  @pool.wait_for_termination(timeout)
end
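
Because #shutdown ends with wait_for_termination, its return value indicates whether the pool drained in time, and a caller can fall back to #kill when it did not. The sketch below assumes only that the pool responds to #shutdown(timeout) and #kill. drain_or_kill and FakePool are hypothetical names introduced for illustration, and FakePool merely simulates work so the example runs on its own.

```ruby
# Hypothetical helper: drain gracefully, then force-kill on timeout.
# Works with any object exposing #shutdown(timeout) and #kill.
def drain_or_kill(pool, timeout: 60)
  return :drained if pool.shutdown(timeout)

  pool.kill
  :killed
end

# FakePool is a stand-in so the sketch runs without SSH or SQLite work.
# Its #shutdown reports whether the pretend work fits in the timeout.
class FakePool
  attr_reader :killed

  def initialize(work_seconds)
    @work_seconds = work_seconds
    @killed = false
  end

  def shutdown(timeout = 60)
    @work_seconds <= timeout
  end

  def kill
    @killed = true
  end
end

fast = FakePool.new(1)
slow = FakePool.new(120)
fast_result = drain_or_kill(fast, timeout: 5)
slow_result = drain_or_kill(slow, timeout: 5)
```

With a real WorkerPool the same helper would be called as drain_or_kill(pool, timeout: 30).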

#submit(&block) ⇒ Object

Submits a block to be executed by a worker thread. If the pool cannot accept the block, it runs on the calling thread instead (back-pressure via :caller_runs).

Yield Returns:

  • (void)


# File 'lib/sqlitesweep/worker_pool.rb', line 27

def submit(&block)
  @pool.post(&block)
end
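
Since the block's return value is discarded (the yield is documented as void), results and errors have to travel back through a thread-safe structure. One possible pattern, sketched here under assumptions, wraps each block so its outcome lands in a Queue. submit_collecting, ThreadPerTaskPool, and the "a.db"/"b.db" labels are hypothetical. The stand-in pool only mimics the #submit and #shutdown interface so the example is self-contained.

```ruby
# Hypothetical helper: run the block via pool.submit and record either its
# result or its error message in a thread-safe queue.
def submit_collecting(pool, results, label, &block)
  pool.submit do
    results << [label, :ok, block.call]
  rescue StandardError => e
    results << [label, :error, e.message]
  end
end

# Stand-in pool with the same #submit/#shutdown shape as WorkerPool.
# It starts one thread per task, which a real pool would not do.
class ThreadPerTaskPool
  def initialize
    @threads = []
  end

  def submit(&block)
    @threads << Thread.new(&block)
  end

  def shutdown(_timeout = 60)
    @threads.each(&:join)
    true
  end
end

results = Queue.new
pool = ThreadPerTaskPool.new
submit_collecting(pool, results, "a.db") { 42 }
submit_collecting(pool, results, "b.db") { raise "no such table" }
pool.shutdown

# Drain the queue after shutdown; order of arrival is not guaranteed.
collected = Array.new(results.size) { results.pop }.sort_by(&:first)
```

Draining the queue only after #shutdown returns keeps the collection step free of races with still-running blocks.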