Class: Pgtk::Stash

Inherits:
Object
Defined in:
lib/pgtk/stash.rb

Overview

Database query cache implementation.

Provides a caching layer for PostgreSQL queries, automatically invalidating the cache when tables are modified. Read queries are cached, while write queries bypass the cache and invalidate related cached entries.

Thread-safe with read-write locking.

The implementation is very naive! Use it at your own risk.
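
The invalidation idea can be sketched in a few lines of plain Ruby. This is a hypothetical simplification, not the gem's actual code: read results are cached per query, and a write to a table drops every cached query that reads from that table.

```ruby
class TinyStash
  def initialize
    @cache = {}                              # query text => cached result
    @tables = Hash.new { |h, k| h[k] = [] }  # table name => queries that read it
  end

  # Cache a read query, remembering which table it depends on.
  def read(query, table)
    @tables[table] << query unless @tables[table].include?(query)
    @cache[query] ||= yield
  end

  # A write to a table invalidates every cached query that reads it.
  def write(table)
    @tables[table].each { |q| @cache.delete(q) }
  end
end

calls = 0
stash = TinyStash.new
fetch = proc { calls += 1; [{ 'id' => 42 }] }
stash.read('SELECT * FROM users', 'users', &fetch)  # miss: executes
stash.read('SELECT * FROM users', 'users', &fetch)  # hit: served from cache
stash.write('users')                                # invalidates readers of "users"
stash.read('SELECT * FROM users', 'users', &fetch)  # miss again
calls # => 2
```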

Author

Yegor Bugayenko ([email protected])

Copyright

Copyright © 2019-2025 Yegor Bugayenko

License

MIT

Examples:

Basic usage

pool = Pgtk::Pool.new(...)
stash = Pgtk::Stash.new(pool, cap: 1000, refill_interval: 30)
stash.start!
result = stash.exec('SELECT * FROM users WHERE id = $1', [42])

Instance Method Summary

Constructor Details

#initialize(pool, stash: { queries: {}, tables: {} }, loog: Loog::NULL, entrance: Concurrent::ReentrantReadWriteLock.new, refill_interval: 16, refill_delay: 0, max_queue_length: 128, threads: 4, cap: 10_000, cap_interval: 60, retire: 15 * 60, retire_interval: 60) ⇒ Stash

Initialize a new Stash with query caching.

Set any of the intervals to nil to disable the corresponding background task.

Parameters:

  • pool (Object)

    The underlying connection pool that executes actual database queries

  • stash (Hash) (defaults to: { queries: {}, tables: {} })

    Internal cache structure containing queries and tables hashes for sharing state across transactions

  • refill_interval (Float) (defaults to: 16)

    Interval in seconds between background tasks that recalculate stale cached queries

  • refill_delay (Float) (defaults to: 0)

    Pause in seconds taken before each refill operation begins

  • max_queue_length (Integer) (defaults to: 128)

    Maximum number of refilling tasks allowed in the thread pool queue before new tasks are skipped

  • threads (Integer) (defaults to: 4)

    Number of worker threads in the background thread pool for cache refilling operations

  • cap (Integer) (defaults to: 10_000)

    Maximum number of cached query results to retain; oldest queries are evicted when this limit is exceeded

  • cap_interval (Float) (defaults to: 60)

    Interval in seconds between background tasks that enforce the cache size cap by removing old queries

  • retire (Integer) (defaults to: 15 * 60)

    Maximum age in seconds to keep a query in cache after its latest usage

  • retire_interval (Float) (defaults to: 60)

    Interval in seconds between background tasks that remove retired queries

  • loog (Loog) (defaults to: Loog::NULL)

    Logger instance for debugging and monitoring cache operations (default: null logger)

  • entrance (Concurrent::ReentrantReadWriteLock) (defaults to: Concurrent::ReentrantReadWriteLock.new)

    Read-write lock for thread-safe cache access shared across instances



# File 'lib/pgtk/stash.rb', line 66

def initialize(
  pool,
  stash: { queries: {}, tables: {} },
  loog: Loog::NULL,
  entrance: Concurrent::ReentrantReadWriteLock.new,
  refill_interval: 16,
  refill_delay: 0,
  max_queue_length: 128,
  threads: 4,
  cap: 10_000,
  cap_interval: 60,
  retire: 15 * 60,
  retire_interval: 60
)
  @pool = pool
  @stash = stash
  @loog = loog
  @entrance = entrance
  @refill_interval = refill_interval
  @refill_delay = refill_delay
  @max_queue_length = max_queue_length
  @threads = threads
  @cap = cap
  @cap_interval = cap_interval
  @retire = retire
  @retire_interval = retire_interval
end
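
The cap and retire parameters control two separate eviction policies. The following is a simplified illustration with hypothetical data, not the gem's internal structure: retirement drops entries unused for longer than the retire age, while capping keeps only the most recently used entries up to the cap.

```ruby
cache = {
  'q1' => { used: Time.now - 3600 },  # last used an hour ago
  'q2' => { used: Time.now - 10 },
  'q3' => { used: Time.now - 5 }
}
retire = 15 * 60  # the default retire age, in seconds
# Retirement: drop entries unused for longer than `retire`.
cache.reject! { |_, v| Time.now - v[:used] > retire }
# Capping: keep only the `cap` most recently used entries.
cap = 1
kept = cache.max_by(cap) { |_, v| v[:used] }.to_h
kept.keys # => ["q3"]
```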

Instance Method Details

#dump ⇒ String

Convert internal state into text.

Generates a detailed report of the cache state including query counts, popularity scores, stale queries, and thread pool status.

Returns:

  • (String)

    Multi-line text representation of the current cache state



# File 'lib/pgtk/stash.rb', line 118

def dump
  qq =
    @stash[:queries].map do |q, kk|
      {
        q: q.dup, # the query
        c: kk.values.count, # how many keys?
        p: kk.values.sum { |vv| vv[:popularity] }, # total popularity of all keys
        s: kk.values.count { |vv| vv[:stale] }, # how many stale keys?
        u: kk.values.map { |vv| vv[:used] }.max || Time.now # when was it used
      }
    end
  [
    @pool.dump,
    '',
    [
      'Pgtk::Stash (',
      [
        "threads=#{@threads}",
        "max_queue_length=#{@max_queue_length}",
        if @refill_interval
          [
            "refill_interval=#{@refill_interval}s",
            "refill_delay=#{@refill_delay}s"
          ]
        else
          'no refilling'
        end,
        if @cap_interval
          [
            "cap_interval=#{@cap_interval}s",
            "cap=#{@cap}"
          ]
        else
          'no capping'
        end,
        if @retire_interval
          [
            "retire_interval=#{@retire_interval}s",
            "retire=#{@retire}"
          ]
        else
          'no retirement'
        end
      ].flatten.join(', '),
      '):'
    ].join,
    if @tpool
      "  #{@tpool.queue_length} tasks in the thread pool"
    else
      '  Not launched yet'
    end,
    "  #{stash_size} queries cached (#{stash_size > @cap ? 'above' : 'below'} the cap)",
    "  #{@stash[:tables].count} tables in cache",
    "  #{qq.sum { |a| a[:s] }} stale queries in cache:",
    qq.select { |a| a[:s].positive? }.sort_by { -_1[:p] }.take(8).map do |a|
      "    #{a[:c]}/#{a[:p]}p/#{a[:s]}s/#{a[:u].ago}: #{a[:q]}"
    end,
    "  #{qq.count { |a| a[:s].zero? }} other queries in cache:",
    qq.select { |a| a[:s].zero? }.sort_by { -_1[:p] }.take(16).map do |a|
      "    #{a[:c]}/#{a[:p]}p/#{a[:s]}s/#{a[:u].ago}: #{a[:q]}"
    end
  ].join("\n")
end
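
The per-query aggregation that #dump performs can be sketched with hypothetical cache data (the real structure carries more fields):

```ruby
queries = {
  'SELECT a FROM t' => {
    'k1' => { popularity: 5, stale: nil },
    'k2' => { popularity: 2, stale: Time.now }
  },
  'SELECT b FROM t' => {
    'k1' => { popularity: 9, stale: nil }
  }
}
report = queries.map do |q, keys|
  {
    q: q,
    p: keys.values.sum { |v| v[:popularity] },  # total popularity of all keys
    s: keys.values.count { |v| v[:stale] }      # how many stale keys
  }
end
top = report.max_by { _1[:p] }
top[:q] # => "SELECT b FROM t"
```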

#exec(query, params = [], result = 0) ⇒ PG::Result

Execute a SQL query with optional caching.

Queries containing modification keywords (INSERT, UPDATE, DELETE, etc.) are executed directly and invalidate cached results for the affected tables. Read queries (SELECT) are cached by query text and parameter values. Queries containing NOW() are never cached.

Parameters:

  • query (String, Array<String>)

    The SQL query to execute as a string or array of strings to be joined

  • params (Array) (defaults to: [])

    Query parameters for placeholder substitution in prepared statements (default: empty array)

  • result (Integer) (defaults to: 0)

    Result format code where 0 requests text format and 1 requests binary format (default: 0)

Returns:

  • (PG::Result)

    Query result object containing rows and metadata from the database



# File 'lib/pgtk/stash.rb', line 193

def exec(query, params = [], result = 0)
  pure = (query.is_a?(Array) ? query.join(' ') : query).gsub(/\s+/, ' ').strip
  if MODS_RE.match?(pure) || /(^|\s)pg_[a-z_]+\(/.match?(pure)
    tables = pure.scan(ALTS_RE).map(&:first).uniq
    ret = @pool.exec(pure, params, result)
    @entrance.with_write_lock do
      tables.each do |t|
        @stash[:tables][t]&.each do |q|
          @stash[:queries][q]&.each_key do |key|
            @stash[:queries][q][key][:stale] = Time.now
          end
        end
      end
    end
  else
    key = params.map(&:to_s).join(SEPARATOR)
    ret = @stash.dig(:queries, pure, key, :ret)
    if ret.nil? || @stash.dig(:queries, pure, key, :stale)
      ret = @pool.exec(pure, params, result)
      unless pure.include?(' NOW() ')
        tables = pure.scan(/(?<=^|\s)(?:FROM|JOIN) ([a-z_]+)(?=\s|$)/).map(&:first).uniq
        raise "No tables at #{pure.inspect}" if tables.empty?
        @entrance.with_write_lock do
          tables.each do |t|
            @stash[:tables][t] = [] if @stash[:tables][t].nil?
            @stash[:tables][t].append(pure).uniq!
          end
          @stash[:queries][pure] ||= {}
          @stash[:queries][pure][key] = { ret:, params:, result:, used: Time.now }
        end
      end
    end
    if @stash.dig(:queries, pure, key)
      @entrance.with_write_lock do
        @stash[:queries][pure][key][:popularity] ||= 0
        @stash[:queries][pure][key][:popularity] += 1
        @stash[:queries][pure][key][:used] = Time.now
      end
    end
  end
  ret
end
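
The read-versus-write classification hinges on normalizing whitespace and matching modification keywords. A simplified stand-in for the gem's MODS_RE (the actual regexp differs) looks like this:

```ruby
# Simplified stand-in for the gem's MODS_RE (assumption: the real
# regexp covers more keywords and edge cases):
MODS = /\b(INSERT|UPDATE|DELETE|ALTER|DROP|TRUNCATE|CREATE|LOCK)\b/i

def write_query?(sql)
  pure = sql.gsub(/\s+/, ' ').strip  # normalize whitespace, as #exec does
  MODS.match?(pure)
end

write_query?('INSERT INTO users (id) VALUES ($1)')  # => true
write_query?("SELECT *\n  FROM users")              # => false
```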

#start! ⇒ void

This method returns an undefined value.

Start the connection pool and launch background cache management tasks.

Initializes background timer tasks for cache refilling and size capping. The refill task periodically updates stale cached queries based on popularity. The cap task removes oldest queries when cache size exceeds the configured limit.



# File 'lib/pgtk/stash.rb', line 101

def start!
  @pool.start!
  launch!
end
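
The "nil interval disables the cron" contract the constructor documents can be sketched as follows. This is a hypothetical simplification of what launch! sets up, not the gem's actual timer code:

```ruby
# A nil interval yields no timer; otherwise a looping background thread.
def schedule(interval)
  return nil if interval.nil?  # nil disables this cron entirely
  Thread.new do
    loop do
      sleep(interval)
      yield
    end
  end
end

schedule(nil) { puts 'never runs' }  # => nil, task disabled
```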

#transaction {|Pgtk::Stash| ... } ⇒ Object

Execute a database transaction.

Yields a new Stash that shares the same cache but uses the transaction connection.

Yields:

  • (Pgtk::Stash)

    A stash connected to the transaction

Returns:

  • (Object)

    The result of the block



# File 'lib/pgtk/stash.rb', line 242

def transaction
  @pool.transaction do |t|
    yield Pgtk::Stash.new(
      t,
      stash: @stash,
      loog: @loog,
      entrance: @entrance
    )
  end
end
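
The shared-cache pattern here, where the child object reuses the parent's stash Hash so entries cached inside the block survive it, can be sketched in isolation (a hypothetical simplification, not the gem's code):

```ruby
class SharedCache
  attr_reader :cache

  def initialize(cache = {})
    @cache = cache
  end

  # Yield a child that shares this object's cache Hash,
  # so entries cached inside the block remain visible afterwards.
  def transaction
    yield SharedCache.new(@cache)
  end
end

root = SharedCache.new
root.transaction { |child| child.cache['SELECT 1'] = :cached }
root.cache['SELECT 1'] # => :cached
```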

#version ⇒ String

Get the PostgreSQL server version.

Returns:

  • (String)

    Version string of the database server



# File 'lib/pgtk/stash.rb', line 108

def version
  @pool.version
end