Class: Fluent::Plugin::PgStatStatementsInput

Inherits:
PollingPostgresInputPlugin
Defined in:
lib/fluent/plugin/in_pg_stat_statements.rb

Overview

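PgStatStatementsInput periodically polls PostgreSQL, querying pg_stat_statements for queryid-to-query mappings. Each query is normalized for security purposes (literal values are replaced with placeholders), fingerprinted, and emitted as a record. The fingerprint is stored under the configured fingerprint key ('fingerprint' in this example), and query_length is the length of the original query text, measured before normalization. Records have the following format: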

'fingerprint' => '8a6e9896bd9048a2',
'query' => 'SELECT * FROM table ORDER BY queryid LIMIT $1',
'query_length' => 58,
'queryid' => '3239318621761098074'

Instance Method Summary

Methods inherited from PollingPostgresInputPlugin

#shutdown, #start, #thread_main

Instance Method Details

#emit_statements_to_stream(conn) ⇒ Object

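Queries pg_stat_statements and emits all returned rows to the Fluentd router as a single event stream. Every record in the stream shares the same timestamp.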



# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 55

def emit_statements_to_stream(conn)
  me = Fluent::MultiEventStream.new

  now = Fluent::Engine.now
  conn.exec('SELECT queryid, query FROM public.pg_stat_statements').each do |row|
    record = record_for_row(row)
    me.add(now, record)
  end

  @router.emit_stream(@tag, me)
end
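The batching pattern used by this method can be sketched without a running Fluentd or PostgreSQL instance. In the minimal sketch below, `FakeConn`, `FakeStream`, `FakeRouter`, and `emit_rows` are hypothetical stand-ins (they are not part of this plugin, Fluentd, or the pg gem), the tag value is an assumption, and the record builder is simplified to copy each row without normalization.

```ruby
# Hypothetical stand-in for a PostgreSQL connection:
# exec ignores the SQL and returns an enumerable of row hashes.
class FakeConn
  def initialize(rows)
    @rows = rows
  end

  def exec(_sql)
    @rows
  end
end

# Hypothetical stand-in for Fluent::MultiEventStream:
# collects [time, record] pairs in insertion order.
class FakeStream
  attr_reader :events

  def initialize
    @events = []
  end

  def add(time, record)
    @events << [time, record]
  end
end

# Hypothetical stand-in for the Fluentd router:
# remembers every [tag, stream] pair it was asked to emit.
class FakeRouter
  attr_reader :emitted

  def initialize
    @emitted = []
  end

  def emit_stream(tag, stream)
    @emitted << [tag, stream]
  end
end

# Same shape as emit_statements_to_stream: one timestamp, one stream, one emit.
def emit_rows(conn, router, tag, now)
  stream = FakeStream.new
  conn.exec('SELECT queryid, query FROM public.pg_stat_statements').each do |row|
    stream.add(now, { 'queryid' => row['queryid'].to_s, 'query' => row['query'] })
  end
  router.emit_stream(tag, stream)
  stream
end

rows = [
  { 'queryid' => 1, 'query' => 'SELECT 1' },
  { 'queryid' => 2, 'query' => 'SELECT 2' }
]
router = FakeRouter.new
# The tag below is an assumed example value, not one taken from the plugin.
stream = emit_rows(FakeConn.new(rows), router, 'postgres.pg_stat_statements', 1_700_000_000)
```

Collecting all rows into one stream means the router is called once per poll rather than once per row, and consumers see a consistent timestamp for the whole snapshot.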

#record_for_row(row) ⇒ Object

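Returns a Fluentd record for a pg_stat_statements row. If the row has no query text, only 'queryid' and 'query_length' are returned. If the query cannot be parsed, the record is flagged with 'query_unparseable' => true.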



# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 33

def record_for_row(row)
  query = row['query']

  # We record the query_length as it will help in understanding whether unparseable
  # queries are truncated.
  record = { 'queryid' => row['queryid'].to_s, 'query_length' => query&.length }

  return record unless query

  normalized = PgQuery.normalize(query)
  record['query'] = normalized

  record[@fingerprint_key] = PgQuery.parse(normalized).fingerprint if @fingerprint_key

  record
rescue PgQuery::ParseError
  record['query_unparseable'] = true

  record
end
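The three possible outcomes of this method (missing query, parseable query, unparseable query) can be illustrated with a self-contained sketch. `FakePgQuery` and `sketch_record_for_row` below are hypothetical: the toy normalizer and SHA-1 fingerprint only mimic the control flow and are not the algorithms used by the pg_query gem, which relies on PostgreSQL's own parser.

```ruby
require 'digest'

# Hypothetical stand-in for PgQuery, used only to exercise the control flow.
module FakePgQuery
  class ParseError < StandardError; end

  # Toy normalizer: replaces quoted strings and integer literals with
  # numbered placeholders. Raises on an unbalanced single quote.
  def self.normalize(query)
    raise ParseError, 'unbalanced quote' if query.count("'").odd?

    index = 0
    query.gsub(/'[^']*'|\b\d+\b/) { "$#{index += 1}" }
  end

  # Toy fingerprint: truncated SHA-1 of the normalized text
  # (an assumption for illustration, not PgQuery's fingerprint algorithm).
  def self.fingerprint(normalized)
    Digest::SHA1.hexdigest(normalized)[0, 16]
  end
end

# Mirrors the structure of record_for_row, with the fingerprint key
# passed as an argument instead of read from an instance variable.
def sketch_record_for_row(row, fingerprint_key: 'fingerprint')
  query = row['query']
  record = { 'queryid' => row['queryid'].to_s, 'query_length' => query&.length }
  return record unless query

  normalized = FakePgQuery.normalize(query)
  record['query'] = normalized
  record[fingerprint_key] = FakePgQuery.fingerprint(normalized) if fingerprint_key
  record
rescue FakePgQuery::ParseError
  # The raw query is deliberately left out of the record when parsing fails.
  record['query_unparseable'] = true
  record
end

parsed      = sketch_record_for_row({ 'queryid' => 1, 'query' => 'SELECT * FROM t WHERE id = 42' })
missing     = sketch_record_for_row({ 'queryid' => 2, 'query' => nil })
unparseable = sketch_record_for_row({ 'queryid' => 3, 'query' => "SELECT 'oops" })
```

Note that in the unparseable case the record carries 'query_length' but no 'query' key, so un-normalized SQL (which may contain literal values) is never emitted.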