Class: Fluent::Plugin::PgStatStatementsInput
- Inherits:
-
PollingPostgresInputPlugin
- Object
- Input
- PollingPostgresInputPlugin
- Fluent::Plugin::PgStatStatementsInput
- Includes:
- QueryNormalizer
- Defined in:
- lib/fluent/plugin/in_pg_stat_statements.rb
Overview
PgStatStatementsInput will periodically poll postgres, querying pg_stat_statements for queryid to query mappings. These are then normalized for security purposes fingerprinted and emitted as records with the following format:
'fingerprint' => '8a6e9896bd9048a2',
'query' => 'SELECT * FROM table ORDER BY queryid LIMIT $1',
'query_length' => 58,
'queryid' => '3239318621761098074'
Constant Summary collapse
- POSTGRES_SERVER_VERSION_QUERY =
"SELECT current_setting('server_version_num')"- PG12_STAT_STATEMENTS_QUERY =
<<-SQL SELECT queryid, query, calls, rows, total_time FROM public.pg_stat_statements SQL
- PG13_STAT_STATEMENTS_QUERY =
<<-SQL SELECT queryid, query, calls, rows, (total_plan_time + total_exec_time) total_time FROM public.pg_stat_statements SQL
Instance Method Summary collapse
-
#emit_statements_to_stream(conn) ⇒ Object
Query the database and emit statements to fluentd router.
-
#initialize ⇒ PgStatStatementsInput
constructor
A new instance of PgStatStatementsInput.
-
#postgres_server_version_num(conn) ⇒ Object
Returns the PG_VERSION_NUM value from the database will memoize the result.
-
#query_for_postgres_version(conn) ⇒ Object
pg_stat_statements columns changed in pg13, so we use different queries depending on the version www.postgresql.org/docs/12/pgstatstatements.html www.postgresql.org/docs/13/pgstatstatements.html.
-
#record_for_row(row) ⇒ Object
Returns a fluentd record for a query row.
Methods included from QueryNormalizer
#normalize_and_fingerprint_query
Methods inherited from PollingPostgresInputPlugin
#shutdown, #start, #thread_main
Constructor Details
#initialize ⇒ PgStatStatementsInput
57 58 59 60 |
# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 57 def initialize super @postgres_server_version_num = nil end |
Instance Method Details
#emit_statements_to_stream(conn) ⇒ Object
Query the database and emit statements to fluentd router
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 83 def emit_statements_to_stream(conn) me = Fluent::MultiEventStream.new now = Fluent::Engine.now query = query_for_postgres_version(conn) records = conn.exec(query).to_a records.each do |row| record = record_for_row(row) me.add(now, record) end @router.emit_stream(@tag, me) end |
#postgres_server_version_num(conn) ⇒ Object
Returns the PG_VERSION_NUM value from the database will memoize the result
101 102 103 104 105 |
# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 101 def postgres_server_version_num(conn) return @postgres_server_version_num if @postgres_server_version_num @postgres_server_version_num = conn.exec(POSTGRES_SERVER_VERSION_QUERY).getvalue(0,0).to_i end |
#query_for_postgres_version(conn) ⇒ Object
pg_stat_statements columns changed in pg13, so we use different queries depending on the version www.postgresql.org/docs/12/pgstatstatements.html www.postgresql.org/docs/13/pgstatstatements.html
110 111 112 113 114 |
# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 110 def query_for_postgres_version(conn) return PG13_STAT_STATEMENTS_QUERY if postgres_server_version_num(conn) >= 13_00_00 PG12_STAT_STATEMENTS_QUERY end |
#record_for_row(row) ⇒ Object
Returns a fluentd record for a query row
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 63 def record_for_row(row) record = { 'query' => row['query'], 'queryid' => row['queryid'].to_s, 'calls' => row['calls']&.to_i, 'total_time_ms' => row['total_time']&.to_f, 'rows' => row['rows']&.to_i } opts = { input_key: 'query', output_key: 'query', fingerprint_key: @fingerprint_key, max_length: @max_length } normalize_and_fingerprint_query(record, opts) end |