Class: Pgtk::Stash

Inherits:
Object
Defined in:
lib/pgtk/stash.rb

Overview

Database query cache implementation.

Provides a caching layer for PostgreSQL queries that automatically invalidates cached results when their tables are modified. Read queries are cached. Write queries bypass the cache and mark every cached query that reads a modified table as stale, so that query is recalculated.

Thread-safe: updates to the shared cache are made under a reentrant read-write lock.

The implementation is very naive! Use it at your own risk.

Author

Yegor Bugayenko ([email protected])

Copyright

Copyright © 2019-2025 Yegor Bugayenko

License

MIT

Examples:

Basic usage

pool = Pgtk::Pool.new(...)
stash = Pgtk::Stash.new(pool, cap: 1000, refill_interval: 30)
stash.start!
result = stash.exec('SELECT * FROM users WHERE id = $1', [42])


Constructor Details

#initialize(pool, stash: { queries: {}, tables: {} }, refill_interval: 16, refill_delay: 0, max_queue_length: 128, threads: 4, cap: 10_000, cap_interval: 60, retire: 15 * 60, retire_interval: 60, loog: Loog::NULL, entrance: Concurrent::ReentrantReadWriteLock.new, launched: Concurrent::AtomicBoolean.new(false)) ⇒ Stash

Initialize a new Stash with query caching.

Parameters:

  • pool (Object)

    The underlying connection pool that executes actual database queries

  • stash (Hash) (defaults to: { queries: {}, tables: {} })

    Internal cache structure containing queries and tables hashes for sharing state across transactions

  • refill_interval (Float) (defaults to: 16)

    Interval in seconds between background tasks that recalculate stale cached queries

  • refill_delay (Float) (defaults to: 0)

    Pause in seconds before each refill

  • max_queue_length (Integer) (defaults to: 128)

    Maximum number of refilling tasks allowed in the thread pool queue before new tasks are skipped

  • threads (Integer) (defaults to: 4)

    Number of worker threads in the background thread pool for cache refilling operations

  • cap (Integer) (defaults to: 10_000)

    Maximum number of cached query results to retain; oldest queries are evicted when this limit is exceeded

  • cap_interval (Float) (defaults to: 60)

    Interval in seconds between background tasks that enforce the cache size cap by removing old queries

  • retire (Integer) (defaults to: 15 * 60)

    Maximum age in seconds to keep a query in cache after its latest usage

  • retire_interval (Float) (defaults to: 60)

    Interval in seconds between background tasks that remove retired queries

  • loog (Loog) (defaults to: Loog::NULL)

    Logger instance for debugging and monitoring cache operations (default: null logger)

  • entrance (Concurrent::ReentrantReadWriteLock) (defaults to: Concurrent::ReentrantReadWriteLock.new)

    Read-write lock for thread-safe cache access shared across instances

  • launched (Concurrent::AtomicBoolean) (defaults to: Concurrent::AtomicBoolean.new(false))

    Atomic boolean flag tracking whether background tasks have been started to prevent multiple launches
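The cap and retire settings describe two cleanup policies, but the background tasks that apply them are not shown on this page. The sketch below illustrates both policies on a plain Hash shaped like the stash that #exec builds ({ queries: { sql => { key => entry } }, tables: { name => [sql, ...] } }). retire_sweep, cap_sweep, and prune_empty are hypothetical helpers, not Pgtk methods. Reading "oldest" as "least recently used" is an assumption, and the real cap task may count or order entries differently.

```ruby
# Hypothetical cleanup helpers over a stash shaped like the one #exec builds:
#   { queries: { sql => { key => { used: Time, ... } } }, tables: { name => [sql, ...] } }

# Drop entries that have not been used for more than `retire` seconds.
def retire_sweep(stash, retire, now: Time.now)
  stash[:queries].each_value do |kk|
    kk.delete_if { |_key, entry| now - entry[:used] > retire }
  end
  prune_empty(stash)
end

# Keep at most `cap` entries, evicting the least recently used first (an assumption).
def cap_sweep(stash, cap)
  all = stash[:queries].flat_map { |q, kk| kk.map { |key, entry| [q, key, entry[:used]] } }
  excess = all.size - cap
  return stash unless excess.positive?
  all.sort_by { |_q, _key, used| used }.first(excess).each do |q, key, _used|
    stash[:queries][q].delete(key)
  end
  prune_empty(stash)
end

# Remove queries with no keys left, and table references to removed queries.
def prune_empty(stash)
  stash[:queries].delete_if { |_q, kk| kk.empty? }
  stash[:tables].each_value { |qs| qs.select! { |q| stash[:queries].key?(q) } }
  stash[:tables].delete_if { |_t, qs| qs.empty? }
  stash
end

now = Time.now
stash = {
  queries: {
    'SELECT * FROM users WHERE id = $1' => {
      '1' => { used: now - 2000 },
      '2' => { used: now - 10 }
    },
    'SELECT * FROM orders' => { '' => { used: now - 5 } }
  },
  tables: {
    'users' => ['SELECT * FROM users WHERE id = $1'],
    'orders' => ['SELECT * FROM orders']
  }
}
retire_sweep(stash, 15 * 60, now: now) # drops key '1', unused for 2000 seconds
retired_keys = stash[:queries]['SELECT * FROM users WHERE id = $1'].keys
cap_sweep(stash, 1)                    # keeps only the most recently used entry
```

With both sweeps applied, only the orders query survives, and the users table drops out of the table index because no cached query refers to it any more.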



# File 'lib/pgtk/stash.rb', line 66

def initialize(
  pool,
  stash: { queries: {}, tables: {} },
  refill_interval: 16,
  refill_delay: 0,
  max_queue_length: 128,
  threads: 4,
  cap: 10_000,
  cap_interval: 60,
  retire: 15 * 60,
  retire_interval: 60,
  loog: Loog::NULL,
  entrance: Concurrent::ReentrantReadWriteLock.new,
  launched: Concurrent::AtomicBoolean.new(false)
)
  @pool = pool
  @stash = stash
  @launched = launched
  @entrance = entrance
  @refill_interval = refill_interval
  @refill_delay = refill_delay
  @max_queue_length = max_queue_length
  @threads = threads
  @cap = cap
  @cap_interval = cap_interval
  @retire = retire
  @retire_interval = retire_interval
  @loog = loog
  @tpool = Concurrent::FixedThreadPool.new(@threads)
end

Instance Method Details

#dump ⇒ String

Convert internal state into text.

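Generates a multi-line report with the following parts:
  • the underlying pool's own dump
  • the Stash settings
  • whether background tasks have been launched
  • the number of cached queries relative to the cap
  • the number of tasks queued in the thread pool
  • the number of tables tracked
  • the most popular cached queries: up to 8 with stale entries and up to 16 without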

Returns:

  • (String)

    Multi-line text representation of the current cache state



# File 'lib/pgtk/stash.rb', line 121

def dump
  qq =
    @stash[:queries].map do |q, kk|
      {
        q: q.dup, # the query
        c: kk.values.count, # how many keys?
        p: kk.values.sum { |vv| vv[:popularity] }, # total popularity of all keys
        s: kk.values.count { |vv| vv[:stale] }, # how many stale keys?
        u: kk.values.map { |vv| vv[:used] }.max || Time.now # when was it used
      }
    end
  [
    @pool.dump,
    '',
    [
      'Pgtk::Stash (',
      [
        "refill_interval=#{@refill_interval}s",
        "max_queue_length=#{@max_queue_length}",
        "threads=#{@threads}",
        "cap=#{@cap}",
        "cap_interval=#{@cap_interval}s",
        "retire=#{@retire}",
        "retire_interval=#{@retire_interval}s"
      ].join(', '),
      '):'
    ].join,
    "  #{'not ' if @launched.false?}launched",
    "  #{stash_size} queries cached (#{stash_size > @cap ? 'above' : 'below'} the cap)",
    "  #{@tpool.queue_length} tasks in the thread pool",
    "  #{@stash[:tables].count} tables in cache",
    "  #{qq.sum { |a| a[:s] }} stale queries in cache:",
    qq.select { |a| a[:s].positive? }.sort_by { -_1[:p] }.take(8).map do |a|
      "    #{a[:c]}/#{a[:p]}p/#{a[:s]}s/#{a[:u].ago}: #{a[:q]}"
    end,
    "  #{qq.count { |a| a[:s].zero? }} other queries in cache:",
    qq.select { |a| a[:s].zero? }.sort_by { -_1[:p] }.take(16).map do |a|
      "    #{a[:c]}/#{a[:p]}p/#{a[:s]}s/#{a[:u].ago}: #{a[:q]}"
    end
  ].join("\n")
end
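Each query line of the report has the form count/popularity p/stale s/age: query. The line is computed from the qq mapping above: c is the number of cached parameter sets, p is their total popularity, s is how many of them are stale, and u is the latest use time. The hypothetical dump_line helper below rebuilds one such line outside the class. The ago method called in the listing is defined outside this file, so elapsed whole seconds stand in for it.

```ruby
# Hypothetical helper: rebuilds one "queries in cache" line of #dump for a single query.
# kk maps each parameter key to the entry #exec stored for it.
def dump_line(query, kk, now: Time.now)
  count = kk.values.count                            # cached parameter sets
  hits = kk.values.sum { |vv| vv[:popularity] }      # total popularity
  stale = kk.values.count { |vv| vv[:stale] }        # entries marked stale
  used = kk.values.map { |vv| vv[:used] }.max || now # most recent use
  # The listing prints used.ago; whole seconds since last use stand in for it here.
  "    #{count}/#{hits}p/#{stale}s/#{(now - used).round}s: #{query}"
end

now = Time.now
kk = {
  '42' => { popularity: 7, used: now - 30 },
  '43' => { popularity: 2, used: now - 90, stale: now - 5 }
}
line = dump_line('SELECT * FROM users WHERE id = $1', kk, now: now)
puts line
# prints "    2/9p/1s/30s: SELECT * FROM users WHERE id = $1"
```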

#exec(query, params = [], result = 0) ⇒ PG::Result

Execute a SQL query with optional caching.

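Queries that contain modification keywords (INSERT, UPDATE, DELETE, etc.) or call a pg_* function bypass the cache. They are executed directly, and every cached query that reads one of the affected tables is marked stale. All other queries are treated as reads and cached by their normalized text (whitespace collapsed) and parameter values. A missing or stale entry is re-executed and stored again. Queries containing " NOW() " (with surrounding spaces) are executed but never cached. A cacheable read must reference at least one table as FROM name or JOIN name, with an upper-case keyword and a lower-case table name. Otherwise a "No tables at ..." error is raised.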

Parameters:

  • query (String, Array<String>)

    The SQL query to execute as a string or array of strings to be joined

  • params (Array) (defaults to: [])

    Query parameters for placeholder substitution in prepared statements (default: empty array)

  • result (Integer) (defaults to: 0)

    Result format code where 0 requests text format and 1 requests binary format (default: 0)

Returns:

  • (PG::Result)

    Query result object containing rows and metadata from the database
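The caching rules above depend on how #exec normalizes the query and derives a cache key. The sketch below copies the normalization, the key construction, and the read-path table pattern from the listing into standalone functions. The names normalize and read_tables are hypothetical. The sketch shows a few consequences:
  • Lower-case keywords and schema-qualified names yield no tables, which makes #exec raise.
  • NOW() at the very end of a query is not detected.
  • The key is built from stringified parameters only, so 1 and "1" share an entry and the result format code is not part of the key.
The SEPARATOR value here is an assumption; the real constant is defined elsewhere in stash.rb.

```ruby
# Hypothetical value: the real SEPARATOR constant is defined elsewhere in stash.rb.
SEPARATOR = ' '

# First line of #exec: join array parts, collapse whitespace, trim.
def normalize(query)
  (query.is_a?(Array) ? query.join(' ') : query).gsub(/\s+/, ' ').strip
end

# Table pattern used on the read path of #exec.
READ_TABLES = /(?<=^|\s)(?:FROM|JOIN) ([a-z_]+)(?=\s|$)/

def read_tables(pure)
  pure.scan(READ_TABLES).map(&:first).uniq
end

pure = normalize(['SELECT u.name FROM users u', '  JOIN orders o ON o.user_id = u.id', 'WHERE u.id = $1'])
key = [42].map(&:to_s).join(SEPARATOR)

read_tables(pure)                            # => ["users", "orders"]
read_tables('select name from users')        # => [] (lower-case keywords)
read_tables('SELECT name FROM public.users') # => [] (schema-qualified name)
normalize("SELECT * FROM events\nWHERE at < NOW()").include?(' NOW() ') # => false, so it gets cached
```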



# File 'lib/pgtk/stash.rb', line 174

def exec(query, params = [], result = 0)
  pure = (query.is_a?(Array) ? query.join(' ') : query).gsub(/\s+/, ' ').strip
  if MODS_RE.match?(pure) || /(^|\s)pg_[a-z_]+\(/.match?(pure)
    tables = pure.scan(ALTS_RE).map(&:first).uniq
    ret = @pool.exec(pure, params, result)
    @entrance.with_write_lock do
      tables.each do |t|
        @stash[:tables][t]&.each do |q|
          @stash[:queries][q]&.each_key do |key|
            @stash[:queries][q][key][:stale] = Time.now
          end
        end
      end
    end
  else
    key = params.map(&:to_s).join(SEPARATOR)
    ret = @stash.dig(:queries, pure, key, :ret)
    if ret.nil? || @stash.dig(:queries, pure, key, :stale)
      ret = @pool.exec(pure, params, result)
      unless pure.include?(' NOW() ')
        tables = pure.scan(/(?<=^|\s)(?:FROM|JOIN) ([a-z_]+)(?=\s|$)/).map(&:first).uniq
        raise "No tables at #{pure.inspect}" if tables.empty?
        @entrance.with_write_lock do
          tables.each do |t|
            @stash[:tables][t] = [] if @stash[:tables][t].nil?
            @stash[:tables][t].append(pure).uniq!
          end
          @stash[:queries][pure] ||= {}
          @stash[:queries][pure][key] = { ret:, params:, result:, used: Time.now }
        end
      end
    end
    if @stash.dig(:queries, pure, key)
      @entrance.with_write_lock do
        @stash[:queries][pure][key][:popularity] ||= 0
        @stash[:queries][pure][key][:popularity] += 1
        @stash[:queries][pure][key][:used] = Time.now
      end
    end
  end
  ret
end
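The following simplified, self-contained model shows the read and write paths of the listing working together. It is not Pgtk's code, and it differs from the listing in several ways:
  • MiniStash, the lambda standing in for the pool, and the MODS, ALTS, and READS patterns are hypothetical. The real MODS_RE and ALTS_RE are defined elsewhere in stash.rb.
  • A stdlib Mutex replaces the reentrant read-write lock.
  • Popularity tracking, the NOW() rule, and background refilling are left out.

```ruby
# Hypothetical stand-ins for MODS_RE and ALTS_RE, which are defined elsewhere in stash.rb.
MODS = /(^|\s)(INSERT|UPDATE|DELETE)\s/
ALTS = /(?:INSERT INTO|UPDATE|DELETE FROM) ([a-z_]+)/
READS = /(?<=^|\s)(?:FROM|JOIN) ([a-z_]+)(?=\s|$)/

# A simplified model of #exec: no popularity, no NOW() rule, no background refill.
class MiniStash
  attr_reader :calls

  def initialize(pool)
    @pool = pool                          # anything responding to #call(sql, params)
    @stash = { queries: {}, tables: {} }
    @lock = Mutex.new                     # stands in for the reentrant read-write lock
    @calls = 0
  end

  def exec(sql, params = [])
    pure = sql.gsub(/\s+/, ' ').strip
    return write(pure, params) if MODS.match?(pure)
    key = params.map(&:to_s).join(' ')
    entry = @stash.dig(:queries, pure, key)
    return entry[:ret] if entry && !entry[:stale]
    ret = run(pure, params)               # missing or stale: go to the database
    @lock.synchronize do
      pure.scan(READS).map(&:first).uniq.each do |t|
        list = (@stash[:tables][t] ||= [])
        list << pure unless list.include?(pure)
      end
      (@stash[:queries][pure] ||= {})[key] = { ret: ret, used: Time.now }
    end
    ret
  end

  private

  # Run the write, then mark every cached query on the touched tables as stale.
  def write(pure, params)
    ret = run(pure, params)
    @lock.synchronize do
      pure.scan(ALTS).map(&:first).uniq.each do |t|
        @stash[:tables].fetch(t, []).each do |q|
          @stash[:queries][q]&.each_value { |e| e[:stale] = Time.now }
        end
      end
    end
    ret
  end

  def run(pure, params)
    @calls += 1
    @pool.call(pure, params)
  end
end

users = { 1 => 'alice' }
pool = lambda { |sql, params|
  sql.start_with?('UPDATE') ? (users[params[1]] = params[0]) : users[params[0]]
}
s = MiniStash.new(pool)
s.exec('SELECT name FROM users WHERE id = $1', [1])            # runs the query
s.exec('SELECT name FROM users WHERE id = $1', [1])            # served from the cache
s.exec('UPDATE users SET name = $1 WHERE id = $2', ['bob', 1]) # marks it stale
name = s.exec('SELECT name FROM users WHERE id = $1', [1])     # re-runs and returns "bob"
```

As in the listing, a write does not delete cached results. It only marks them stale, and the next read of a stale entry goes back to the database.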

#start! ⇒ void

This method returns an undefined value.

Start the connection pool and launch background cache management tasks.

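Launches the background timer tasks and then starts the underlying connection pool. The tasks are launched only once across all instances that share the same launched flag. The refill task periodically updates stale cached queries based on popularity. The cap task removes the oldest queries when the cache size exceeds the configured cap. The retire task removes queries that have not been used for longer than retire seconds.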



# File 'lib/pgtk/stash.rb', line 104

def start!
  launch!
  @pool.start!
end
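start! delegates the background work to launch!, whose body is not shown here. The launched flag passed to the constructor keeps those tasks from being started twice, and #transaction hands the same flag to every nested Stash. The following sketch shows this launch-once pattern with plain Ruby threads, one per periodic task (for example refill, cap, and retire). Launcher is hypothetical, and a Mutex-guarded boolean stands in for Concurrent::AtomicBoolean. The timer mechanism Pgtk actually uses may differ.

```ruby
# Hypothetical sketch of a "launch once" guard for periodic background tasks.
# A Mutex-protected boolean stands in for Concurrent::AtomicBoolean.
class Launcher
  attr_reader :threads

  def initialize
    @lock = Mutex.new
    @launched = false
    @threads = []
  end

  # Starts one thread per [interval, job] pair; returns true only on the first call.
  def launch!(tasks)
    @lock.synchronize do
      return false if @launched
      @launched = true
    end
    tasks.each do |interval, job|
      @threads << Thread.new do
        loop do
          sleep(interval)
          job.call
        end
      end
    end
    true
  end

  def stop!
    @threads.each(&:kill).each(&:join)
  end
end

launcher = Launcher.new
tasks = [[0.01, -> {}], [0.01, -> {}], [0.01, -> {}]] # refill, cap, retire
first = launcher.launch!(tasks)
second = launcher.launch!(tasks) # ignored: already launched
launcher.stop!
```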

#transaction {|Pgtk::Stash| ... } ⇒ Object

Execute a database transaction.

Yields a new Stash that shares this instance's cache, lock, and launch flag but runs its queries on the transaction connection.

Yields:

  • (Pgtk::Stash)

    A stash connected to the transaction

Returns:

  • (Object)

    The result of the block



# File 'lib/pgtk/stash.rb', line 223

def transaction
  @pool.transaction do |t|
    yield Pgtk::Stash.new(
      t,
      stash: @stash,
      refill_interval: @refill_interval,
      max_queue_length: @max_queue_length,
      threads: @threads,
      loog: @loog,
      entrance: @entrance,
      launched: @launched
    )
  end
end
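The key detail of #transaction is object identity. The nested Stash receives the very same stash Hash and lock, so entries it caches and invalidations it triggers are immediately visible to the outer instance. The sketch below reduces this to a hypothetical Wrapper class and a FakePool whose transaction method simply yields a placeholder connection.

```ruby
# Hypothetical model of #transaction: the nested object reuses the outer object's
# cache Hash and lock, and only swaps the connection it runs queries on.
class Wrapper
  attr_reader :conn, :stash, :lock

  def initialize(conn, stash: { queries: {}, tables: {} }, lock: Mutex.new)
    @conn = conn
    @stash = stash
    @lock = lock
  end

  # Returns whatever the block returns, as documented for the real method.
  def transaction
    @conn.transaction do |t|
      yield Wrapper.new(t, stash: @stash, lock: @lock)
    end
  end
end

# Stand-in pool: its #transaction yields a placeholder connection object.
class FakePool
  def transaction
    yield :tx_connection
  end
end

outer = Wrapper.new(FakePool.new)
inner = nil
value = outer.transaction do |t|
  inner = t
  t.stash[:tables]['users'] = ['SELECT name FROM users'] # cached inside the transaction
  42
end
outer.stash[:tables]['users'] # => ["SELECT name FROM users"], visible outside
```

In the listing, the nested Stash also receives refill_interval, max_queue_length, threads, loog, and launched. The settings cap, cap_interval, retire, retire_interval, and refill_delay are not passed, so the nested instance uses their constructor defaults.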

#version ⇒ String

Get the PostgreSQL server version.

Returns:

  • (String)

    Version string of the database server



# File 'lib/pgtk/stash.rb', line 111

def version
  @pool.version
end