Class: Fluent::Plugin::PgQueryInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_pg_query.rb

Constant Summary collapse

NAME =
'pg_query'
DEFAULT_TAG =
NAME
DEFAULT_INTERVAL =
300
DEFAULT_PG_HOST =
'localhost'
DEFAULT_PG_PORT =
5432
DEFAULT_PG_USER =
nil
DEFAULT_PG_PASSWORD =
nil
DEFAULT_PG_SSLMODE =
:allow
DEFAULT_PG_CONNECT_TIMEOUT =
10
DEFAULT_PG_TRY_COUNT =
3
DEFAULT_PG_TRY_DELAY =
5
DEFAULT_CA_CERT =
nil
DEFAULT_QUERY_TAG =
nil

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


82
83
84
85
86
87
88
89
# File 'lib/fluent/plugin/in_pg_query.rb', line 82

def configure(conf)
  super

  raise Fluent::ConfigError, 'tag should not be empty' if tag.empty?

  configure_pg_connection
  configure_pg_queries
end

#configure_pg_connectionObject

Raises:

  • (Fluent::ConfigError)


91
92
93
94
95
96
97
# File 'lib/fluent/plugin/in_pg_query.rb', line 91

def configure_pg_connection
  raise Fluent::ConfigError, 'port should be >= 0 and <= 65535' if port.negative? || port > 65_535
  raise Fluent::ConfigError, 'database should not empty' if database.empty?
  raise Fluent::ConfigError, 'connect_timeout should be >= 0' if connect_timeout.negative?
  raise Fluent::ConfigError, 'try_count should be >= 0' if try_count.negative?
  raise Fluent::ConfigError, 'try_delay should be >= 0' if try_delay.negative?
end

#configure_pg_queriesObject

Raises:

  • (Fluent::ConfigError)


99
100
101
# File 'lib/fluent/plugin/in_pg_query.rb', line 99

def configure_pg_queries
  raise Fluent::ConfigError, 'queries should not be empty' if queries.empty?
end

#emit_query_records(query_records:, query_time: Fluent::Engine.now, query_tag: nil) ⇒ Object



132
133
134
135
136
137
138
139
# File 'lib/fluent/plugin/in_pg_query.rb', line 132

def emit_query_records(query_records:, query_time: Fluent::Engine.now, query_tag: nil)
  current_tag = [tag, query_tag].compact.join('.')
  query_events = MultiEventStream.new
  query_records.each do |record|
    query_events.add(query_time, record)
  end
  router.emit_stream(current_tag, query_events)
end

#pg_clientObject



141
142
143
# File 'lib/fluent/plugin/in_pg_query.rb', line 141

def pg_client
  @pg_client ||= Fluent::Plugin::PgQuery::PgClient.from_conf(self)
end

#run_queriesObject



117
118
119
120
121
122
123
124
# File 'lib/fluent/plugin/in_pg_query.rb', line 117

def run_queries
  queries.each do |query|
    run_query(query)
  rescue StandardError => e
    log.error "while runnig query: #{query.sql}: #{e}"
  end
  pg_client.standby
end

#run_query(query) ⇒ Object



126
127
128
129
130
# File 'lib/fluent/plugin/in_pg_query.rb', line 126

def run_query(query)
  query_time = Fluent::Engine.now
  records = pg_client.query(query.sql)
  emit_query_records(query_time: query_time, query_tag: query.tag, query_records: records)
end

#shutdownObject



111
112
113
114
115
# File 'lib/fluent/plugin/in_pg_query.rb', line 111

def shutdown
  pg_client.close

  super
end

#startObject



103
104
105
106
107
108
109
# File 'lib/fluent/plugin/in_pg_query.rb', line 103

def start
  super

  timer_execute(:run_queries_first, 1, repeat: false, &method(:run_queries)) if interval > 60

  timer_execute(:run_queries, interval, repeat: true, &method(:run_queries))
end