Class: Fluent::Plugin::PgStatStatementsInput
- Inherits:
-
PollingPostgresInputPlugin
- Object
- Input
- PollingPostgresInputPlugin
- Fluent::Plugin::PgStatStatementsInput
- Defined in:
- lib/fluent/plugin/in_pg_stat_statements.rb
Overview
PgStatStatementsInput will periodically poll postgres, querying pg_stat_statements for queryid to query mappings. These are then normalized for security purposes fingerprinted and emitted as records with the following format:
'fingerprint' => '8a6e9896bd9048a2',
'query' => 'SELECT * FROM table ORDER BY queryid LIMIT $1',
'query_length' => 58,
'queryid' => '3239318621761098074'
Instance Method Summary collapse
-
#emit_statements_to_stream(conn) ⇒ Object
Query the database and emit statements to fluentd router.
-
#record_for_row(row) ⇒ Object
Returns a fluentd record for a query row.
Methods inherited from PollingPostgresInputPlugin
#shutdown, #start, #thread_main
Instance Method Details
#emit_statements_to_stream(conn) ⇒ Object
Query the database and emit statements to fluentd router
55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 55 def emit_statements_to_stream(conn) me = Fluent::MultiEventStream.new now = Fluent::Engine.now conn.exec('SELECT queryid, query FROM public.pg_stat_statements').each do |row| record = record_for_row(row) me.add(now, record) end @router.emit_stream(@tag, me) end |
#record_for_row(row) ⇒ Object
Returns a fluentd record for a query row
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 33 def record_for_row(row) query = row['query'] # We record the query_length as it will help in understanding whether unparseable # queries are truncated. record = { 'queryid' => row['queryid'].to_s, 'query_length' => query&.length } return record unless query normalized = PgQuery.normalize(query) record['query'] = normalized record[@fingerprint_key] = PgQuery.parse(normalized).fingerprint if @fingerprint_key record rescue PgQuery::ParseError record['query_unparseable'] = true record end |