Class: Webhookdb::ConnectionCache

Inherits:
Object
  • Object
Extended by:
MethodUtilities
Includes:
Appydays::Configurable, Appydays::Loggable, Dbutil
Defined in:
lib/webhookdb/connection_cache.rb

Overview

Keep a dynamic cache of open database connections. Very similar in behavior to Sequel::DATABASES, but we need to prune connections that have been inactive for a while.

When ‘borrow’ is called, either a new connection is made, or an existing one used, for that URL. The connection is yielded to the block.

Then, after the block is called, if ‘prune_interval’ has elapsed since the last prune, prune every database with 0 connections currently on loan, _other than the one just used_. Because that connection was just used, we assume it will be used again soon.
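The borrow-then-prune cycle described above can be sketched with a small, single-threaded stand-in. This is only an illustration: `FakeConn`, `TinyCache`, and the `now:` keyword are hypothetical names introduced here, and the pruning rule is inferred from the prose above rather than copied from the library's private `prune` method.

```ruby
# Hypothetical stand-in for a database connection; not part of WebhookDB.
class FakeConn
  attr_reader :url, :open

  def initialize(url)
    @url = url
    @open = true
  end

  def disconnect
    @open = false
  end
end

# Simplified, single-threaded sketch of the loan bookkeeping and pruning rule.
class TinyCache
  attr_reader :dbs_for_urls, :last_pruned_at

  def initialize(prune_interval:)
    @dbs_for_urls = {}
    @prune_interval = prune_interval
    @last_pruned_at = Time.now
  end

  # `now:` is injectable only so the example does not have to sleep.
  def borrow(url, now: Time.now)
    loans = @dbs_for_urls[url] ||= {loaned: 0, available: []}
    # Reuse an idle connection if there is one, otherwise open a new one.
    conn = loans[:available].pop || FakeConn.new(url)
    loans[:loaned] += 1
    begin
      result = yield conn
    ensure
      loans[:loaned] -= 1
      loans[:available] << conn
    end
    prune(url, now) if now > @last_pruned_at + @prune_interval
    result
  end

  # Assumed rule: drop every URL with nothing on loan, except the one just used.
  def prune(skip_url, now)
    @dbs_for_urls.delete_if do |url, loans|
      next false if url == skip_url || loans[:loaned].positive?
      loans[:available].each(&:disconnect)
      true
    end
    @last_pruned_at = now
  end
end

cache = TinyCache.new(prune_interval: 60)
start = cache.last_pruned_at
conn_a = cache.borrow("postgres://a", now: start) { |c| c }
cache.borrow("postgres://b", now: start + 61) { |c| c.url }
puts cache.dbs_for_urls.keys.inspect
puts conn_a.open
```

In this sketch the second borrow happens after the interval has elapsed, so the idle entry for the first URL is disconnected and dropped while the entry that was just used stays cached.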

The idea here is that:

  • We cannot connect to the DB statically; each org can have its own DB, so storing it statically would increase DB connections to the number of orgs in the database.

  • So we replace the organization/synchronization done in Sequel::DATABASES with ConnectionCache.

  • Any number of worker threads need access to the same DB; rather than connecting inline, which is very slow, all DB connections for an org (or across orgs if not in database isolation) can share connections via ConnectionCache.

  • In single-org/db environments, the active organization will always be the same, so the connection is never pruned.

  • In multi-org/db environments, busy orgs will likely stay busy. But a reconnect isn’t the end of the world.

  • It seems more efficient to be pessimistic about future use, and prune anything with 0 connections, rather than optimistic, and use an LRU or something similar, since the connections are somewhat expensive resources to keep open for no reason. That said, we could switch this out for an LRU if the pessimistic pruning results in many reconnections. It would also be reasonable to increase the prune interval to avoid disconnecting as frequently.

Note that, due to certain implementation details, such as setting timeouts and automatic transaction handling, we implement our own threaded connection pooling, so we use the SingleThreadedConnectionPool in Sequel and manage multiple threads ourselves.

Defined Under Namespace

Classes: ReentranceError

Constant Summary

Constants included from Dbutil

Dbutil::MOCK_CONN

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Methods included from MethodUtilities

attr_predicate, attr_predicate_accessor, singleton_attr_accessor, singleton_attr_reader, singleton_attr_writer, singleton_method_alias, singleton_predicate_accessor, singleton_predicate_reader

Methods included from Dbutil

borrow_conn, configured_connection_options, conn_opts, displaysafe_url, reduce_expr, take_conn

Constructor Details

#initialize(prune_interval:) ⇒ ConnectionCache

Returns a new instance of ConnectionCache.



# File 'lib/webhookdb/connection_cache.rb', line 79

def initialize(prune_interval:)
  @mutex = Mutex.new
  @dbs_for_urls = {}
  @prune_interval = prune_interval
  @last_pruned_at = Time.now
end

Instance Attribute Details

#dbs_for_urls ⇒ Object

Returns the value of attribute dbs_for_urls.



# File 'lib/webhookdb/connection_cache.rb', line 77

def dbs_for_urls
  @dbs_for_urls
end

#last_pruned_at ⇒ Object

Returns the value of attribute last_pruned_at.



# File 'lib/webhookdb/connection_cache.rb', line 77

def last_pruned_at
  @last_pruned_at
end

#prune_interval ⇒ Object

Returns the value of attribute prune_interval.



# File 'lib/webhookdb/connection_cache.rb', line 77

def prune_interval
  @prune_interval
end

Class Method Details

.borrow(url, **kw) ⇒ Object



# File 'lib/webhookdb/connection_cache.rb', line 65

def self.borrow(url, **kw, &)
  return self._instance.borrow(url, **kw, &)
end

.disconnect(url) ⇒ Object



# File 'lib/webhookdb/connection_cache.rb', line 69

def self.disconnect(url)
  self._instance.disconnect(url)
end

.force_disconnect_all ⇒ Object



# File 'lib/webhookdb/connection_cache.rb', line 73

def self.force_disconnect_all
  self._instance.force_disconnect_all
end

Instance Method Details

#borrow(url, transaction: false, timeout: nil, &block) ⇒ Object

Connect to the database at the given URL. borrow is not re-entrant, so if the current thread already owns a connection to the given url, raise a ReentranceError. If the url has a DB not in use by any thread, yield that. If the url has no DBs opened, or all are checked out, create and yield a new connection. See class docs for more details.

Raises:

  • (LocalJumpError)
  • (ArgumentError)
  • (ReentranceError)


# File 'lib/webhookdb/connection_cache.rb', line 94

def borrow(url, transaction: false, timeout: nil, &block)
  raise LocalJumpError if block.nil?
  raise ArgumentError, "url cannot be blank" if url.blank?
  now = Time.now
  if timeout.is_a?(Symbol)
    timeout_name = "timeout_#{timeout}"
    begin
      timeout = Webhookdb::ConnectionCache.send(timeout_name)
    rescue NoMethodError
      raise NoMethodError, "no timeout accessor :#{timeout_name}"
    end
  end
  t = Thread.current
  conn = nil
  @mutex.synchronize do
    db_loans = @dbs_for_urls[url] ||= {loaned: {}, available: []}
    if db_loans[:loaned].key?(t)
      raise ReentranceError,
            "ConnectionCache#borrow is not re-entrant for the same database since the connection has stateful config"
    end
    conn = db_loans[:available].pop || take_conn(url, single_threaded: true, extensions: [:pg_json, :pg_streaming])
    db_loans[:loaned][t] = conn
  end
  conn << "SET statement_timeout TO #{timeout * 1000}" if timeout.present?
  conn << "BEGIN;" if transaction
  begin
    result = yield conn
    conn << "COMMIT;" if transaction
  rescue Sequel::DatabaseError
    conn << "ROLLBACK;" if transaction
    raise
  ensure
    conn << "SET statement_timeout TO 0" if timeout.present?
    @mutex.synchronize do
      @dbs_for_urls[url][:loaned].delete(t)
      @dbs_for_urls[url][:available] << conn
    end
  end
  self.prune(url) if now > self.next_prune_at
  return result
end
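
The ordering of the timeout and transaction statements in the method above can be seen in isolation with a connection that records SQL instead of running it. This is a rough sketch, not the library's code: `RecordingConn`, `FakeDatabaseError` (standing in for Sequel::DatabaseError), and `with_conn` are hypothetical names, and the pooling and locking are left out.

```ruby
# Hypothetical connection that records the SQL sent to it instead of executing it.
class RecordingConn
  attr_reader :statements

  def initialize
    @statements = []
  end

  def <<(sql)
    @statements << sql
    self
  end
end

# Hypothetical error class standing in for Sequel::DatabaseError.
class FakeDatabaseError < StandardError; end

# Mirrors the statement ordering in #borrow, without pooling or locking.
# `timeout` is in seconds; statement_timeout is set in milliseconds.
def with_conn(conn, transaction: false, timeout: nil)
  conn << "SET statement_timeout TO #{timeout * 1000}" if timeout
  conn << "BEGIN;" if transaction
  begin
    result = yield conn
    conn << "COMMIT;" if transaction
  rescue FakeDatabaseError
    conn << "ROLLBACK;" if transaction
    raise
  ensure
    # The timeout is always reset, whether the block succeeded or raised.
    conn << "SET statement_timeout TO 0" if timeout
  end
  result
end

ok = RecordingConn.new
with_conn(ok, transaction: true, timeout: 2) { |c| c << "SELECT 1" }
puts ok.statements.inspect

failed = RecordingConn.new
begin
  with_conn(failed, transaction: true) { raise FakeDatabaseError, "boom" }
rescue FakeDatabaseError
  # The error is re-raised after the rollback is issued.
end
puts failed.statements.inspect
```

Resetting the timeout in the `ensure` clause matters because the connection goes back into the pool and may be handed to a caller that asked for no timeout.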

#disconnect(url) ⇒ Object

Disconnect the cached connection for the given url, if any. In general, this is only needed when tearing down a database.

Raises:

  • (ArgumentError)


# File 'lib/webhookdb/connection_cache.rb', line 140

def disconnect(url)
  raise ArgumentError, "url cannot be blank" if url.blank?
  db_loans = @dbs_for_urls[url]
  return if db_loans.nil?
  if db_loans[:loaned].size.positive?
    raise Webhookdb::InvalidPrecondition,
          "url #{displaysafe_url(url)} still has #{db_loans[:loaned].size} active connections"
  end
  db_loans[:available].each(&:disconnect)
  @dbs_for_urls.delete(url)
end
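
As a rough illustration of the precondition check above, the following sketch runs the same logic against a plain hash. `IdleConn`, `InvalidPrecondition` (standing in for Webhookdb::InvalidPrecondition), and `disconnect_url` are hypothetical names used only for this example.

```ruby
# Hypothetical stand-ins; the real method raises Webhookdb::InvalidPrecondition.
class InvalidPrecondition < StandardError; end

IdleConn = Struct.new(:alive) do
  def disconnect
    self.alive = false
  end
end

# Same shape as #disconnect: refuse while anything is on loan, otherwise close and forget.
def disconnect_url(dbs_for_urls, url)
  db_loans = dbs_for_urls[url]
  return if db_loans.nil?
  unless db_loans[:loaned].empty?
    raise InvalidPrecondition, "url still has #{db_loans[:loaned].size} active connections"
  end
  db_loans[:available].each(&:disconnect)
  dbs_for_urls.delete(url)
end

idle = IdleConn.new(true)
dbs = {
  "postgres://idle" => {loaned: {}, available: [idle]},
  "postgres://busy" => {loaned: {Thread.current => IdleConn.new(true)}, available: []},
}

disconnect_url(dbs, "postgres://idle")
begin
  disconnect_url(dbs, "postgres://busy")
rescue InvalidPrecondition => e
  puts e.message
end
puts dbs.keys.inspect
```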

#force_disconnect_all ⇒ Object



# File 'lib/webhookdb/connection_cache.rb', line 160

def force_disconnect_all
  @dbs_for_urls.each_value do |db_loans|
    db_loans[:available].each(&:disconnect)
    db_loans[:loaned].each_value(&:disconnect)
  end
  @dbs_for_urls.clear
end

#next_prune_at ⇒ Object



# File 'lib/webhookdb/connection_cache.rb', line 136

def next_prune_at = self.last_pruned_at + self.prune_interval
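
A small worked example of the arithmetic, assuming ‘prune_interval’ is a number of seconds (the 300 below is an illustrative value, not a documented default). Adding a number to a Time yields a later Time, which `#borrow` compares against the time captured when the borrow started.

```ruby
prune_interval = 300 # seconds; illustrative value only
last_pruned_at = Time.at(1_700_000_000)
next_prune_at = last_pruned_at + prune_interval

just_before = last_pruned_at + 299
just_after = last_pruned_at + 301

puts just_before > next_prune_at
puts just_after > next_prune_at
```

Only the second comparison is true, so only a borrow that started after the interval elapsed would trigger a prune.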

#summarize ⇒ Object



# File 'lib/webhookdb/connection_cache.rb', line 168

def summarize
  return self.dbs_for_urls.transform_values do |loans|
    {loaned: loans[:loaned].size, available: loans[:available].size}
  end
end
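
To show the shape of the result, here is the same transformation applied to a hand-built hash. The URLs, thread keys, and symbols standing in for connections are made up for illustration.

```ruby
# Symbols stand in for threads and connections; only the counts matter here.
dbs_for_urls = {
  "postgres://org1" => {loaned: {thread_a: :conn1, thread_b: :conn2}, available: [:conn3]},
  "postgres://org2" => {loaned: {}, available: [:conn4, :conn5]},
}

summary = dbs_for_urls.transform_values do |loans|
  {loaned: loans[:loaned].size, available: loans[:available].size}
end

puts summary.inspect
```

Each URL maps to a pair of counts: two loaned and one available for the first URL, none loaned and two available for the second.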