Class: Fluent::Plugin::PgQuery::PgClient
- Inherits: Object
- Object
- Fluent::Plugin::PgQuery::PgClient
- Defined in:
- lib/fluent/plugin/pg_query/pg_client.rb
Constant Summary collapse
- DEFAULT_TRY_COUNT = 3
- DEFAULT_TRY_DELAY = 5
- DEFAULT_HOST = 'localhost'
- DEFAULT_PORT = 5432
- DEFAULT_TIMEOUT = 10
- DEFAULT_SSLMODE = :allow
Instance Attribute Summary collapse
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#params ⇒ Object
readonly
Returns the value of attribute params.
-
#try_count ⇒ Object
readonly
Returns the value of attribute try_count.
-
#try_delay ⇒ Object
readonly
Returns the value of attribute try_delay.
Class Method Summary collapse
- .from_conf(conf) ⇒ Object
Instance Method Summary collapse
- #close ⇒ Object
- #default_pg_params ⇒ Object
-
#initialize(try_count: DEFAULT_TRY_COUNT, try_delay: DEFAULT_TRY_DELAY, logger: nil, **params) ⇒ PgClient
constructor
A new instance of PgClient.
- #pg ⇒ Object
- #pg_params ⇒ Object
- #query(query_str) ⇒ Object
- #standby ⇒ Object
Constructor Details
#initialize(try_count: DEFAULT_TRY_COUNT, try_delay: DEFAULT_TRY_DELAY, logger: nil, **params) ⇒ PgClient
Returns a new instance of PgClient.
19 20 21 22 23 24 |
# File 'lib/fluent/plugin/pg_query/pg_client.rb', line 19
#
# Stores the retry settings, the optional logger and the connection options.
#
# @param try_count [Integer, nil] value exposed through #try_count
# @param try_delay [Numeric, nil] value exposed through #try_delay
# @param logger [Object, nil] value exposed through #logger
# @param params [Hash] every other keyword argument, exposed through #params
def initialize(try_count: DEFAULT_TRY_COUNT, try_delay: DEFAULT_TRY_DELAY, logger: nil, **params)
  @try_count, @try_delay = try_count, try_delay
  @logger, @params = logger, params
end
Instance Attribute Details
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
17 18 19 |
# File 'lib/fluent/plugin/pg_query/pg_client.rb', line 17
#
# @return [Object, nil] the logger handed to the constructor (nil by default)
def logger
  @logger
end
#params ⇒ Object (readonly)
Returns the value of attribute params.
17 18 19 |
# File 'lib/fluent/plugin/pg_query/pg_client.rb', line 17
#
# @return [Hash] the extra keyword arguments captured by the constructor
def params
  @params
end
#try_count ⇒ Object (readonly)
Returns the value of attribute try_count.
17 18 19 |
# File 'lib/fluent/plugin/pg_query/pg_client.rb', line 17
#
# @return [Integer, nil] the attempt limit handed to the constructor
def try_count
  @try_count
end
#try_delay ⇒ Object (readonly)
Returns the value of attribute try_delay.
17 18 19 |
# File 'lib/fluent/plugin/pg_query/pg_client.rb', line 17
#
# @return [Numeric, nil] the delay between attempts handed to the constructor
def try_delay
  @try_delay
end
Class Method Details
.from_conf(conf) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/fluent/plugin/pg_query/pg_client.rb', line 74
#
# Builds a client from a configuration object by mapping its accessors onto
# the constructor's keyword arguments.
#
# @param conf [#host, #port, #database, #connect_timeout, #user, #password,
#   #sslmode, #ca_cert, #try_count, #try_delay, #log] configuration source
# @return [Object] whatever +new+ returns for the mapped keyword arguments
def from_conf(conf)
  # Connection options; these land in the constructor's **params.
  connection_opts = {
    host: conf.host,
    port: conf.port,
    dbname: conf.database,
    connect_timeout: conf.connect_timeout,
    user: conf.user,
    password: conf.password,
    sslmode: conf.sslmode,
    sslrootcert: conf.ca_cert
  }
  # Retry behaviour; these match the constructor's named keywords.
  retry_opts = {
    try_count: conf.try_count,
    try_delay: conf.try_delay
  }

  new(**connection_opts, **retry_opts, logger: conf.log)
end
Instance Method Details
#close ⇒ Object
67 68 69 70 71 |
# File 'lib/fluent/plugin/pg_query/pg_client.rb', line 67
#
# Closes the memoized connection, if there is one, and forgets it.
#
# The cached reference is cleared even when closing raises, so the next #pg
# call opens a fresh connection.
#
# @return [Object, nil] the connection's #close result, or nil when nothing was open
def close
  connection = @pg
  connection.close unless connection.nil?
ensure
  @pg = nil
end
#default_pg_params ⇒ Object
58 59 60 61 62 63 64 65 |
# File 'lib/fluent/plugin/pg_query/pg_client.rb', line 58
#
# Connection options used when the caller does not supply their own.
#
# @return [Hash{Symbol => Object}] a new hash built from the DEFAULT_* constants
def default_pg_params
  defaults = {}
  defaults[:host] = DEFAULT_HOST
  defaults[:port] = DEFAULT_PORT
  defaults[:connect_timeout] = DEFAULT_TIMEOUT
  defaults[:sslmode] = DEFAULT_SSLMODE
  defaults
end
#pg ⇒ Object
46 47 48 49 50 51 52 |
# File 'lib/fluent/plugin/pg_query/pg_client.rb', line 46
#
# Returns the memoized connection, opening and configuring one on first use.
#
# The connection is memoized only after its result type map has been
# installed. If building the type map fails, the new connection is closed
# and nothing is cached, so a later call starts from scratch instead of
# reusing a connection that has no type map.
#
# @return [Object] the connection returned by PG.connect(**pg_params)
# @raise [StandardError] whatever PG.connect or the type-map setup raises
def pg
  return @pg if @pg

  conn = PG.connect(**pg_params)
  begin
    conn.type_map_for_results = PG::BasicTypeMapForResults.new(conn)
  rescue StandardError
    # Do not leak the socket or cache a connection without a type map.
    conn.close unless conn.finished?
    raise
  end
  @pg = conn
end
#pg_params ⇒ Object
54 55 56 |
# File 'lib/fluent/plugin/pg_query/pg_client.rb', line 54
#
# Effective connection options: the defaults overlaid with every
# caller-supplied option whose value is not nil.
#
# @return [Hash] a new hash; neither #params nor the defaults are modified
def pg_params
  defaults = default_pg_params
  # A nil override means "not configured", so it must not mask a default.
  overrides = params.reject { |_name, value| value.nil? }
  defaults.merge(overrides)
end
#query(query_str) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/fluent/plugin/pg_query/pg_client.rb', line 26
#
# Runs +query_str+ on the memoized connection and returns the rows.
#
# When the connection raises PG::Error, the connection is discarded and the
# query is retried until +try_count+ attempts have been made, waiting
# +try_delay+ seconds between attempts. A nil +try_count+ means no retries
# and a nil +try_delay+ means no wait. Once attempts are exhausted the error
# is logged and an empty array is returned instead of raising.
#
# @param query_str [Object] SQL handed unchanged to the connection's #exec
# @return [Array] +to_a+ of the exec result, or [] after the final failure
def query(query_str)
  # `retry` re-enters the method body, so `||=` keeps the running count.
  try_attempt ||= 0
  pg.exec(query_str).to_a
rescue PG::Error => e
  try_attempt += 1
  # Always drop the connection after an error, including on the last
  # attempt, so a broken connection is never reused by a later call.
  close
  if try_count && try_attempt < try_count
    logger&.warn("PG error on attempt #{try_attempt}/#{try_count}: #{e}")
    sleep(try_delay) if try_delay
    retry
  end
  # Report the attempts actually made; try_count may be nil or zero.
  logger&.error("PG error after #{try_attempt} attempts: #{e}")
  []
end
#standby ⇒ Object
42 43 44 |
# File 'lib/fluent/plugin/pg_query/pg_client.rb', line 42
#
# Releases the connection by delegating to #close.
#
# @return [Object, nil] whatever #close returns
def standby
  close
end