Class: Weave::ConnectionPool

Inherits:
Object
Defined in:
lib/weave.rb

Overview

A pool of SSH connections. Operations over the pool may be performed in serial or in parallel.

Instance Method Summary

Constructor Details

#initialize(host_list = []) ⇒ ConnectionPool

host_list is optional. If you omit it, use #execute_with (instead of #execute) to specify the whole list of hosts each time.



# File 'lib/weave.rb', line 63

def initialize(host_list = [])
  @hosts = host_list
  @connections = host_list.reduce({}) { |pool, host| pool.merge(host => LazyConnection.new(host)) }
end
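
The constructor builds one connection object per host and keys it by the host string. The following is a minimal sketch of that shape only; StubLazyConnection is a hypothetical stand-in (the real LazyConnection is not shown on this page), and the host names are made up.

```ruby
# Hypothetical stand-in for LazyConnection: it only remembers its host.
# The real class is assumed to defer opening the SSH session until first use.
class StubLazyConnection
  attr_reader :host

  def initialize(host)
    @host = host
  end
end

hosts = ["root@alpha", "root@beta"] # made-up hosts for illustration

# Same fold as the constructor: host list -> { host => connection } hash.
connections = hosts.reduce({}) do |pool, host|
  pool.merge(host => StubLazyConnection.new(host))
end

# An equivalent single-pass build that avoids allocating a new hash per merge.
same_shape = hosts.each_with_object({}) do |host, pool|
  pool[host] = StubLazyConnection.new(host)
end

p connections.keys
p same_shape.keys == connections.keys
```

Either form yields a hash with one entry per host, in the order the hosts were given.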

Instance Method Details

#disconnect! ⇒ Object

Disconnect all open connections.



# File 'lib/weave.rb', line 103

def disconnect!() @connections.each_value(&:disconnect) end

#execute(options = {}, &block) ⇒ Object

Run a command on every host in the connection pool. The block is evaluated in the context of each host's LazyConnection.

Options Hash (options):

  • :args (Array)

    the arguments to pass through to the block when it runs.

  • :num_threads (Fixnum)

    the number of concurrent threads to use to process this command. Defaults to DEFAULT_THREAD_POOL_SIZE.

  • :serial (Boolean)

    whether to process the command for each connection one at a time.

  • :batch_by (Fixnum)

    if set, group the connections into batches of no more than this value and fully process each batch before starting the next one.



# File 'lib/weave.rb', line 77

def execute(options = {}, &block)
  execute_with(@hosts, options, &block)
end
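
The options interact in a fixed order: :serial is checked first, then :batch_by, and otherwise all hosts run in one thread pool. The sketch below only computes which mode and grouping a given options hash would select; it does not open any connections. dispatch_plan and ASSUMED_POOL_SIZE are hypothetical names introduced for illustration, and the value 10 is an assumption (the real DEFAULT_THREAD_POOL_SIZE is not shown on this page).

```ruby
ASSUMED_POOL_SIZE = 10 # assumption: stands in for DEFAULT_THREAD_POOL_SIZE

# Hypothetical helper: report how a host list would be processed for the
# given options, mirroring the branch order used by #execute_with.
def dispatch_plan(hosts, options = {})
  threads = options[:num_threads] || ASSUMED_POOL_SIZE
  if options[:serial]
    # One host at a time; :batch_by and :num_threads have no effect here.
    { mode: :serial, groups: hosts.map { |host| [host] }, threads: 1 }
  elsif options[:batch_by]
    # Each slice is fully processed before the next one starts.
    { mode: :batched, groups: hosts.each_slice(options[:batch_by]).to_a, threads: threads }
  else
    { mode: :parallel, groups: [hosts], threads: threads }
  end
end

hosts = %w[a b c d e]
p dispatch_plan(hosts, batch_by: 2)[:groups]
p dispatch_plan(hosts, serial: true, batch_by: 2)[:mode]
```

Note that passing both :serial and :batch_by selects serial processing, because :serial is tested first.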

#execute_with(host_list, options = {}, &block) ⇒ Object

Same as #execute, except that host_list replaces, for this call only, the list of hosts with which this ConnectionPool was initialized. Any hosts in host_list that are not already in the pool are added to it.



# File 'lib/weave.rb', line 83

def execute_with(host_list, options = {}, &block)
  host_list.each { |host| @connections[host] ||= LazyConnection.new(host) }
  args = options[:args] || []
  options[:num_threads] ||= DEFAULT_THREAD_POOL_SIZE
  if options[:serial]
    host_list.each { |host| @connections[host].self_eval args, &block }
  elsif options[:batch_by]
    host_list.each_slice(options[:batch_by]) do |batch|
      Weave.with_thread_pool(batch, options[:num_threads]) do |host, mutex|
        @connections[host].self_eval args, mutex, &block
      end
    end
  else
    Weave.with_thread_pool(host_list, options[:num_threads]) do |host, mutex|
      @connections[host].self_eval args, mutex, &block
    end
  end
end
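
The batched branch above can be illustrated without SSH. The following is a rough sketch, not Weave's implementation: with_thread_pool here is a hypothetical stand-in for Weave.with_thread_pool, written on the assumption that it runs the block once per item on a bounded number of threads and passes a shared mutex. The host names are made up.

```ruby
# Hypothetical stand-in for Weave.with_thread_pool: call the block once per
# item on at most pool_size threads, handing every call the same mutex.
def with_thread_pool(items, pool_size, &block)
  queue = Queue.new
  items.each { |item| queue << item }
  mutex = Mutex.new
  workers = Array.new([pool_size, items.size].min) do
    Thread.new do
      loop do
        begin
          item = queue.pop(true) # non-blocking pop raises ThreadError when empty
        rescue ThreadError
          break
        end
        block.call(item, mutex)
      end
    end
  end
  workers.each(&:join) # return only after every item has been processed
end

hosts = %w[a b c d e]
seen = []

# Same structure as the :batch_by branch: slice, then run each slice in a pool.
hosts.each_slice(2) do |batch|
  with_thread_pool(batch, 4) do |host, mutex|
    mutex.synchronize { seen << host } # guard shared state across threads
  end
end

p seen.each_slice(2).map(&:sort)
```

Because each pool is joined before the next slice begins, hosts from a later batch never start before an earlier batch has finished; only the order within a batch can vary.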