Class: WaterDrop::ConnectionPool

Inherits:
Object
Extended by:
Forwardable
Defined in:
lib/waterdrop/connection_pool.rb

Overview

Connection pool wrapper for WaterDrop producers using the proven connection_pool gem.

This provides a clean WaterDrop-specific API while leveraging the battle-tested connection_pool gem underneath. The wrapper hides direct usage of the connection_pool gem and provides WaterDrop-specific configuration.

Examples:

Basic usage

pool = WaterDrop::ConnectionPool.new(size: 10) do |config|
  config.kafka = { 'bootstrap.servers': 'localhost:9092' }
  config.deliver = true
end

pool.with do |producer|
  producer.produce_sync(topic: 'events', payload: 'data')
end

Transactional producers with unique IDs

pool = WaterDrop::ConnectionPool.new(size: 5) do |config, index|
  config.kafka = {
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': "my-app-#{index}"
  }
end

Global connection pool

WaterDrop::ConnectionPool.setup(size: 20) do |config|
  config.kafka = { 'bootstrap.servers': ENV['KAFKA_BROKERS'] }
end

WaterDrop::ConnectionPool.with do |producer|
  producer.produce_async(topic: 'events', payload: 'data')
end

Constructor Details

#initialize(size: 5, timeout: 5000, &producer_config) {|config, index| ... } ⇒ ConnectionPool

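Creates a new WaterDrop connection pool. The timeout is given in milliseconds and is converted to seconds before being passed to the underlying connection_pool gem.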

Yields:

  • (config, index)

    Block to configure each producer in the pool, receives config and pool index
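The constructor source on this page checks the block's arity to decide whether to pass the pool index. A minimal sketch of that dispatch in plain Ruby follows. FakeConfig and configure are hypothetical stand-ins for illustration only and are not part of WaterDrop.

```ruby
# FakeConfig is a hypothetical stand-in for the producer config object.
FakeConfig = Struct.new(:kafka)

# Hypothetical helper mirroring the arity check: the index is passed only
# when the block declares exactly two parameters.
def configure(index, &block)
  config = FakeConfig.new
  if block.arity == 2
    block.call(config, index)
  else
    block.call(config)
  end
  config
end

# One-parameter block: receives only the config.
plain = configure(1) do |config|
  config.kafka = { 'bootstrap.servers': 'localhost:9092' }
end

# Two-parameter block: also receives the index, here used for a unique ID.
indexed = configure(3) do |config, index|
  config.kafka = {
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': "my-app-#{index}"
  }
end

puts plain.kafka.inspect
puts indexed.kafka[:'transactional.id']
```

Because the check is for an arity of exactly 2, a block that declares only config never sees the index.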



# File 'lib/waterdrop/connection_pool.rb', line 201

def initialize(size: 5, timeout: 5000, &producer_config)
  self.class.send(:ensure_connection_pool_gem!)

  @producer_config = producer_config
  @pool_index = 0
  @pool_mutex = Mutex.new

  @pool = ::ConnectionPool.new(size: size, timeout: timeout / 1000.0) do
    producer_index = @pool_mutex.synchronize { @pool_index += 1 }

    WaterDrop::Producer.new do |config|
      if @producer_config.arity == 2
        @producer_config.call(config, producer_index)
      else
        @producer_config.call(config)
      end
    end
  end

  # Emit event when a connection pool is created
  WaterDrop.instrumentation.instrument(
    "connection_pool.created",
    pool: self,
    size: size,
    timeout: timeout
  )
end
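Two details of the constructor above are easy to miss: the index handed to the configuration block comes from a mutex-guarded counter that starts at 1, and the timeout is divided by 1000.0 to turn milliseconds into seconds. The sketch below reproduces both in isolation. IndexSource and timeout_in_seconds are hypothetical names used only for illustration.

```ruby
# IndexSource is a hypothetical class that isolates the counter logic:
# every call returns the next integer, starting at 1, under a mutex.
class IndexSource
  def initialize
    @index = 0
    @mutex = Mutex.new
  end

  def next_index
    @mutex.synchronize { @index += 1 }
  end
end

# Hypothetical helper: milliseconds in, seconds (Float) out.
def timeout_in_seconds(timeout_ms)
  timeout_ms / 1000.0
end

source = IndexSource.new

# Ten threads each take one index; the mutex keeps them unique.
indexes = Array.new(10) { Thread.new { source.next_index } }.map(&:value)

puts indexes.sort.inspect
puts timeout_in_seconds(5000)
```

Since the index is assigned when the pool's creation block runs, index values follow the order in which producers are created, assuming the underlying pool creates them on demand.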

Class Attribute Details

.default_pool ⇒ ConnectionPool?

Returns the global connection pool, or nil if none has been set up.



# File 'lib/waterdrop/connection_pool.rb', line 47

def default_pool
  @default_pool
end

Instance Attribute Details

#pool ⇒ ::ConnectionPool (readonly)

Returns the underlying connection_pool instance. This allows access to advanced connection_pool features if needed.



# File 'lib/waterdrop/connection_pool.rb', line 293

def pool
  @pool
end

Class Method Details

.active? ⇒ Boolean

Check if the global connection pool is active (configured)



# File 'lib/waterdrop/connection_pool.rb', line 151

def active?
  !@default_pool.nil?
end

.close ⇒ Object

Shuts down the global connection pool. This is an alias for shutdown: WaterDrop producers use #close, so the connection pool aliases #shutdown to #close for API consistency across individual producers and connection pools.



# File 'lib/waterdrop/connection_pool.rb', line 133

def shutdown
  return unless @default_pool

  pool = @default_pool
  @default_pool.shutdown
  @default_pool = nil

  # Emit global event for pool shutdown
  WaterDrop.instrumentation.instrument(
    "connection_pool.shutdown",
    pool: pool
  )
end
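The listing above shows def shutdown under .close because close is an alias rather than a separate method. A small sketch of how a class-level alias behaves in Ruby follows. DemoPool is a hypothetical class, and this is not WaterDrop's actual source.

```ruby
# DemoPool is a hypothetical class used only to illustrate class-level aliasing.
class DemoPool
  class << self
    def shutdown
      :shut_down
    end

    # Make .close an alias of .shutdown on the singleton class.
    alias_method :close, :shutdown
  end
end

# Both names run the same method body.
puts DemoPool.close
puts DemoPool.method(:close).original_name
```

Either name can be called. The alias exists so that code written against the producer API (#close) reads the same when it targets a pool.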

.reload ⇒ Object

Reload the global connection pool



# File 'lib/waterdrop/connection_pool.rb', line 136

def reload
  return unless @default_pool

  @default_pool.reload

  # Emit global event for pool reload
  WaterDrop.instrumentation.instrument(
    "connection_pool.reload",
    pool: @default_pool
  )
end

.setup(size: 5, timeout: 5000, &producer_config) {|config, index| ... } ⇒ ConnectionPool

Sets up a global connection pool

Examples:

Basic setup

WaterDrop::ConnectionPool.setup(size: 15) do |config|
  config.kafka = { 'bootstrap.servers': ENV['KAFKA_BROKERS'] }
  config.deliver = true
end

Transactional setup with unique IDs

WaterDrop::ConnectionPool.setup(size: 5) do |config, index|
  config.kafka = {
    'bootstrap.servers': ENV['KAFKA_BROKERS'],
    'transactional.id': "my-app-#{index}"
  }
end

Yields:

  • (config, index)

    Block to configure each producer in the pool, receives config and pool index



# File 'lib/waterdrop/connection_pool.rb', line 71

def setup(size: 5, timeout: 5000, &producer_config)
  ensure_connection_pool_gem!

  @default_pool = new(size: size, timeout: timeout, &producer_config)

  # Emit global event for pool setup
  WaterDrop.instrumentation.instrument(
    "connection_pool.setup",
    pool: @default_pool,
    size: size,
    timeout: timeout
  )

  @default_pool
end

.shutdown ⇒ Object

Shutdown the global connection pool



# File 'lib/waterdrop/connection_pool.rb', line 116

def shutdown
  return unless @default_pool

  pool = @default_pool
  @default_pool.shutdown
  @default_pool = nil

  # Emit global event for pool shutdown
  WaterDrop.instrumentation.instrument(
    "connection_pool.shutdown",
    pool: pool
  )
end

.stats ⇒ Hash?

Get statistics about the global pool. Returns nil when no global pool is configured.



# File 'lib/waterdrop/connection_pool.rb', line 106

def stats
  return nil unless @default_pool

  {
    size: @default_pool.size,
    available: @default_pool.available
  }
end
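The stats hash reports only size and available, so the number of producers currently checked out has to be derived. A minimal sketch follows, assuming a hash shaped like the one returned above. pool_usage is a hypothetical helper and not part of WaterDrop.

```ruby
# Hypothetical helper: derives the in-use count and utilization from a stats
# hash shaped like { size:, available: }. Returns nil when stats is nil,
# which is what the global .stats returns when no pool is configured.
def pool_usage(stats)
  return nil if stats.nil?

  in_use = stats[:size] - stats[:available]
  { in_use: in_use, utilization: in_use.fdiv(stats[:size]) }
end

puts pool_usage({ size: 20, available: 15 }).inspect
puts pool_usage(nil).inspect
```

In an application, the argument would be the result of WaterDrop::ConnectionPool.stats. The nil check matters only for the global variant, since the instance-level #stats always returns a hash.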

.transaction {|producer| ... } ⇒ Object

Execute a transaction with a producer from the global connection pool. Only available when the global connection pool is configured.

Examples:

WaterDrop::ConnectionPool.transaction do |producer|
  producer.produce(topic: 'events', payload: 'data1')
  producer.produce(topic: 'events', payload: 'data2')
end

Yields:

  • (producer)

    Producer from the global pool with an active transaction

Raises:

  • (RuntimeError)

    If no global pool is configured



# File 'lib/waterdrop/connection_pool.rb', line 167

def transaction(...)
  raise "No global connection pool configured. Call setup first." unless @default_pool

  @default_pool.transaction(...)
end

.with {|producer| ... } ⇒ Object

Executes a block with a producer from the global pool

Examples:

WaterDrop::ConnectionPool.with do |producer|
  producer.produce_sync(topic: 'events', payload: 'data')
end

Yields:

  • (producer)

    Producer from the global pool

Raises:

  • (RuntimeError)

    If no global pool is configured



# File 'lib/waterdrop/connection_pool.rb', line 97

def with(...)
  raise "No global connection pool configured. Call setup first." unless @default_pool

  @default_pool.with(...)
end

Instance Method Details

#reload ⇒ Object

Reload all connections in the pool. Useful for configuration changes or error recovery.



# File 'lib/waterdrop/connection_pool.rb', line 259

def reload
  @pool.reload do |producer|
    producer.close! if producer&.status&.active?
  end

  # Emit event after pool is reloaded
  WaterDrop.instrumentation.instrument(
    "connection_pool.reloaded",
    pool: self
  )
end

#shutdown ⇒ Object Also known as: close

Shutdown the connection pool



# File 'lib/waterdrop/connection_pool.rb', line 240

def shutdown
  @pool.shutdown do |producer|
    producer.close! if producer&.status&.active?
  end

  # Emit event after pool is shut down
  WaterDrop.instrumentation.instrument(
    "connection_pool.shutdown",
    pool: self
  )
end

#stats ⇒ Hash

Get pool statistics



# File 'lib/waterdrop/connection_pool.rb', line 232

def stats
  {
    size: @pool.size,
    available: @pool.available
  }
end

#transaction {|producer| ... } ⇒ Object

Execute a transaction with a producer from this connection pool

Examples:

pool.transaction do |producer|
  producer.produce(topic: 'events', payload: 'data1')
  producer.produce(topic: 'events', payload: 'data2')
end

Yields:

  • (producer)

    Producer from the pool with an active transaction



# File 'lib/waterdrop/connection_pool.rb', line 281

def transaction
  with do |producer|
    producer.transaction do
      yield(producer)
    end
  end
end