Class: NatsWork::ConnectionPool

Inherits:
Object
  • Object
show all
Defined in:
lib/natswork/connection_pool.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ ConnectionPool

Returns a new instance of ConnectionPool.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/natswork/connection_pool.rb', line 16

# Builds an empty pool; nothing in this constructor opens a connection.
#
# @param options [Hash] pool settings
# @option options [Object] :size pool size, 5 when absent or nil/false
# @option options [Object] :timeout timeout value, 5 when absent or nil/false
#   (presumably seconds to wait for a checkout; confirm against the checkout code)
# @option options [Hash] :connection_options stored as-is for later use,
#   {} when absent
def initialize(options = {})
  # User-supplied configuration, each falling back to its default.
  @connection_options = options[:connection_options] || {}
  @timeout = options[:timeout] || 5
  @size = options[:size] || 5

  # Connection bookkeeping: every container starts empty.
  @connections = []
  @available = Queue.new
  @active = Concurrent::Array.new
  @created_count = @waiting_count = 0

  # Shutdown coordination primitives.
  @mutex = Mutex.new
  @shutdown_condition = ConditionVariable.new
  @shutdown = false
end

Instance Attribute Details

#size ⇒ Object (readonly)

Returns the value of attribute size.



14
15
16
# File 'lib/natswork/connection_pool.rb', line 14

# Reader for the pool size.
#
# @return [Object] the value of @size, which the constructor takes from
#   options[:size] and defaults to 5
def size
  @size
end

#timeout ⇒ Object (readonly)

Returns the value of attribute timeout.



14
15
16
# File 'lib/natswork/connection_pool.rb', line 14

# Reader for the pool timeout.
#
# @return [Object] the value of @timeout, which the constructor takes from
#   options[:timeout] and defaults to 5
def timeout
  @timeout
end

Instance Method Details

#active_connections ⇒ Object



66
67
68
# File 'lib/natswork/connection_pool.rb', line 66

# Counts the entries currently held in @active.
#
# NOTE(review): @active presumably tracks checked-out connections; the code
# that fills it is not visible here, so confirm against checkout/checkin.
#
# @return [Integer] current size of @active
def active_connections
  @active.size
end

#available_connections ⇒ Object



62
63
64
# File 'lib/natswork/connection_pool.rb', line 62

# Counts the entries currently sitting in the @available queue.
#
# NOTE(review): presumably these are idle connections ready for checkout;
# the code that enqueues them is not visible here.
#
# @return [Integer] current length of the @available queue
def available_connections
  @available.size
end

#healthy? ⇒ Boolean

Returns:

  • (Boolean)


80
81
82
# File 'lib/natswork/connection_pool.rb', line 80

# Reports whether the pool is still open, based only on the @shutdown flag.
# No individual connection is inspected.
#
# @return [Boolean] false once @shutdown has been set, true otherwise
def healthy?
  !@shutdown
end

#shutdown(timeout: 30) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/natswork/connection_pool.rb', line 42

# Marks the pool as shut down, waits for @active to empty, then disconnects
# and forgets every connection. The whole sequence runs under @mutex.
#
# @param timeout [Numeric] maximum number of seconds to wait for @active to
#   empty before disconnecting anyway
# @return [Object] whatever clearing the @available queue returns
def shutdown(timeout: 30)
  @mutex.synchronize do
    @shutdown = true

    # The deadline is tracked on the monotonic clock so that wall-clock
    # adjustments (NTP steps, manual changes) can neither stretch nor cut
    # short the drain period.
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

    while @active.size.positive?
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      break unless remaining.positive?

      # Re-check at least every 0.1s, but never sleep past the deadline.
      # The wait releases @mutex while sleeping and reacquires it on wake-up.
      @shutdown_condition.wait(@mutex, [remaining, 0.1].min)
    end

    # Best-effort teardown: one connection failing to disconnect must not
    # prevent the remaining connections from being closed.
    @connections.each do |conn|
      conn.disconnect
    rescue StandardError
      nil
    end

    @connections.clear
    @available.clear
  end
end

#stats ⇒ Object



70
71
72
73
74
75
76
77
78
# File 'lib/natswork/connection_pool.rb', line 70

# Snapshot of the pool's counters.
#
# @return [Hash{Symbol => Object}] with keys :size, :available, :active,
#   :waiting and :created, in that order
def stats
  snapshot = { size: @size }
  snapshot[:available] = available_connections
  snapshot[:active] = active_connections
  snapshot[:waiting] = @waiting_count
  snapshot[:created] = @created_count
  snapshot
end

#with_connection(&block) ⇒ Object

Raises:

  • (PoolShutdownError) — when the pool has already been shut down


31
32
33
34
35
36
37
38
39
40
# File 'lib/natswork/connection_pool.rb', line 31

# Checks a connection out, forwards the block to that connection's own
# #with_connection, and always checks the connection back in afterwards,
# even when the block raises.
#
# @yield forwarded unchanged to the checked-out connection's #with_connection
# @return [Object] whatever the connection's #with_connection returns
# @raise [PoolShutdownError] if @shutdown is already set when called
def with_connection(&block)
  raise PoolShutdownError, 'Pool has been shutdown' if @shutdown

  # Checkout happens outside the begin/ensure so that a failed checkout
  # never triggers a checkin.
  leased = checkout

  begin
    leased.with_connection(&block)
  ensure
    checkin(leased)
  end
end