Class: Fluent::Plugin::PgStatActivityInput

Inherits:
PollingPostgresInputPlugin
Includes:
MarginaliaExtractor
Defined in:
lib/fluent/plugin/in_pg_stat_activity.rb

Overview

PgStatActivityInput polls the `pg_stat_activity` view and emits normalized versions of the queries currently running on the Postgres server. When a fingerprint key is configured, a fingerprint of each query is also included for easier aggregation.

Constant Summary

ACTIVITY_QUERY =
"  SELECT\n    datid,\n    datname,\n    pid,\n    usesysid,\n    usename,\n    application_name,\n    host(client_addr) as client_addr,\n    client_hostname,\n    client_port,\n    wait_event_type,\n    wait_event,\n    xact_start,\n    CAST(extract(epoch from clock_timestamp() - xact_start) AS double precision) xact_age_s,\n    query_start,\n    CAST(extract(epoch from clock_timestamp() - query_start) AS double precision) query_age_s,\n    state_change,\n    CAST(extract(epoch from clock_timestamp() - state_change) AS double precision) state_age_s,\n    state,\n    query\n  FROM pg_catalog.pg_stat_activity\n  WHERE usename IS NOT NULL\n"

Constants included from MarginaliaExtractor

MarginaliaExtractor::MARGINALIA_APPENDED_REGEXP, MarginaliaExtractor::MARGINALIA_PREPENDED_REGEXP

Instance Method Summary

Methods included from MarginaliaExtractor

#extract_entries, #match_marginalia_comment, #parse_entries, #parse_marginalia_into_record, #scrub_comment, #store_key

Methods inherited from PollingPostgresInputPlugin

#shutdown, #start, #thread_main

Instance Method Details

#emit_activity_to_stream(conn) ⇒ Object

Queries the database and emits one record per returned row to the fluentd router, all in a single event stream.



# File 'lib/fluent/plugin/in_pg_stat_activity.rb', line 56

def emit_activity_to_stream(conn)
  me = Fluent::MultiEventStream.new

  now = Fluent::Engine.now
  records = conn.exec(ACTIVITY_QUERY).to_a

  records.each do |row|
    record = record_for_row(row)
    me.add(now, record)
  end

  @router.emit_stream(@tag, me)
end
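
The batching behaviour above can be illustrated without a database or a running fluentd. The following is a minimal sketch, not the plugin's code: `FakeConn`, `FakeStream`, `FakeRouter` and `emit_rows` are hypothetical stand-ins for the PG connection, `Fluent::MultiEventStream`, the fluentd router and `emit_activity_to_stream`. It shows that every row from one poll shares a single timestamp and that the router receives one stream per poll.

```ruby
# Hypothetical stand-in for a PG connection: #exec returns the canned rows.
class FakeConn
  def initialize(rows)
    @rows = rows
  end

  def exec(_sql)
    @rows
  end
end

# Hypothetical stand-in for Fluent::MultiEventStream.
class FakeStream
  attr_reader :events

  def initialize
    @events = []
  end

  def add(time, record)
    @events << [time, record]
  end
end

# Hypothetical stand-in for the fluentd router.
class FakeRouter
  attr_reader :emitted

  def initialize
    @emitted = []
  end

  def emit_stream(tag, stream)
    @emitted << [tag, stream]
  end
end

# Same shape as emit_activity_to_stream: collect all rows, then emit once.
def emit_rows(conn, router, tag, now)
  stream = FakeStream.new
  conn.exec('SELECT 1').to_a.each do |row|
    stream.add(now, row) # the plugin adds record_for_row(row) here
  end
  router.emit_stream(tag, stream)
end

conn = FakeConn.new([{ 'pid' => 1 }, { 'pid' => 2 }])
router = FakeRouter.new
emit_rows(conn, router, 'postgres.activity', 1_700_000_000)
```

Because the timestamp is taken once before the query runs, records from the same poll can be grouped downstream by event time. The tag `postgres.activity` is an arbitrary example value.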

#fingerprint_query(query) ⇒ Object

Returns a hash containing the original query length, the normalized query and, when a fingerprint key is configured, the query fingerprint. Queries that cannot be parsed are flagged with `query_unparseable`.



# File 'lib/fluent/plugin/in_pg_stat_activity.rb', line 105

def fingerprint_query(query)
  # We record the query_length as it will help in understanding whether unparseable
  # queries are truncated.
  record = { 'query_length' => query&.length, 'query' => nil }

  return record unless query

  normalized = PgQuery.normalize(query)
  record['query'] = normalized

  record[@fingerprint_key] = PgQuery.fingerprint(normalized) if @fingerprint_key

  record
rescue PgQuery::ParseError
  record['query_unparseable'] = true

  record
end
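
To make the three possible result shapes concrete, here is a sketch that mirrors the control flow of `fingerprint_query` without depending on the `pg_query` gem. Everything in it is an assumption for illustration: `fake_normalize` and `FakeParseError` are hypothetical substitutes for `PgQuery.normalize` and `PgQuery::ParseError`, and a truncated SHA-1 digest stands in for `PgQuery.fingerprint`. The real normalizer and fingerprint produce different output.

```ruby
require 'digest'

# Hypothetical substitute for PgQuery::ParseError.
class FakeParseError < StandardError; end

# Hypothetical normalizer: replaces integer literals with "?" and rejects
# queries that end in an open parenthesis, as a truncated query might.
def fake_normalize(query)
  raise FakeParseError, 'query looks truncated' if query.end_with?('(')

  query.gsub(/\b\d+\b/, '?')
end

def fingerprint_sketch(query, fingerprint_key: 'fingerprint')
  # query_length is recorded first so it survives a parse failure.
  record = { 'query_length' => query&.length, 'query' => nil }
  return record unless query

  normalized = fake_normalize(query)
  record['query'] = normalized
  # Stand-in fingerprint: a short digest of the normalized text.
  record[fingerprint_key] = Digest::SHA1.hexdigest(normalized)[0, 16] if fingerprint_key
  record
rescue FakeParseError
  # 'query' stays nil, so the raw query text is not emitted.
  record['query_unparseable'] = true
  record
end

missing = fingerprint_sketch(nil)
parsed = fingerprint_sketch('SELECT * FROM t WHERE id = 1')
broken = fingerprint_sketch('SELECT * FROM t WHERE id IN (')
```

One consequence of this structure is that an unparseable query is emitted with a `nil` query and only its length, which is why the length is useful for spotting truncation.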

#record_for_row(row) ⇒ Object

Returns a fluentd record for a query row



# File 'lib/fluent/plugin/in_pg_stat_activity.rb', line 71

def record_for_row(row)
  record = {
    'datid' => row['datid'],
    'datname' => row['datname'],
    'pid' => row['pid'],
    'usesysid' => row['usesysid'],
    'usename' => row['usename'],
    'application_name' => row['application_name'],
    'client_addr' => row['client_addr'],
    'client_hostname' => row['client_hostname'],
    'client_port' => row['client_port'],
    'wait_event_type' => row['wait_event_type'],
    'wait_event' => row['wait_event'],
    'xact_start' => row['xact_start']&.iso8601(3),
    'xact_age_s' => row['xact_age_s'],
    'query_start' => row['query_start']&.iso8601(3),
    'query_age_s' => row['query_age_s'],
    'state_change' => row['state_change']&.iso8601(3),
    'state_age_s' => row['state_age_s'],
    'state' => row['state'],
    'query' => row['query'] # This will be stripped, normalized etc
  }

  # Inject marginalia into record
  parse_marginalia_into_record(record, 'query', true)

  # Normalize and fingerprint the query.
  # Note that `record['query']` was updated in the previous step
  # to strip off marginalia comments.
  record.merge!(fingerprint_query(record['query']))

  record
end
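
Two details of `record_for_row` are easy to miss: the safe-navigation calls leave `nil` timestamps as `nil`, and `merge!` overwrites the raw `query` with the normalized one. The sketch below shows both using only the standard library. It assumes the row values for timestamp columns are `Time` objects (which depends on the connection's type mapping), and the hash passed to `merge!` is a hypothetical result of the fingerprinting step.

```ruby
require 'time'

# A hand-written row; a real row comes from the pg_stat_activity query.
row = {
  'xact_start' => nil, # no transaction is open for this session
  'query_start' => Time.utc(2024, 1, 2, 3, 4, 5, 678_000),
  'query' => 'SELECT 1'
}

record = {
  # &. skips the call when the column is NULL, so nil passes through.
  'xact_start' => row['xact_start']&.iso8601(3),
  # iso8601(3) keeps millisecond precision.
  'query_start' => row['query_start']&.iso8601(3),
  'query' => row['query']
}

# Hypothetical fingerprinting result; keys present here replace
# the same keys already in the record.
record.merge!('query_length' => 8, 'query' => 'SELECT $1')
```

After the merge, `record['query']` holds the normalized text and the raw query is no longer present in the record.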