Class: Webhookdb::ConnectionCache
- Inherits:
-
Object
- Object
- Webhookdb::ConnectionCache
- Extended by:
- MethodUtilities
- Includes:
- Appydays::Configurable, Appydays::Loggable, Dbutil
- Defined in:
- lib/webhookdb/connection_cache.rb
Overview
Keep a dynamic cache of open database connections. Very similar in behavior to Sequel::DATABASES, but we need to prune connections that have not been inactive for a while.
When ‘borrow` is called, either a new connection is made, or an existing one used, for that URL. The connection is yield to the block.
Then, after the block is called, if ‘prune_interval’ has elapsed since the last prune, prune all connections with 0 current connections, _other than the connection just used_. Because this connection was just used, we assume it will be used again soon.
The idea here is that:
-
We cannot connect to the DB statically; each org can have its own DB, so storing it statically would increase DB connections to the the number of orgs in the database.
-
So we replace the organization/synchronization done in Sequel::DATABASES with ConnectionCache.
-
Any number of worker threads need access to the same DB; rather than connecting inline, which is very slow, all DB connections for an org (or across orgs if not in database isolation) can share connections via ConnectionCache.
-
In single-org/db environments, the active organization will always always be the same, so the connection is never returned.
-
In multi-org/db environments, busy orgs will likely stay busy. But a reconnect isn’t the end of the world.
-
It seems more efficient to be pessimistic about future use, and prune anything with 0 connections, rather than optimistic, and use an LRU or something similar, since the connections are somewhat expensive resources to keep open for now reason. That said, we could switch this out for an LRU it the pessimistic pruning results in many reconnections. It would also be reasonable to increase the prune interval to avoid disconnecting as frequently.
Note that, due to certain implementation details, such as setting timeouts and automatic transaction handling, we implement our own threaded connection pooling, so use the SingleThreadedConnectionPool in Sequel and manage multiple threads on our own.
Defined Under Namespace
Classes: ReentranceError
Constant Summary
Constants included from Dbutil
Instance Attribute Summary collapse
-
#dbs_for_urls ⇒ Object
Returns the value of attribute dbs_for_urls.
-
#last_pruned_at ⇒ Object
Returns the value of attribute last_pruned_at.
-
#prune_interval ⇒ Object
Returns the value of attribute prune_interval.
Class Method Summary collapse
Instance Method Summary collapse
-
#borrow(url, transaction: false, timeout: nil, &block) ⇒ Object
Connect to the database at the given URL.
-
#disconnect(url) ⇒ Object
Disconnect the cached connection for the given url, if any.
- #force_disconnect_all ⇒ Object
-
#initialize(prune_interval:) ⇒ ConnectionCache
constructor
A new instance of ConnectionCache.
- #next_prune_at ⇒ Object
- #summarize ⇒ Object
Methods included from MethodUtilities
attr_predicate, attr_predicate_accessor, singleton_attr_accessor, singleton_attr_reader, singleton_attr_writer, singleton_method_alias, singleton_predicate_accessor, singleton_predicate_reader
Methods included from Dbutil
borrow_conn, configured_connection_options, conn_opts, displaysafe_url, reduce_expr, take_conn
Constructor Details
#initialize(prune_interval:) ⇒ ConnectionCache
Returns a new instance of ConnectionCache.
79 80 81 82 83 84 |
# File 'lib/webhookdb/connection_cache.rb', line 79 def initialize(prune_interval:) @mutex = Mutex.new @dbs_for_urls = {} @prune_interval = prune_interval @last_pruned_at = Time.now end |
Instance Attribute Details
#dbs_for_urls ⇒ Object
Returns the value of attribute dbs_for_urls.
77 78 79 |
# File 'lib/webhookdb/connection_cache.rb', line 77 def dbs_for_urls @dbs_for_urls end |
#last_pruned_at ⇒ Object
Returns the value of attribute last_pruned_at.
77 78 79 |
# File 'lib/webhookdb/connection_cache.rb', line 77 def last_pruned_at @last_pruned_at end |
#prune_interval ⇒ Object
Returns the value of attribute prune_interval.
77 78 79 |
# File 'lib/webhookdb/connection_cache.rb', line 77 def prune_interval @prune_interval end |
Class Method Details
.borrow(url, **kw) ⇒ Object
65 66 67 |
# File 'lib/webhookdb/connection_cache.rb', line 65 def self.borrow(url, **kw, &) return self._instance.borrow(url, **kw, &) end |
.disconnect(url) ⇒ Object
69 70 71 |
# File 'lib/webhookdb/connection_cache.rb', line 69 def self.disconnect(url) self._instance.disconnect(url) end |
.force_disconnect_all ⇒ Object
73 74 75 |
# File 'lib/webhookdb/connection_cache.rb', line 73 def self.force_disconnect_all self._instance.force_disconnect_all end |
Instance Method Details
#borrow(url, transaction: false, timeout: nil, &block) ⇒ Object
Connect to the database at the given URL. borrow is not re-entrant, so if the current thread already owns a connection to the given url, raise a ReentrantError. If the url has a DB not in use by any thread, yield that. If the url has no DBs opened, or all are checked out, create and yield a new connection. See class docs for more details.
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/webhookdb/connection_cache.rb', line 94 def borrow(url, transaction: false, timeout: nil, &block) raise LocalJumpError if block.nil? raise ArgumentError, "url cannot be blank" if url.blank? now = Time.now if timeout.is_a?(Symbol) timeout_name = "timeout_#{timeout}" begin timeout = Webhookdb::ConnectionCache.send(timeout_name) rescue NoMethodError raise NoMethodError, "no timeout accessor :#{timeout_name}" end end t = Thread.current conn = nil @mutex.synchronize do db_loans = @dbs_for_urls[url] ||= {loaned: {}, available: []} if db_loans[:loaned].key?(t) raise ReentranceError, "ConnectionCache#borrow is not re-entrant for the same database since the connection has stateful config" end conn = db_loans[:available].pop || take_conn(url, single_threaded: true, extensions: [:pg_json, :pg_streaming]) db_loans[:loaned][t] = conn end conn << "SET statement_timeout TO #{timeout * 1000}" if timeout.present? conn << "BEGIN;" if transaction begin result = yield conn conn << "COMMIT;" if transaction rescue Sequel::DatabaseError conn << "ROLLBACK;" if transaction raise ensure conn << "SET statement_timeout TO 0" if timeout.present? @mutex.synchronize do @dbs_for_urls[url][:loaned].delete(t) @dbs_for_urls[url][:available] << conn end end self.prune(url) if now > self.next_prune_at return result end |
#disconnect(url) ⇒ Object
Disconnect the cached connection for the given url, if any. In general, this is only needed when tearing down a database.
140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/webhookdb/connection_cache.rb', line 140 def disconnect(url) raise ArgumentError, "url cannot be blank" if url.blank? db_loans = @dbs_for_urls[url] return if db_loans.nil? if db_loans[:loaned].size.positive? raise Webhookdb::InvalidPrecondition, "url #{displaysafe_url(url)} still has #{db_loans[:loaned].size} active connections" end db_loans[:available].each(&:disconnect) @dbs_for_urls.delete(url) end |
#force_disconnect_all ⇒ Object
160 161 162 163 164 165 166 |
# File 'lib/webhookdb/connection_cache.rb', line 160 def force_disconnect_all @dbs_for_urls.each_value do |db_loans| db_loans[:available].each(&:disconnect) db_loans[:loaned].each_value(&:disconnect) end @dbs_for_urls.clear end |
#next_prune_at ⇒ Object
136 |
# File 'lib/webhookdb/connection_cache.rb', line 136 def next_prune_at = self.last_pruned_at + self.prune_interval |
#summarize ⇒ Object
168 169 170 171 172 |
# File 'lib/webhookdb/connection_cache.rb', line 168 def summarize return self.dbs_for_urls.transform_values do |loans| {loaned: loans[:loaned].size, available: loans[:available].size} end end |