Class: Webhookdb::ConnectionCache

Inherits:
Object
Extended by:
MethodUtilities
Includes:
Appydays::Configurable, Appydays::Loggable, Dbutil
Defined in:
lib/webhookdb/connection_cache.rb

Overview

Keep a dynamic cache of open database connections. Very similar in behavior to Sequel::DATABASES, but we need to prune connections that have been inactive for a while.

When ‘borrow’ is called, either a new connection is made or an existing one is reused for that URL. The connection is yielded to the block.

Then, after the block is called, if ‘prune_interval’ has elapsed since the last prune, prune all connections with 0 current connections, _other than the connection just used_. Because this connection was just used, we assume it will be used again soon.
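The borrow-then-prune cycle described above can be sketched with a toy model. This is a minimal illustration, not the library's implementation: FakeConn and TinyCache are hypothetical names, no real database is involved, locking is omitted, and the prune rule shown is one reading of the description (the real prune method is not listed on this page).

```ruby
# Hypothetical stand-in for a database connection.
FakeConn = Struct.new(:url, :open) do
  def disconnect
    self.open = false
  end
end

# Hypothetical, single-threaded toy version of the cache (no Mutex).
class TinyCache
  attr_reader :dbs_for_urls

  def initialize(prune_interval:)
    @prune_interval = prune_interval
    @last_pruned_at = Time.now
    @dbs_for_urls = {}
  end

  def borrow(url)
    now = Time.now
    loans = @dbs_for_urls[url] ||= {loaned: {}, available: []}
    # Reuse an idle connection if there is one, otherwise open a new one.
    conn = loans[:available].pop || FakeConn.new(url, true)
    loans[:loaned][Thread.current] = conn
    begin
      result = yield conn
    ensure
      loans[:loaned].delete(Thread.current)
      loans[:available] << conn
    end
    prune(url) if now > @last_pruned_at + @prune_interval
    result
  end

  private

  # Drop every URL that has nothing on loan, except the URL just used.
  def prune(skip_url)
    @dbs_for_urls.delete_if do |url, loans|
      next false if url == skip_url || loans[:loaned].any?
      loans[:available].each(&:disconnect)
      true
    end
    @last_pruned_at = Time.now
  end
end

cache = TinyCache.new(prune_interval: 0)
first = cache.borrow("postgres://a") { |conn| conn }
sleep 0.01 # make sure the (zero-length) prune interval has elapsed
cache.borrow("postgres://b") { |conn| conn.url }
remaining = cache.dbs_for_urls.keys
```

After the second borrow, the idle connection for the first URL has been disconnected and removed, while the URL that was just used stays cached.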

The idea here is that:

  • We cannot connect to the DB statically; each org can have its own DB, so storing it statically would increase DB connections to the number of orgs in the database.

  • So we replace the organization/synchronization done in Sequel::DATABASES with ConnectionCache.

  • Any number of worker threads need access to the same DB; rather than connecting inline, which is very slow, all DB connections for an org (or across orgs if not in database isolation) can share connections via ConnectionCache.

  • In single-org/db environments, the active organization will always be the same, so the connection is never returned.

  • In multi-org/db environments, busy orgs will likely stay busy. But a reconnect isn’t the end of the world.

  • It seems more efficient to be pessimistic about future use, and prune anything with 0 connections, rather than optimistic, and use an LRU or something similar, since the connections are somewhat expensive resources to keep open for no reason. That said, we could switch this out for an LRU if the pessimistic pruning results in many reconnections. It would also be reasonable to increase the prune interval to avoid disconnecting as frequently.

Note that, due to certain implementation details, such as setting timeouts and automatic transaction handling, we implement our own threaded connection pooling, so we use the SingleThreadedConnectionPool in Sequel and manage multiple threads on our own.

Defined Under Namespace

Classes: Available, ReentranceError

Constant Summary

Constants included from Dbutil

Dbutil::MOCK_CONN

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Methods included from MethodUtilities

attr_predicate, attr_predicate_accessor, singleton_attr_accessor, singleton_attr_reader, singleton_attr_writer, singleton_method_alias, singleton_predicate_accessor, singleton_predicate_reader

Methods included from Dbutil

borrow_conn, configured_connection_options, conn_opts, displaysafe_url, reduce_expr, take_conn

Constructor Details

#initialize(prune_interval:) ⇒ ConnectionCache

Returns a new instance of ConnectionCache.



# File 'lib/webhookdb/connection_cache.rb', line 82

def initialize(prune_interval:)
  @mutex = Mutex.new
  @dbs_for_urls = {}
  @prune_interval = prune_interval
  @last_pruned_at = Time.now
end

Instance Attribute Details

#dbs_for_urls ⇒ Object

Returns the value of attribute dbs_for_urls.



# File 'lib/webhookdb/connection_cache.rb', line 80

def dbs_for_urls
  @dbs_for_urls
end

#last_pruned_at ⇒ Object

Returns the value of attribute last_pruned_at.



# File 'lib/webhookdb/connection_cache.rb', line 80

def last_pruned_at
  @last_pruned_at
end

#prune_interval ⇒ Object

Returns the value of attribute prune_interval.



# File 'lib/webhookdb/connection_cache.rb', line 80

def prune_interval
  @prune_interval
end

Class Method Details

.borrow(url, **kw) ⇒ Object



# File 'lib/webhookdb/connection_cache.rb', line 68

def self.borrow(url, **kw, &)
  return self._instance.borrow(url, **kw, &)
end

.disconnect(url) ⇒ Object



# File 'lib/webhookdb/connection_cache.rb', line 72

def self.disconnect(url)
  self._instance.disconnect(url)
end

.force_disconnect_all ⇒ Object



# File 'lib/webhookdb/connection_cache.rb', line 76

def self.force_disconnect_all
  self._instance.force_disconnect_all
end

Instance Method Details

#borrow(url, transaction: false, timeout: nil, &block) ⇒ Object

Connect to the database at the given URL. borrow is not re-entrant, so if the current thread already owns a connection to the given URL, a ReentranceError is raised. If the URL has a connection not in use by any thread, yield that. If the URL has no connections open, or all are checked out, create and yield a new connection. See class docs for more details.

Raises:

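  • (LocalJumpError) if no block is given
  • (ArgumentError) if url is blank
  • (ReentranceError) if the current thread already holds a connection for url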
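The re-entrance rule can be illustrated with a small standalone sketch. It assumes the same bookkeeping shape the method uses (a hash of loaned connections keyed by thread, plus an array of available ones); the checkout lambda and the top-level ReentranceError class here are hypothetical stand-ins, and plain objects take the place of connections.

```ruby
# Hypothetical stand-in for Webhookdb::ConnectionCache::ReentranceError.
class ReentranceError < StandardError; end

loans = {loaned: {}, available: []}
mutex = Mutex.new

# Check out a "connection" for the calling thread, refusing a second
# checkout from a thread that already holds one.
checkout = lambda do
  mutex.synchronize do
    if loans[:loaned].key?(Thread.current)
      raise ReentranceError, "this thread already holds a connection"
    end
    loans[:loaned][Thread.current] = loans[:available].pop || Object.new
  end
end

checkout.call # first checkout on the main thread succeeds

reentered = false
begin
  checkout.call # second checkout on the same thread is rejected
rescue ReentranceError
  reentered = true
end

# A different thread is unaffected and gets its own connection.
Thread.new { checkout.call }.join
loaned_count = loans[:loaned].size
```

Keying loans by thread is what makes the check cheap: a second checkout from the same thread is detected with a single hash lookup while the mutex is held.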


# File 'lib/webhookdb/connection_cache.rb', line 116

def borrow(url, transaction: false, timeout: nil, &block)
  raise LocalJumpError if block.nil?
  raise ArgumentError, "url cannot be blank" if url.blank?
  now = Time.now
  if timeout.is_a?(Symbol)
    timeout_name = "timeout_#{timeout}"
    begin
      timeout = Webhookdb::ConnectionCache.send(timeout_name)
    rescue NoMethodError
      raise NoMethodError, "no timeout accessor :#{timeout_name}"
    end
  end
  t = Thread.current
  conn = nil
  @mutex.synchronize do
    db_loans = @dbs_for_urls[url] ||= {loaned: {}, available: []}
    if db_loans[:loaned].key?(t)
      raise ReentranceError,
            "ConnectionCache#borrow is not re-entrant for the same database since the connection has stateful config"
    end
    if (available = db_loans[:available].pop)
      # If the connection doesn't validate, it won't be in :available at this point, so don't worry about it.
      conn = available.validated_connection
    end
    conn ||= take_conn(url, single_threaded: true, extensions: [:pg_json, :pg_streaming])
    db_loans[:loaned][t] = conn
  end
  trash_conn = false
  begin
    # All database operations need global handling to ensure proper pool management.
    conn << "SET statement_timeout TO #{timeout * 1000}" if timeout.present?
    conn << "BEGIN;" if transaction
    begin
      result = yield conn
      conn << "COMMIT;" if transaction
    rescue Sequel::DatabaseError => e
      # Roll back on any database error; but if we're disconnected, don't bother
      # since we know the rollback won't reach the database.
      conn << "ROLLBACK;" if transaction && !e.is_a?(Sequel::DatabaseDisconnectError)
      raise
    end
  rescue Sequel::DatabaseDisconnectError
    # If we're disconnected, trash this connection rather than re-adding it back to the pool.
    trash_conn = true
    raise
  ensure
    reraise = nil
    if timeout.present?
      begin
        # If the timeout fails for whatever reason, assume the connection is toast
        # and don't return it to the pool.
        conn << "SET statement_timeout TO 0"
      rescue Sequel::DatabaseError => e
        reraise = e
        trash_conn = true
      end
    end
    @mutex.synchronize do
      @dbs_for_urls[url][:loaned].delete(t)
      @dbs_for_urls[url][:available] << Available.new(conn, Time.now) unless trash_conn
    end
    raise reraise if reraise
  end
  self.prune(url) if now > self.next_prune_at
  return result
end
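
The timeout handling at the top of the method accepts either a number or a Symbol naming a configured accessor. The following is a minimal sketch of that resolution step in isolation. TimeoutConfig, its timeout_fast accessor, and both helper methods are hypothetical names chosen for illustration; the accessors actually configured on Webhookdb::ConnectionCache are not listed on this page.

```ruby
# Hypothetical stand-in for the class that exposes timeout_<name> accessors.
module TimeoutConfig
  def self.timeout_fast
    5
  end
end

# Resolve a Symbol like :fast to the value of timeout_fast;
# pass numbers and nil through unchanged.
def resolve_timeout(timeout, config: TimeoutConfig)
  return timeout unless timeout.is_a?(Symbol)
  name = "timeout_#{timeout}"
  begin
    config.send(name)
  rescue NoMethodError
    raise NoMethodError, "no timeout accessor :#{name}"
  end
end

# The value is treated as seconds and converted to milliseconds for Postgres.
def statement_timeout_sql(seconds)
  "SET statement_timeout TO #{seconds * 1000}"
end

fast = resolve_timeout(:fast)      # looked up via TimeoutConfig.timeout_fast
explicit = resolve_timeout(2.5)    # numeric values are used as given
sql = statement_timeout_sql(fast)
```

An unknown symbol such as :missing raises NoMethodError with the accessor name in the message, which mirrors the rescue in the method above.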

#disconnect(url) ⇒ Object

Disconnect the cached connection for the given url, if any. In general, this is only needed when tearing down a database.

Raises:

  • (ArgumentError)


# File 'lib/webhookdb/connection_cache.rb', line 187

def disconnect(url)
  raise ArgumentError, "url cannot be blank" if url.blank?
  db_loans = @dbs_for_urls[url]
  return if db_loans.nil?
  if db_loans[:loaned].size.positive?
    raise Webhookdb::InvalidPrecondition,
          "url #{displaysafe_url(url)} still has #{db_loans[:loaned].size} active connections"
  end
  db_loans[:available].each(&:disconnect)
  @dbs_for_urls.delete(url)
end

#force_disconnect_all ⇒ Object



# File 'lib/webhookdb/connection_cache.rb', line 207

def force_disconnect_all
  @dbs_for_urls.each_value do |db_loans|
    db_loans[:available].each(&:disconnect)
    db_loans[:loaned].each_value(&:disconnect)
  end
  @dbs_for_urls.clear
end

#next_prune_at ⇒ Object



# File 'lib/webhookdb/connection_cache.rb', line 183

def next_prune_at = self.last_pruned_at + self.prune_interval
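
As a worked example of the arithmetic, assuming prune_interval is a plain number of seconds (adding a Numeric to a Time in Ruby adds seconds), the check made at the end of borrow looks like this. The timestamps are arbitrary illustrative values.

```ruby
last_pruned_at = Time.at(1_700_000_000)
prune_interval = 120 # assumed to be seconds

next_prune_at = last_pruned_at + prune_interval

# borrow prunes only when the time it started is past next_prune_at.
too_early = Time.at(1_700_000_060) > next_prune_at
due = Time.at(1_700_000_121) > next_prune_at
```

Here too_early is false (only 60 seconds have passed) and due is true (121 seconds have passed).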

#summarize ⇒ Object



# File 'lib/webhookdb/connection_cache.rb', line 215

def summarize
  return self.dbs_for_urls.transform_values do |loans|
    {loaned: loans[:loaned].size, available: loans[:available].size}
  end
end
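
To show the shape of the value summarize returns, here is the same transform_values call applied to a hand-built hash with the structure borrow maintains. The URLs and the symbols standing in for connections are made up for illustration.

```ruby
# Same shape as dbs_for_urls: url => {loaned: {thread => conn}, available: [conn, ...]}
dbs_for_urls = {
  "postgres://org-a" => {loaned: {Thread.current => :conn1}, available: [:conn2, :conn3]},
  "postgres://org-b" => {loaned: {}, available: [:conn4]},
}

summary = dbs_for_urls.transform_values do |loans|
  {loaned: loans[:loaned].size, available: loans[:available].size}
end
```

The result maps each URL to connection counts, so summary["postgres://org-a"] is {loaned: 1, available: 2} and summary["postgres://org-b"] is {loaned: 0, available: 1}.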