Class: Fluent::Plugin::PgQuery::PgClient

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/pg_query/pg_client.rb

Constant Summary collapse

# Retry defaults used by #initialize: attempts per query and the pause
# between attempts (the value is handed to Kernel#sleep by #query).
DEFAULT_TRY_COUNT = 3
DEFAULT_TRY_DELAY = 5

# Connection defaults returned by #default_pg_params.
DEFAULT_HOST = 'localhost'
DEFAULT_PORT = 5432
DEFAULT_TIMEOUT = 10 # used as the :connect_timeout value
DEFAULT_SSLMODE = :allow

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(try_count: DEFAULT_TRY_COUNT, try_delay: DEFAULT_TRY_DELAY, logger: nil, **params) ⇒ PgClient

Returns a new instance of PgClient.



19
20
21
22
23
24
# File 'lib/fluent/plugin/pg_query/pg_client.rb', line 19

# Stores the retry policy, logger and connection options. No connection is
# opened here; #pg connects on first use.
#
# @param try_count [Integer, nil] attempts per query; #query does not retry when nil
# @param try_delay [Numeric] value passed to sleep between attempts
# @param logger [#warn, #error, nil] optional logger used by #query
# @param params [Hash] remaining keywords, later merged over the defaults by #pg_params
def initialize(try_count: DEFAULT_TRY_COUNT, try_delay: DEFAULT_TRY_DELAY, logger: nil, **params)
  @params = params
  @logger = logger
  @try_count, @try_delay = try_count, try_delay
end

Instance Attribute Details

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



17
18
19
# File 'lib/fluent/plugin/pg_query/pg_client.rb', line 17

# Reader for @logger: the +logger:+ keyword given to #initialize (nil by default).
def logger
  @logger
end

#params ⇒ Object (readonly)

Returns the value of attribute params.



17
18
19
# File 'lib/fluent/plugin/pg_query/pg_client.rb', line 17

# Reader for @params: the extra keyword arguments captured by +**params+ in
# #initialize. #pg_params merges them over the defaults.
def params
  @params
end

#try_count ⇒ Object (readonly)

Returns the value of attribute try_count.



17
18
19
# File 'lib/fluent/plugin/pg_query/pg_client.rb', line 17

# Reader for @try_count (DEFAULT_TRY_COUNT unless overridden in #initialize).
# #query compares its attempt counter against this value.
def try_count
  @try_count
end

#try_delay ⇒ Object (readonly)

Returns the value of attribute try_delay.



17
18
19
# File 'lib/fluent/plugin/pg_query/pg_client.rb', line 17

# Reader for @try_delay (DEFAULT_TRY_DELAY unless overridden in #initialize).
# #query passes this value to sleep between attempts.
def try_delay
  @try_delay
end

Class Method Details

.from_conf(conf) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/fluent/plugin/pg_query/pg_client.rb', line 74

# Builds a client from a configuration object by mapping its readers onto
# the constructor keywords.
#
# @param conf [#host, #port, #database, #connect_timeout, #user, #password,
#   #sslmode, #ca_cert, #try_count, #try_delay, #log] configuration source
# @return [PgClient] result of +new+ with the mapped keywords
def from_conf(conf)
  # Keywords that end up in the connection parameters (captured by **params).
  connection_options = {
    host: conf.host,
    port: conf.port,
    dbname: conf.database,
    connect_timeout: conf.connect_timeout,
    user: conf.user,
    password: conf.password,
    sslmode: conf.sslmode,
    sslrootcert: conf.ca_cert
  }
  # Keywords consumed by the constructor itself.
  client_options = {
    try_count: conf.try_count,
    try_delay: conf.try_delay,
    logger: conf.log
  }
  new(**connection_options, **client_options)
end

Instance Method Details

#close ⇒ Object



67
68
69
70
71
# File 'lib/fluent/plugin/pg_query/pg_client.rb', line 67

# Closes the cached connection, if any, and forgets it so that #pg will
# reconnect on next use. @pg is cleared even when closing raises.
#
# @return [Object, nil] whatever the connection's #close returns; nil when
#   there was no connection
def close
  connection = @pg
  @pg = nil # detach first so the reference is dropped even if closing fails
  connection&.close
end

#default_pg_params ⇒ Object



58
59
60
61
62
63
64
65
# File 'lib/fluent/plugin/pg_query/pg_client.rb', line 58

# Connection parameters used when the caller does not supply them.
#
# @return [Hash{Symbol => Object}] a new hash with :host, :port,
#   :connect_timeout and :sslmode taken from the DEFAULT_* constants
def default_pg_params
  names = %i[host port connect_timeout sslmode]
  fallbacks = [DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT, DEFAULT_SSLMODE]
  names.zip(fallbacks).to_h
end

#pg ⇒ Object



46
47
48
49
50
51
52
# File 'lib/fluent/plugin/pg_query/pg_client.rb', line 46

# Returns the cached connection, opening and configuring one on first use.
#
# The connection is memoized only after its result type map has been
# installed. If that setup step fails, the new connection is closed and the
# error re-raised, so a later call starts from scratch instead of returning
# a connection without a type map.
#
# @return [PG::Connection] connection built from #pg_params
# @raise [StandardError] whatever PG.connect or the type-map setup raises
def pg
  return @pg if @pg

  connection = PG.connect(**pg_params)
  begin
    connection.type_map_for_results = PG::BasicTypeMapForResults.new(connection)
  rescue StandardError
    connection.close # do not leak the socket of a connection we will not keep
    raise
  end
  @pg = connection
end

#pg_params ⇒ Object



54
55
56
# File 'lib/fluent/plugin/pg_query/pg_client.rb', line 54

# Effective connection parameters: the defaults overlaid with every
# caller-supplied parameter whose value is not nil.
#
# @return [Hash] merged parameters (a new hash)
def pg_params
  defaults = default_pg_params
  supplied = params.reject { |_name, value| value.nil? } # nil means "use the default"
  defaults.merge(supplied)
end

#query(query_str) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/fluent/plugin/pg_query/pg_client.rb', line 26

# Runs +query_str+ and returns all rows as an Array.
#
# On PG::Error the connection is always discarded, so the next attempt (or
# the next call) reconnects instead of reusing a possibly broken
# connection. The query is retried while fewer than +try_count+ attempts
# have been made; after that (or when +try_count+ is nil) the error is
# logged and an empty Array is returned instead of raising.
#
# @param query_str [Object] SQL passed verbatim to the connection's #exec
# @return [Array] +to_a+ of the result, or [] after the final failure
def query(query_str)
  try_attempt ||= 0 # `retry` re-runs the method body; ||= keeps the running count
  pg.exec(query_str).to_a
rescue PG::Error => e
  try_attempt += 1
  # Drop the connection on every failure, including the last one; otherwise
  # a dead connection would be reused forever when no retry is allowed.
  close
  if try_count && try_attempt < try_count
    logger&.warn("PG error on attempt #{try_attempt}/#{try_count}: #{e}")
    # A nil delay must not reach sleep (TypeError, or an endless sleep on newer Rubies).
    sleep try_delay if try_delay&.positive?
    retry
  end
  # Report the attempts actually made; try_count may be nil.
  logger&.error("PG error after #{try_attempt} attempts: #{e}")
  []
end

#standby ⇒ Object



42
43
44
# File 'lib/fluent/plugin/pg_query/pg_client.rb', line 42

# Drops the current connection by delegating to #close; #pg opens a new one
# on its next call.
# NOTE(review): presumably invoked when the owning plugin goes idle — confirm against the caller.
def standby
  close
end