Class: Pgtk::Stash
- Inherits: Object
  - Object
    - Pgtk::Stash
- Defined in: lib/pgtk/stash.rb
Overview
Database query cache implementation.
Provides a caching layer for PostgreSQL queries, automatically invalidating the cache when tables are modified. Read queries are cached while write queries bypass the cache and invalidate related cached entries.
Thread-safe with read-write locking.
The implementation is very naive! Use it at your own risk.
- Author: Yegor Bugayenko
- Copyright: Copyright © 2019-2025 Yegor Bugayenko
- License: MIT
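The behavior described in the overview (cached reads, writes that invalidate by table) can be illustrated with a toy model. This is a minimal sketch, not the code of Pgtk::Stash: ToyPool, ToyStash, and the three regular expressions are hypothetical stand-ins, and the sketch leaves out the locking, popularity counters, and background refilling of the real class.

```ruby
# Hypothetical stand-in for a connection pool: it only counts how often it is hit.
class ToyPool
  attr_reader :calls

  def initialize
    @calls = 0
  end

  def exec(query, _params = [])
    @calls += 1
    "result ##{@calls} of #{query}"
  end
end

# Hypothetical, simplified cache; not the Pgtk::Stash implementation.
class ToyStash
  WRITE_RE = /\A(INSERT|UPDATE|DELETE)\b/             # assumed write detector
  WRITE_TABLE_RE = /\b(?:INTO|UPDATE|FROM) ([a-z_]+)/ # assumed table finder for writes
  READ_TABLE_RE = /\b(?:FROM|JOIN) ([a-z_]+)/         # assumed table finder for reads

  def initialize(pool)
    @pool = pool
    @queries = {}                           # [query, params] => { ret:, stale: }
    @tables = Hash.new { |h, k| h[k] = [] } # table name => cache keys that read from it
  end

  def exec(query, params = [])
    if WRITE_RE.match?(query)
      ret = @pool.exec(query, params)
      # Mark every cached read that touches a modified table as stale.
      query.scan(WRITE_TABLE_RE).flatten.uniq.each do |t|
        @tables[t].each { |key| @queries[key][:stale] = true }
      end
      return ret
    end
    key = [query, params]
    entry = @queries[key]
    return entry[:ret] if entry && !entry[:stale]
    ret = @pool.exec(query, params)
    @queries[key] = { ret: ret, stale: false }
    query.scan(READ_TABLE_RE).flatten.uniq.each { |t| @tables[t] |= [key] }
    ret
  end
end

pool = ToyPool.new
stash = ToyStash.new(pool)
stash.exec('SELECT * FROM book WHERE id = $1', [1])       # miss: goes to the pool
stash.exec('SELECT * FROM book WHERE id = $1', [1])       # hit: served from the cache
stash.exec('INSERT INTO book (title) VALUES ($1)', ['x']) # write: marks reads of book stale
stash.exec('SELECT * FROM book WHERE id = $1', [1])       # stale: goes to the pool again
puts pool.calls # prints 3
```

Four calls reach the toy stash but only three reach the pool, because the second read is answered from the cache.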
Instance Method Summary
- #dump ⇒ String
  Convert internal state into text.
- #exec(query, params = [], result = 0) ⇒ PG::Result
  Execute a SQL query with optional caching.
- #initialize(pool, stash: { queries: {}, tables: {} }, refill_interval: 16, refill_delay: 0, max_queue_length: 128, threads: 4, cap: 10_000, cap_interval: 60, retire: 15 * 60, retire_interval: 60, loog: Loog::NULL, entrance: Concurrent::ReentrantReadWriteLock.new, launched: Concurrent::AtomicBoolean.new(false)) ⇒ Stash (constructor)
  Initialize a new Stash with query caching.
- #start! ⇒ void
  Start the connection pool and launch background cache management tasks.
- #transaction {|Pgtk::Stash| ... } ⇒ Object
  Execute a database transaction.
- #version ⇒ String
  Get the PostgreSQL server version.
Constructor Details
#initialize(pool, stash: { queries: {}, tables: {} }, refill_interval: 16, refill_delay: 0, max_queue_length: 128, threads: 4, cap: 10_000, cap_interval: 60, retire: 15 * 60, retire_interval: 60, loog: Loog::NULL, entrance: Concurrent::ReentrantReadWriteLock.new, launched: Concurrent::AtomicBoolean.new(false)) ⇒ Stash
Initialize a new Stash with query caching.
# File 'lib/pgtk/stash.rb', line 66
def initialize(
  pool,
  stash: { queries: {}, tables: {} },
  refill_interval: 16,
  refill_delay: 0,
  max_queue_length: 128,
  threads: 4,
  cap: 10_000,
  cap_interval: 60,
  retire: 15 * 60,
  retire_interval: 60,
  loog: Loog::NULL,
  entrance: Concurrent::ReentrantReadWriteLock.new,
  launched: Concurrent::AtomicBoolean.new(false)
)
  @pool = pool
  @stash = stash
  @launched = launched
  @entrance = entrance
  @refill_interval = refill_interval
  @refill_delay = refill_delay
  @max_queue_length = max_queue_length
  @threads = threads
  @cap = cap
  @cap_interval = cap_interval
  @retire = retire
  @retire_interval = retire_interval
  @loog = loog
  @tpool = Concurrent::FixedThreadPool.new(@threads)
end
Instance Method Details
#dump ⇒ String
Convert internal state into text.
Generates a detailed report of the cache state including query counts, popularity scores, stale queries, and thread pool status.
# File 'lib/pgtk/stash.rb', line 121
def dump
  qq =
    @stash[:queries].map do |q, kk|
      {
        q: q.dup, # the query
        c: kk.values.count, # how many keys?
        p: kk.values.sum { |vv| vv[:popularity] }, # total popularity of all keys
        s: kk.values.count { |vv| vv[:stale] }, # how many stale keys?
        u: kk.values.map { |vv| vv[:used] }.max || Time.now # when was it used
      }
    end
  [
    @pool.dump,
    '',
    [
      'Pgtk::Stash (',
      [
        "refill_interval=#{@refill_interval}s",
        "max_queue_length=#{@max_queue_length}",
        "threads=#{@threads}",
        "cap=#{@cap}",
        "cap_interval=#{@cap_interval}s",
        "retire=#{@retire}",
        "retire_interval=#{@retire_interval}s"
      ].join(', '),
      '):'
    ].join,
    " #{'not ' if @launched.false?}launched",
    " #{stash_size} queries cached (#{stash_size > @cap ? 'above' : 'below'} the cap)",
    " #{@tpool.queue_length} tasks in the thread pool",
    " #{@stash[:tables].count} tables in cache",
    " #{qq.sum { |a| a[:s] }} stale queries in cache:",
    qq.select { |a| a[:s].positive? }.sort_by { -_1[:p] }.take(8).map do |a|
      " #{a[:c]}/#{a[:p]}p/#{a[:s]}s/#{a[:u].ago}: #{a[:q]}"
    end,
    " #{qq.count { |a| a[:s].zero? }} other queries in cache:",
    qq.select { |a| a[:s].zero? }.sort_by { -_1[:p] }.take(16).map do |a|
      " #{a[:c]}/#{a[:p]}p/#{a[:s]}s/#{a[:u].ago}: #{a[:q]}"
    end
  ].join("\n")
end
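To make the per-query summary in #dump easier to follow, the sketch below runs the same aggregation over a small hand-made hash shaped like @stash[:queries]. The sample queries and timestamps are invented for illustration, and the ago formatting is left out because it is not a core Time method.

```ruby
# Sample cache state shaped like @stash[:queries]; the data is made up.
now = Time.now
queries = {
  'SELECT * FROM book' => {
    '' => { popularity: 5, used: now - 60 },
    '42' => { popularity: 2, stale: now - 10, used: now - 5 }
  },
  'SELECT * FROM author WHERE id = $1' => {
    '7' => { popularity: 9, used: now - 120 }
  }
}

# Same aggregation as in #dump: one summary hash per query text.
qq = queries.map do |q, kk|
  {
    q: q,                                      # the query
    c: kk.values.count,                        # number of parameter keys
    p: kk.values.sum { |vv| vv[:popularity] }, # total popularity of all keys
    s: kk.values.count { |vv| vv[:stale] },    # number of stale keys
    u: kk.values.map { |vv| vv[:used] }.max    # most recent use
  }
end

# Split into stale and fresh queries, most popular first, as the report does.
stale, fresh = qq.partition { |a| a[:s].positive? }
stale.sort_by { |a| -a[:p] }.each { |a| puts "stale: #{a[:c]}/#{a[:p]}p/#{a[:s]}s: #{a[:q]}" }
fresh.sort_by { |a| -a[:p] }.each { |a| puts "fresh: #{a[:c]}/#{a[:p]}p/#{a[:s]}s: #{a[:q]}" }
```

Each report line therefore reads as keys/popularity/stale keys, followed by the query text.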
#exec(query, params = [], result = 0) ⇒ PG::Result
Execute a SQL query with optional caching.
Queries that contain modification keywords (INSERT, UPDATE, DELETE, etc.) or call a pg_* function bypass the cache: they are executed directly, and every cached query that reads from an affected table is marked stale. Read queries (SELECT) are cached by normalized query text and parameter values; a missing or stale entry is executed against the pool again. Results of queries containing NOW() are never stored in the cache.
# File 'lib/pgtk/stash.rb', line 174
def exec(query, params = [], result = 0)
  pure = (query.is_a?(Array) ? query.join(' ') : query).gsub(/\s+/, ' ').strip
  if MODS_RE.match?(pure) || /(^|\s)pg_[a-z_]+\(/.match?(pure)
    tables = pure.scan(ALTS_RE).map(&:first).uniq
    ret = @pool.exec(pure, params, result)
    @entrance.with_write_lock do
      tables.each do |t|
        @stash[:tables][t]&.each do |q|
          @stash[:queries][q]&.each_key do |key|
            @stash[:queries][q][key][:stale] = Time.now
          end
        end
      end
    end
  else
    key = params.map(&:to_s).join(SEPARATOR)
    ret = @stash.dig(:queries, pure, key, :ret)
    if ret.nil? || @stash.dig(:queries, pure, key, :stale)
      ret = @pool.exec(pure, params, result)
      unless pure.include?(' NOW() ')
        tables = pure.scan(/(?<=^|\s)(?:FROM|JOIN) ([a-z_]+)(?=\s|$)/).map(&:first).uniq
        raise "No tables at #{pure.inspect}" if tables.empty?
        @entrance.with_write_lock do
          tables.each do |t|
            @stash[:tables][t] = [] if @stash[:tables][t].nil?
            @stash[:tables][t].append(pure).uniq!
          end
          @stash[:queries][pure] ||= {}
          @stash[:queries][pure][key] = { ret:, params:, result:, used: Time.now }
        end
      end
    end
    if @stash.dig(:queries, pure, key)
      @entrance.with_write_lock do
        @stash[:queries][pure][key][:popularity] ||= 0
        @stash[:queries][pure][key][:popularity] += 1
        @stash[:queries][pure][key][:used] = Time.now
      end
    end
  end
  ret
end
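The read path of #exec relies on three small steps: normalizing the query text, building a key from the parameters, and extracting table names. The sketch below isolates them so they can be tried without a database. The helper names normalize, cache_key, and tables_of are hypothetical, and the '|' separator is an assumption, since the value of the SEPARATOR constant is not shown on this page.

```ruby
# Step 1: normalize the query the same way #exec does
# (join array parts, collapse whitespace, strip).
def normalize(query)
  (query.is_a?(Array) ? query.join(' ') : query).gsub(/\s+/, ' ').strip
end

# Step 2: build the cache key from the parameters.
# The separator is an assumed value, not the library's SEPARATOR constant.
def cache_key(params, separator: '|')
  params.map(&:to_s).join(separator)
end

# Step 3: find the tables a read query depends on (regex copied from #exec).
def tables_of(pure)
  pure.scan(/(?<=^|\s)(?:FROM|JOIN) ([a-z_]+)(?=\s|$)/).map(&:first).uniq
end

pure = normalize(
  [
    'SELECT b.title',
    '  FROM book b',
    "\n JOIN author a ON a.id = b.author",
    'WHERE b.id = $1'
  ]
)
puts pure
puts cache_key([42, 'x'])
puts tables_of(pure).inspect                  # ["book", "author"]
puts tables_of('SELECT 1').inspect            # []
puts tables_of('SELECT * FROM book;').inspect # []
```

The last two calls show the limits of the pattern: a query without FROM or JOIN, or a table name followed directly by a semicolon, yields no tables, and in #exec an empty table list raises "No tables at ...".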
#start! ⇒ void
This method returns an undefined value.
Start the connection pool and launch background cache management tasks.
Initializes background timer tasks for cache refilling and size capping. The refill task periodically re-executes stale cached queries, prioritized by popularity. The cap task removes the oldest queries when the cache size exceeds the configured limit.
# File 'lib/pgtk/stash.rb', line 104
def start!
  launch!
  @pool.start!
end
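The body of launch! is private and not shown on this page, so the capping logic can only be guessed at. As a rough sketch of what "removing the oldest queries" might look like, the hypothetical helper cap! below trims a hash shaped like @stash[:queries] by the :used timestamp. It assumes that cache size means the number of cached query/parameter entries and that "oldest" means least recently used; neither assumption is confirmed by the source.

```ruby
# Hypothetical helper, not part of Pgtk::Stash: trims `queries` down to `cap`
# entries, dropping the least recently used entries first.
# Returns the number of removed entries.
def cap!(queries, cap)
  # Flatten to [query, key, used] triples so entries can be ranked globally.
  entries = queries.flat_map { |q, kk| kk.map { |key, vv| [q, key, vv[:used]] } }
  excess = entries.size - cap
  return 0 unless excess.positive?
  entries.sort_by { |_, _, used| used }.first(excess).each do |q, key, _|
    queries[q].delete(key)
    queries.delete(q) if queries[q].empty? # drop queries with no keys left
  end
  excess
end

now = Time.now
queries = {
  'SELECT * FROM book' => { '1' => { used: now - 300 }, '2' => { used: now - 10 } },
  'SELECT * FROM author' => { '' => { used: now - 200 } }
}
removed = cap!(queries, 1)
puts removed
puts queries.keys.inspect
```

In the real class such a routine would have to run under the write lock (@entrance.with_write_lock), as the cache updates in #exec do.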
#transaction {|Pgtk::Stash| ... } ⇒ Object
Execute a database transaction.
Yields a new Stash that shares the same cache but uses the transaction connection.
# File 'lib/pgtk/stash.rb', line 223
def transaction
  @pool.transaction do |t|
    yield Pgtk::Stash.new(
      t,
      stash: @stash,
      refill_interval: @refill_interval,
      max_queue_length: @max_queue_length,
      threads: @threads,
      loog: @loog,
      entrance: @entrance,
      launched: @launched
    )
  end
end
#version ⇒ String
Get the PostgreSQL server version.
# File 'lib/pgtk/stash.rb', line 111
def version
  @pool.version
end