Class: Fluent::Plugin::PgStatActivityInput

Inherits:
PollingPostgresInputPlugin
Includes:
MarginaliaExtractor
Defined in:
lib/fluent/plugin/in_pg_stat_activity.rb

Overview

PgStatActivityInput polls the `pg_stat_activity` view and emits normalized versions of the queries currently running on the Postgres server. Query fingerprints are also included for easier aggregation.

Constant Summary

ACTIVITY_QUERY =
<<-SQL
  SELECT
    datid,
    datname,
    pid,
    usesysid,
    usename,
    application_name,
    host(client_addr) as client_addr,
    client_hostname,
    client_port,
    wait_event_type,
    wait_event,
    xact_start,
    CAST(extract(epoch from clock_timestamp() - xact_start) AS double precision) xact_age_s,
    query_start,
    CAST(extract(epoch from clock_timestamp() - query_start) AS double precision) query_age_s,
    state_change,
    CAST(extract(epoch from clock_timestamp() - state_change) AS double precision) state_age_s,
    state,
    query
  FROM pg_catalog.pg_stat_activity
  WHERE usename IS NOT NULL
SQL

Constants included from MarginaliaExtractor

MarginaliaExtractor::MARGINALIA_APPENDED_REGEXP, MarginaliaExtractor::MARGINALIA_PREPENDED_REGEXP

Instance Method Summary

Methods included from MarginaliaExtractor

#extract_entries, #match_marginalia_comment, #parse_entries, #parse_marginalia_into_record, #scrub_comment, #store_key

Methods inherited from PollingPostgresInputPlugin

#shutdown, #start, #thread_main

Instance Method Details

#emit_activity_to_stream(conn) ⇒ Object

Queries the database and emits one record per returned row to the fluentd router.



# File 'lib/fluent/plugin/in_pg_stat_activity.rb', line 56

def emit_activity_to_stream(conn)
  me = Fluent::MultiEventStream.new

  now = Fluent::Engine.now
  conn.exec(ACTIVITY_QUERY).each do |row|
    record = record_for_row(row)
    me.add(now, record)
  end

  @router.emit_stream(@tag, me)
end
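
The flow above (one timestamp per poll, one event per row, a single emit at the end) can be sketched without Fluentd or a database. This is a minimal illustration only: `FakeEventStream`, `FakeRouter`, `emit_rows`, and the tag `postgres.pg_stat_activity` are hypothetical stand-ins, not part of the plugin or of Fluentd.

```ruby
# Stand-ins for illustration only; these are NOT the Fluentd classes.
class FakeEventStream
  attr_reader :events

  def initialize
    @events = []
  end

  # Same call shape as the add(time, record) call used in the method above.
  def add(time, record)
    @events << [time, record]
  end
end

class FakeRouter
  attr_reader :emitted

  def initialize
    @emitted = []
  end

  def emit_stream(tag, stream)
    @emitted << [tag, stream]
  end
end

# Mirrors the shape of emit_activity_to_stream: every row in a poll shares
# one timestamp, and the whole batch is emitted in a single call.
def emit_rows(rows, router, tag, now)
  stream = FakeEventStream.new
  rows.each { |row| stream.add(now, row) }
  router.emit_stream(tag, stream)
  stream
end

rows = [
  { 'pid' => 101, 'state' => 'active' },
  { 'pid' => 102, 'state' => 'idle' }
]
router = FakeRouter.new
stream = emit_rows(rows, router, 'postgres.pg_stat_activity', 1_700_000_000)
```

Because the timestamp is taken once before iterating, all records from a single poll carry the same event time, which makes it straightforward to group them downstream as one snapshot.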

#fingerprint_query(query) ⇒ Object



# File 'lib/fluent/plugin/in_pg_stat_activity.rb', line 103

def fingerprint_query(query)
  # We record the query_length as it will help in understanding whether unparseable
  # queries are truncated.
  record = { 'query_length' => query&.length, 'query' => nil }

  return record unless query

  normalized = PgQuery.normalize(query)
  record['query'] = normalized

  record[@fingerprint_key] = PgQuery.parse(normalized).fingerprint if @fingerprint_key

  record
rescue PgQuery::ParseError
  record['query_unparseable'] = true

  record
end
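
The three possible outcomes of this method (nil query, parseable query, unparseable query) can be seen in a sketch that runs without the pg_query gem. `FakeParser` and `fingerprint_sketch` are hypothetical stand-ins; the real `PgQuery.normalize` is far more thorough and produces numbered placeholders such as `$1`, and the fingerprint step is omitted here.

```ruby
# Hypothetical stand-in for the pg_query gem so the sketch runs without it.
module FakeParser
  class ParseError < StandardError; end

  # Crude approximation: reject unbalanced parentheses and replace integer
  # literals with '?'. This is NOT how PgQuery.normalize works internally.
  def self.normalize(query)
    raise ParseError, 'unbalanced parenthesis' if query.count('(') != query.count(')')

    query.gsub(/\b\d+\b/, '?')
  end
end

# Same control flow as fingerprint_query, minus the fingerprint itself.
def fingerprint_sketch(query)
  record = { 'query_length' => query&.length, 'query' => nil }
  return record unless query

  record['query'] = FakeParser.normalize(query)
  record
rescue FakeParser::ParseError
  # The raw query is dropped, but its length is kept for diagnosing truncation.
  record['query_unparseable'] = true
  record
end

sql = 'SELECT * FROM users WHERE id = 42'
empty_result = fingerprint_sketch(nil)
parsed_result = fingerprint_sketch(sql)
broken_result = fingerprint_sketch('SELECT count(')
```

Note that on a parse failure the record keeps `'query' => nil`, so the raw, un-normalized SQL is never emitted; only `query_length` and `query_unparseable` remain to describe it.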

#record_for_row(row) ⇒ Object

Returns a fluentd record for a query row



# File 'lib/fluent/plugin/in_pg_stat_activity.rb', line 69

def record_for_row(row)
  record = {
    'datid' => row['datid'],
    'datname' => row['datname'],
    'pid' => row['pid'],
    'usesysid' => row['usesysid'],
    'usename' => row['usename'],
    'application_name' => row['application_name'],
    'client_addr' => row['client_addr'],
    'client_hostname' => row['client_hostname'],
    'client_port' => row['client_port'],
    'wait_event_type' => row['wait_event_type'],
    'wait_event' => row['wait_event'],
    'xact_start' => row['xact_start']&.iso8601(3),
    'xact_age_s' => row['xact_age_s'],
    'query_start' => row['query_start']&.iso8601(3),
    'query_age_s' => row['query_age_s'],
    'state_change' => row['state_change']&.iso8601(3),
    'state_age_s' => row['state_age_s'],
    'state' => row['state'],
    'query' => row['query'] # This will be stripped, normalized etc
  }

  # Inject marginalia into record
  parse_marginalia_into_record(record, 'query', true)

  # Normalize and fingerprint the query.
  # Note that `record['query']` was updated in the previous step
  # to strip off marginalia comments.
  record.merge!(fingerprint_query(record['query']))

  record
end
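
Two details of this method are easy to miss: the safe-navigation call `&.iso8601(3)` passes `nil` timestamps through unchanged, and `merge!` overwrites the raw `'query'` value with the normalized one. The sketch below assumes the timestamp columns arrive as Ruby `Time` objects (which the `.iso8601(3)` calls imply); the row values and the merged hash are hand-written stand-ins for a database row and for the output of `fingerprint_query`.

```ruby
require 'time' # provides Time#iso8601

# Hand-written stand-in for one row returned by the activity query.
row = {
  'query_start' => Time.utc(2024, 1, 2, 3, 4, 5, 678_000),
  'xact_start' => nil, # no open transaction
  'query' => 'SELECT 1'
}

record = {
  # iso8601(3) keeps millisecond precision.
  'query_start' => row['query_start']&.iso8601(3),
  # &. returns nil instead of raising NoMethodError on a nil column.
  'xact_start' => row['xact_start']&.iso8601(3),
  'query' => row['query']
}

# Stand-in for the hash returned by fingerprint_query; merge! replaces the
# raw query with the normalized text and adds the extra keys.
record.merge!('query_length' => 8, 'query' => 'SELECT $1')
```

After the merge, `record['query']` holds the normalized text rather than the original SQL, so literal values from running queries do not end up in the emitted record.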