Class: Fluent::Plugin::PgQueryInput
- Inherits:
-
Input
- Object
- Input
- Fluent::Plugin::PgQueryInput
- Defined in:
- lib/fluent/plugin/in_pg_query.rb
Constant Summary collapse
- NAME =
'pg_query'- DEFAULT_TAG =
NAME- DEFAULT_INTERVAL =
300- DEFAULT_PG_HOST =
'localhost'- DEFAULT_PG_PORT =
5432- DEFAULT_PG_USER =
nil- DEFAULT_PG_PASSWORD =
nil- DEFAULT_PG_SSLMODE =
:allow- DEFAULT_PG_CONNECT_TIMEOUT =
10- DEFAULT_PG_TRY_COUNT =
3- DEFAULT_PG_TRY_DELAY =
5- DEFAULT_CA_CERT =
nil- DEFAULT_QUERY_TAG =
nil
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #configure_pg_connection ⇒ Object
- #configure_pg_queries ⇒ Object
- #emit_query_records(query_records:, query_time: Fluent::Engine.now, query_tag: nil) ⇒ Object
- #pg_client ⇒ Object
- #run_queries ⇒ Object
- #run_query(query) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
82 83 84 85 86 87 88 89 |
# File 'lib/fluent/plugin/in_pg_query.rb', line 82 def configure(conf) super raise Fluent::ConfigError, 'tag should not be empty' if tag.empty? configure_pg_connection configure_pg_queries end |
#configure_pg_connection ⇒ Object
91 92 93 94 95 96 97 |
# File 'lib/fluent/plugin/in_pg_query.rb', line 91 def configure_pg_connection raise Fluent::ConfigError, 'port should be >= 0 and <= 65535' if port.negative? || port > 65_535 raise Fluent::ConfigError, 'database should not empty' if database.empty? raise Fluent::ConfigError, 'connect_timeout should be >= 0' if connect_timeout.negative? raise Fluent::ConfigError, 'try_count should be >= 0' if try_count.negative? raise Fluent::ConfigError, 'try_delay should be >= 0' if try_delay.negative? end |
#configure_pg_queries ⇒ Object
99 100 101 |
# File 'lib/fluent/plugin/in_pg_query.rb', line 99 def configure_pg_queries raise Fluent::ConfigError, 'queries should not be empty' if queries.empty? end |
#emit_query_records(query_records:, query_time: Fluent::Engine.now, query_tag: nil) ⇒ Object
132 133 134 135 136 137 138 139 |
# File 'lib/fluent/plugin/in_pg_query.rb', line 132 def emit_query_records(query_records:, query_time: Fluent::Engine.now, query_tag: nil) current_tag = [tag, query_tag].compact.join('.') query_events = MultiEventStream.new query_records.each do |record| query_events.add(query_time, record) end router.emit_stream(current_tag, query_events) end |
#pg_client ⇒ Object
141 142 143 |
# File 'lib/fluent/plugin/in_pg_query.rb', line 141 def pg_client @pg_client ||= Fluent::Plugin::PgQuery::PgClient.from_conf(self) end |
#run_queries ⇒ Object
117 118 119 120 121 122 123 124 |
# File 'lib/fluent/plugin/in_pg_query.rb', line 117 def run_queries queries.each do |query| run_query(query) rescue StandardError => e log.error "while runnig query: #{query.sql}: #{e}" end pg_client.standby end |
#run_query(query) ⇒ Object
126 127 128 129 130 |
# File 'lib/fluent/plugin/in_pg_query.rb', line 126 def run_query(query) query_time = Fluent::Engine.now records = pg_client.query(query.sql) emit_query_records(query_time: query_time, query_tag: query.tag, query_records: records) end |
#shutdown ⇒ Object
111 112 113 114 115 |
# File 'lib/fluent/plugin/in_pg_query.rb', line 111 def shutdown pg_client.close super end |
#start ⇒ Object
103 104 105 106 107 108 109 |
# File 'lib/fluent/plugin/in_pg_query.rb', line 103 def start super timer_execute(:run_queries_first, 1, repeat: false, &method(:run_queries)) if interval > 60 timer_execute(:run_queries, interval, repeat: true, &method(:run_queries)) end |