Class: Fluent::Plugin::PgStatStatementsInput

Inherits:
PollingPostgresInputPlugin show all
Includes:
QueryNormalizer
Defined in:
lib/fluent/plugin/in_pg_stat_statements.rb

Overview

PgStatStatementsInput will periodically poll postgres, querying pg_stat_statements for queryid to query mappings. These are then normalized for security purposes, fingerprinted, and emitted as records with the following format:

'fingerprint' => '8a6e9896bd9048a2',
'query' => 'SELECT * FROM table ORDER BY queryid LIMIT $1',
'query_length' => 58,
'queryid' => '3239318621761098074'

Constant Summary collapse

# SQL that reads the server's `server_version_num` setting. The plugin uses
# the result to decide which pg_stat_statements query to run.
POSTGRES_SERVER_VERSION_QUERY =
"SELECT current_setting('server_version_num')"
# Statement statistics query used when the server version number is below
# 13_00_00 (see query_for_postgres_version). Reads the `total_time` column
# directly from public.pg_stat_statements.
PG12_STAT_STATEMENTS_QUERY =
<<-SQL
  SELECT queryid,
         query,
         calls,
         rows,
         total_time
    FROM public.pg_stat_statements
SQL
# Statement statistics query used when the server version number is
# 13_00_00 or higher (see query_for_postgres_version). It exposes the sum of
# `total_plan_time` and `total_exec_time` under the alias `total_time`, so the
# result has the same column names as PG12_STAT_STATEMENTS_QUERY.
PG13_STAT_STATEMENTS_QUERY =
<<-SQL
  SELECT queryid,
         query,
         calls,
         rows,
         (total_plan_time + total_exec_time) total_time
    FROM public.pg_stat_statements
SQL

Instance Method Summary collapse

Methods included from QueryNormalizer

#normalize_and_fingerprint_query

Methods inherited from PollingPostgresInputPlugin

#shutdown, #start, #thread_main

Constructor Details

#initializePgStatStatementsInput



57
58
59
60
# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 57

# Runs the parent initializer, then starts with no cached server version.
# postgres_server_version_num fills the cache in on its first call.
def initialize
  super
  # nil means "not looked up yet"
  @postgres_server_version_num = nil
end

Instance Method Details

#emit_statements_to_stream(conn) ⇒ Object

Query the database and emit statements to fluentd router



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 83

# Runs the version-appropriate pg_stat_statements query on +conn+, converts
# every returned row into a record, and emits all of them to the router as a
# single event stream under @tag.
#
# All records of one poll share the same timestamp, taken once before the
# query is executed. A stream is emitted even when the query returns no rows.
#
# @param conn connection object; must respond to #exec
# @return whatever @router.emit_stream returns
def emit_statements_to_stream(conn)
  timestamp = Fluent::Engine.now
  sql = query_for_postgres_version(conn)
  rows = conn.exec(sql).to_a

  stream = rows.each_with_object(Fluent::MultiEventStream.new) do |row, events|
    events.add(timestamp, record_for_row(row))
  end

  @router.emit_stream(@tag, stream)
end

#postgres_server_version_num(conn) ⇒ Object

Returns the PG_VERSION_NUM value (server_version_num) from the database, memoizing the result.



101
102
103
104
105
# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 101

# Returns the server's numeric version as an Integer. The database is only
# queried when no value is cached on this instance yet; later calls reuse the
# cached number.
#
# @param conn connection object; must respond to #exec, and the result must
#   respond to #getvalue
# @return [Integer] first cell of the version query result, converted with #to_i
def postgres_server_version_num(conn)
  @postgres_server_version_num ||=
    conn.exec(POSTGRES_SERVER_VERSION_QUERY).getvalue(0, 0).to_i
end

#query_for_postgres_version(conn) ⇒ Object

pg_stat_statements columns changed in pg13, so we use different queries depending on the version. See www.postgresql.org/docs/12/pgstatstatements.html and www.postgresql.org/docs/13/pgstatstatements.html



110
111
112
113
114
# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 110

# Picks the pg_stat_statements query matching the server version: the PG12
# form for version numbers below 13_00_00, and the PG13 form otherwise.
#
# @param conn connection object, passed on to postgres_server_version_num
# @return [String] one of the two *_STAT_STATEMENTS_QUERY constants
def query_for_postgres_version(conn)
  if postgres_server_version_num(conn) < 13_00_00
    PG12_STAT_STATEMENTS_QUERY
  else
    PG13_STAT_STATEMENTS_QUERY
  end
end

#record_for_row(row) ⇒ Object

Returns a fluentd record for a query row



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 63

# Builds the record for one pg_stat_statements row and hands it to
# normalize_and_fingerprint_query, whose result is returned.
#
# The queryid is always stringified (a nil queryid becomes ""). The numeric
# fields are converted only when present, so a nil column stays nil. The
# row's `total_time` value is stored under the `total_time_ms` key.
#
# @param row row object indexable by column name (e.g. a Hash)
# @return the value returned by normalize_and_fingerprint_query
def record_for_row(row)
  statement = { 'query' => row['query'], 'queryid' => row['queryid'].to_s }
  statement['calls'] = row['calls']&.to_i
  statement['total_time_ms'] = row['total_time']&.to_f
  statement['rows'] = row['rows']&.to_i

  # The query text is normalized in place: input and output use the same key.
  normalizer_options = {
    input_key: 'query',
    output_key: 'query',
    fingerprint_key: @fingerprint_key,
    max_length: @max_length
  }

  normalize_and_fingerprint_query(statement, normalizer_options)
end