Class: Fluent::Plugin::PgStatActivityInput

Inherits:
PollingPostgresInputPlugin
Includes:
MarginaliaExtractor
Defined in:
lib/fluent/plugin/in_pg_stat_activity.rb

Overview

PgStatActivityInput polls the `pg_stat_activity` view and emits normalized versions of the queries currently running on the Postgres server. Fingerprints of the queries are also included for easier aggregation.

Constant Summary

ACTIVITY_QUERY =
"  SELECT\n    datid,\n    datname,\n    pid,\n    usesysid,\n    usename,\n    application_name,\n    host(client_addr) as client_addr,\n    client_hostname,\n    client_port,\n    xact_start,\n    extract(epoch from clock_timestamp() - xact_start) xact_age_s,\n    query_start,\n    extract(epoch from clock_timestamp() - query_start) query_age_s,\n    state_change,\n    extract(epoch from clock_timestamp() - state_change) state_age_s,\n    state,\n    query\n  FROM pg_stat_activity\n  WHERE usename IS NOT NULL\n"

Constants included from MarginaliaExtractor

MarginaliaExtractor::MARGINALIA_APPENDED_REGEXP, MarginaliaExtractor::MARGINALIA_PREPENDED_REGEXP

Instance Method Summary

Methods included from MarginaliaExtractor

#extract_entries, #match_marginalia_comment, #parse_entries, #parse_marginalia_into_record, #scrub_comment, #store_key

Methods inherited from PollingPostgresInputPlugin

#shutdown, #start, #thread_main

Instance Method Details

#emit_activity_to_stream(conn) ⇒ Object

Queries the database and emits the resulting statements to the fluentd router.



# File 'lib/fluent/plugin/in_pg_stat_activity.rb', line 54

def emit_activity_to_stream(conn)
  me = Fluent::MultiEventStream.new

  now = Fluent::Engine.now
  conn.exec(ACTIVITY_QUERY).each do |row|
    record = record_for_row(row)
    me.add(now, record)
  end

  @router.emit_stream(@tag, me)
end
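
The batching in this method can be illustrated without a fluentd or Postgres installation. The following is a minimal sketch, not the plugin's code: `StubStream`, `StubRouter`, `StubConn`, the `emit_rows` helper, and the tag value are all hypothetical stand-ins that only mimic the calls made by `emit_activity_to_stream`. It shows that every row from one poll is added to a single stream under the same timestamp, and that the stream is emitted once.

```ruby
# Hypothetical stand-ins; they only mimic the calls used by emit_activity_to_stream.
class StubStream
  attr_reader :events

  def initialize
    @events = []
  end

  # Mirrors the add(time, record) call made on Fluent::MultiEventStream.
  def add(time, record)
    @events << [time, record]
  end
end

class StubRouter
  attr_reader :emitted

  def initialize
    @emitted = []
  end

  # Mirrors the emit_stream(tag, stream) call made on the router.
  def emit_stream(tag, stream)
    @emitted << [tag, stream]
  end
end

class StubConn
  def initialize(rows)
    @rows = rows
  end

  # Mirrors conn.exec(sql) by returning an enumerable of rows.
  def exec(_sql)
    @rows
  end
end

# Same shape as the documented method, with dependencies passed in explicitly.
def emit_rows(conn, router, tag, now)
  stream = StubStream.new
  # The SQL text is a placeholder; the stub connection ignores it.
  conn.exec('SELECT ...').each { |row| stream.add(now, row) }
  router.emit_stream(tag, stream)
  stream
end

router = StubRouter.new
conn = StubConn.new([{ 'pid' => 1 }, { 'pid' => 2 }])
stream = emit_rows(conn, router, 'postgres.pg_stat_activity', 1_700_000_000)
```

Because the timestamp is captured once before the loop, all records from a single poll carry the same event time, which keeps one snapshot of `pg_stat_activity` grouped together downstream.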

#fingerprint_query(query) ⇒ Object



# File 'lib/fluent/plugin/in_pg_stat_activity.rb', line 99

def fingerprint_query(query)
  # We record the query_length as it will help in understanding whether unparseable
  # queries are truncated.
  record = { 'query_length' => query&.length, 'query' => nil }

  return record unless query

  normalized = PgQuery.normalize(query)
  record['query'] = normalized

  record[@fingerprint_key] = PgQuery.parse(normalized).fingerprint if @fingerprint_key

  record
rescue PgQuery::ParseError
  record['query_unparseable'] = true

  record
end
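
The method has three possible return shapes: a `nil` query, a parseable query, and an unparseable one. The sketch below reproduces that control flow with a toy normalizer so it runs without the pg_query gem. `toy_normalize`, `ToyParseError`, `toy_fingerprint_query`, and the SHA1-based fingerprint are assumptions made for illustration only; they are not how `PgQuery.normalize` or `PgQuery.parse(...).fingerprint` work internally.

```ruby
require 'digest'

# Hypothetical error type standing in for PgQuery::ParseError.
class ToyParseError < StandardError; end

# Toy normalizer: replaces quoted strings and integers with '?'.
# This is NOT PgQuery's algorithm, only a stand-in for the sketch.
def toy_normalize(query)
  raise ToyParseError, 'unbalanced quote' if query.count("'").odd?

  query.gsub(/'[^']*'/, '?').gsub(/\b\d+\b/, '?')
end

# Same control flow as fingerprint_query, with the toy normalizer swapped in.
def toy_fingerprint_query(query, fingerprint_key = 'fingerprint')
  record = { 'query_length' => query&.length, 'query' => nil }
  return record unless query

  normalized = toy_normalize(query)
  record['query'] = normalized
  record[fingerprint_key] = Digest::SHA1.hexdigest(normalized) if fingerprint_key
  record
rescue ToyParseError
  # The normalized query stays nil; only the length and the flag are kept.
  record['query_unparseable'] = true
  record
end

empty  = toy_fingerprint_query(nil)
a      = toy_fingerprint_query('SELECT * FROM users WHERE id = 1')
b      = toy_fingerprint_query('SELECT * FROM users WHERE id = 42')
broken = toy_fingerprint_query("SELECT 'unterminated")
```

Here `a` and `b` differ only in a literal, so they normalize to the same text and share a fingerprint, which is what makes aggregation by fingerprint useful. For `broken`, `query` remains `nil` while `query_length` is still recorded, matching the comment in the method about diagnosing truncated queries.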

#record_for_row(row) ⇒ Object

Returns a fluentd record for a query row



# File 'lib/fluent/plugin/in_pg_stat_activity.rb', line 67

def record_for_row(row)
  record = {
    'datid' => row['datid'],
    'datname' => row['datname'],
    'pid' => row['pid'],
    'usesysid' => row['usesysid'],
    'usename' => row['usename'],
    'application_name' => row['application_name'],
    'client_addr' => row['client_addr'],
    'client_hostname' => row['client_hostname'],
    'client_port' => row['client_port'],
    'xact_start' => row['xact_start']&.iso8601(3),
    'xact_age_s' => row['xact_age_s'],
    'query_start' => row['query_start']&.iso8601(3),
    'query_age_s' => row['query_age_s'],
    'state_change' => row['state_change']&.iso8601(3),
    'state_age_s' => row['state_age_s'],
    'state' => row['state'],
    'query' => row['query'] # This will be stripped, normalized etc
  }

  # Inject marginalia into record
  parse_marginalia_into_record(record, 'query', true)

  # Normalize and fingerprint the query.
  # Note that `record['query']` was updated in the previous step
  # to strip off marginalia comments.
  record.merge!(fingerprint_query(record['query']))

  record
end
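
Two details of this method are easy to miss: the safe-navigation call `&.iso8601(3)` leaves `nil` timestamps as `nil`, and `merge!` lets the result of `fingerprint_query` overwrite the raw `query` value. The sketch below shows both on a hypothetical row; it assumes the timestamp columns arrive as Ruby `Time` objects, and the merged values are made up for illustration.

```ruby
require 'time'

# Hypothetical row, assuming timestamp columns are already decoded to Time.
row = {
  'xact_start' => nil, # e.g. a session with no open transaction
  'query_start' => Time.utc(2024, 1, 2, 3, 4, 5, 678_000)
}

record = {
  # &. skips the call when the value is nil, so nil is preserved.
  'xact_start' => row['xact_start']&.iso8601(3),
  # iso8601(3) keeps millisecond precision.
  'query_start' => row['query_start']&.iso8601(3),
  'query' => 'SELECT 1'
}

# merge! overwrites 'query' and adds the extra keys, as in record_for_row.
# These values are illustrative, not real fingerprint_query output.
record.merge!('query_length' => 8, 'query' => 'SELECT $1')
```

After this runs, `record['xact_start']` is still `nil`, `record['query_start']` is a millisecond-precision ISO 8601 string, and `record['query']` holds the merged value rather than the raw text.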