Class: Fluent::Plugin::PgStatActivityInput

Inherits:
PollingPostgresInputPlugin show all
Includes:
MarginaliaExtractor, QueryNormalizer
Defined in:
lib/fluent/plugin/in_pg_stat_activity.rb

Overview

PgStatActivityInput polls the pg_stat_activity table emitting normalized versions of the queries currently running on the postgres server. Fingerprints of the queries are also included for easier aggregation

Constant Summary collapse

ACTIVITY_QUERY =
<<-SQL
  SELECT
    datid,
    datname,
    pid,
    usesysid,
    usename,
    application_name,
    host(client_addr) as client_addr,
    client_hostname,
    client_port,
    wait_event_type,
    wait_event,
    xact_start,
    CAST(extract(epoch from clock_timestamp() - xact_start) AS double precision) xact_age_s,
    query_start,
    CAST(extract(epoch from clock_timestamp() - query_start) AS double precision) query_age_s,
    state_change,
    CAST(extract(epoch from clock_timestamp() - state_change) AS double precision) state_age_s,
    state,
    query
  FROM pg_catalog.pg_stat_activity
  WHERE usename IS NOT NULL
SQL

Constants included from MarginaliaExtractor

MarginaliaExtractor::MARGINALIA_APPENDED_REGEXP, MarginaliaExtractor::MARGINALIA_PREPENDED_REGEXP

Instance Method Summary collapse

Methods included from QueryNormalizer

#normalize_and_fingerprint_query

Methods included from MarginaliaExtractor

#extract_entries, #match_marginalia_comment, #parse_entries, #parse_marginalia_into_record, #scrub_comment, #store_key

Methods inherited from PollingPostgresInputPlugin

#shutdown, #start, #thread_main

Instance Method Details

#emit_activity_to_stream(conn) ⇒ Object

Query the database and emit statements to fluentd router



61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/fluent/plugin/in_pg_stat_activity.rb', line 61

def emit_activity_to_stream(conn)
  me = Fluent::MultiEventStream.new

  now = Fluent::Engine.now
  records = conn.exec(ACTIVITY_QUERY).to_a

  records.each do |row|
    record = record_for_row(row)
    me.add(now, record)
  end

  @router.emit_stream(@tag, me)
end

#record_for_row(row) ⇒ Object

Returns a fluentd record for a query row



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/fluent/plugin/in_pg_stat_activity.rb', line 76

def record_for_row(row)
  record = {
    'datid' => row['datid'],
    'datname' => row['datname'],
    'pid' => row['pid'],
    'usesysid' => row['usesysid'],
    'usename' => row['usename'],
    'application_name' => row['application_name'],
    'client_addr' => row['client_addr'],
    'client_hostname' => row['client_hostname'],
    'client_port' => row['client_port'],
    'wait_event_type' => row['wait_event_type'],
    'wait_event' => row['wait_event'],
    'xact_start' => row['xact_start']&.iso8601(3),
    'xact_age_s' => row['xact_age_s'],
    'query_start' => row['query_start']&.iso8601(3),
    'query_age_s' => row['query_age_s'],
    'state_change' => row['state_change']&.iso8601(3),
    'state_age_s' => row['state_age_s'],
    'state' => row['state'],
    'query' => row['query'] # This will be stripped, normalized etc
  }

  # Inject marginalia into record
  parse_marginalia_into_record(record, 'query', true)

  opts = {
    input_key: 'query',
    output_key: 'query',
    fingerprint_key: @fingerprint_key,
    max_length: @max_length
  }

  normalize_and_fingerprint_query(record, opts)
end