Class: Fluent::PgStatStatementsInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_pg_stat.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
# File 'lib/fluent/plugin/in_pg_stat.rb', line 33

def configure(conf)
  super

  @column_names = %w(
  calls total_time rows
  shared_blks_hit shared_blks_read shared_blks_dirtied shared_blks_written
  local_blks_hit local_blks_read local_blks_dirtied local_blks_written
  temp_blks_read temp_blks_written
  blk_read_time blk_write_time
  )
end

#desc(description) ⇒ Object



11
12
# File 'lib/fluent/plugin/in_pg_stat.rb', line 11

def desc(description)
end

#shutdownObject



58
59
60
61
62
# File 'lib/fluent/plugin/in_pg_stat.rb', line 58

def shutdown
  @stop_flag = true
  $log.debug 'Waiting for thread to finish'
  @thread.join
end

#startObject



45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/fluent/plugin/in_pg_stat.rb', line 45

def start
  @conn = PG.connect(
    host: @host,
    dbname: 'postgres',
    user: @username,
    password: @password
  )
  @conn.type_map_for_results = PG::BasicTypeMapForResults.new @conn

  @stop_flag = false
  @thread = Thread.new(&method(:thread_main))
end

#thread_mainObject



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/fluent/plugin/in_pg_stat.rb', line 64

def thread_main
  until @stop_flag
    sleep 60

    begin
      me = MultiEventStream.new
      begin
        old_metrics = JSON.parse(IO.read(@state_file))
      rescue => e
        $log.info e
        old_metrics = {}
      end
      new_metrics = {}

      # More than one records of the same query string can exist
      # because queries longer than xxx bbytes are truncated.
      # Treat them as identical.
      now = Engine.now
      @conn.exec(<<-EOS
      select query, #{@column_names.map{|col| "cast (sum(#{col}) as double precision) as #{col}"}.join(', ')} from pg_stat_statements group by query order by total_time desc
      EOS
                ).each do |row|
        new_metrics['timestamp'] = now.to_f
        query = row['query']
        new_metrics[query] = {}

        @column_names.each do |col|
          new_metrics[query][col] = row[col]
          row[col] = if !old_metrics[query].nil? && old_metrics[query][col].is_a?(Numeric)
                       val = (row[col] - old_metrics[query][col]) * 60 / (new_metrics['timestamp'] - old_metrics['timestamp'])
                       val >= 0 ? val : nil
                     end
        end

        # Calculate average values
        if row['calls'].is_a?(Numeric) && row['calls'] != 0
          row['avg_rows'] = row['rows'] / row['calls'] if row['rows'].is_a? Numeric
          row['avg_time'] = row['total_time'] / row['calls'] if row['total_time'].is_a? Numeric
        end

        me.add(now, row)
      end
      @router.emit_stream(@tag, me)
      IO.write(@state_file, new_metrics.to_json)

    rescue => e
      log.error 'unexpected error', error: e.message, error_class: e.class
      log.error_backtrace e.backtrace
    end
  end
end