Module: LogStash::PluginMixins::Jdbc

Included in:
Inputs::Jdbc
Defined in:
lib/logstash/plugin_mixins/jdbc.rb

Overview

Tentative of abstracting JDBC logic to a mixin for potential reuse in other plugins (input/output)

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(base) ⇒ Object

This method is called when someone includes this module



12
13
14
15
16
# File 'lib/logstash/plugin_mixins/jdbc.rb', line 12

def self.included(base)
  # Add these methods to the 'base' given.
  base.extend(self)
  base.setup_jdbc_config
end

Instance Method Details

#close_jdbc_connectionObject



130
131
132
# File 'lib/logstash/plugin_mixins/jdbc.rb', line 130

def close_jdbc_connection
  @database.disconnect if @database
end

#execute_statement(statement, parameters) ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/logstash/plugin_mixins/jdbc.rb', line 135

def execute_statement(statement, parameters)
  success = false
  begin 
    parameters = symbolized_params(parameters)
    query = @database[statement, parameters]
    @logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters)
    @sql_last_start = Time.now.utc

    if @jdbc_paging_enabled
      query.each_page(@jdbc_page_size) do |paged_dataset|
        paged_dataset.each do |row|
          yield extract_values_from(row)
        end
      end
    else
      query.each do |row|
        yield extract_values_from(row)
      end
    end
    success = true
  rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError => e
    @logger.warn("Exception when executing JDBC query", :exception => e)
  end
  return success
end

#prepare_jdbc_connectionObject



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/logstash/plugin_mixins/jdbc.rb', line 96

def prepare_jdbc_connection
  require "java"
  require "sequel"
  require "sequel/adapters/jdbc"
  require @jdbc_driver_library if @jdbc_driver_library
  begin
    Sequel::JDBC.load_driver(@jdbc_driver_class)
  rescue Sequel::AdapterNotFound => e
    message = if @jdbc_driver_library.nil?
                ":jdbc_driver_library is not set, are you sure you included 
                the proper driver client libraries in your classpath?"
              else
                "Are you sure you've included the correct jdbc driver in :jdbc_driver_library?"
              end
    raise LogStash::ConfigurationError, "#{e}. #{message}"
  end
  @database = jdbc_connect()
  @database.extension(:pagination)
  if @jdbc_validate_connection
    @database.extension(:connection_validator)
    @database.pool.connection_validation_timeout = @jdbc_validation_timeout
  end
  @database.fetch_size = @jdbc_fetch_size unless @jdbc_fetch_size.nil?
  begin
    @database.test_connection
  rescue Sequel::DatabaseConnectionError => e
    #TODO return false and let the plugin raise a LogStash::ConfigurationError
    raise e
  end

  @sql_last_start = Time.at(0).utc
end

#setup_jdbc_configObject



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/logstash/plugin_mixins/jdbc.rb', line 20

def setup_jdbc_config
  # JDBC driver library path to third party driver library.
  #
  # If not provided, Plugin will look for the driver class in the Logstash Java classpath.
  config :jdbc_driver_library, :validate => :path

  # JDBC driver class to load, for exmaple, "org.apache.derby.jdbc.ClientDriver"
  # NB per https://github.com/logstash-plugins/logstash-input-jdbc/issues/43 if you are using
  # the Oracle JDBC driver (ojdbc6.jar) the correct `jdbc_driver_class` is `"Java::oracle.jdbc.driver.OracleDriver"`
  config :jdbc_driver_class, :validate => :string, :required => true

  # JDBC connection string
  config :jdbc_connection_string, :validate => :string, :required => true

  # JDBC user
  config :jdbc_user, :validate => :string, :required => true

  # JDBC password
  config :jdbc_password, :validate => :password

  # JDBC enable paging
  #
  # This will cause a sql statement to be broken up into multiple queries.
  # Each query will use limits and offsets to collectively retrieve the full
  # result-set. The limit size is set with `jdbc_page_size`.
  #
  # Be aware that ordering is not guaranteed between queries.
  config :jdbc_paging_enabled, :validate => :boolean, :default => false 

  # JDBC page size
  config :jdbc_page_size, :validate => :number, :default => 100000

  # JDBC fetch size. if not provided, respective driver's default will be used
  config :jdbc_fetch_size, :validate => :number

  # Connection pool configuration.
  # Validate connection before use.
  config :jdbc_validate_connection, :validate => :boolean, :default => false

  # Connection pool configuration.
  # How often to validate a connection (in seconds)
  config :jdbc_validation_timeout, :validate => :number, :default => 3600

  # Connection pool configuration.
  # The amount of seconds to wait to acquire a connection before raising a PoolTimeoutError (default 5)
  config :jdbc_pool_timeout, :validate => :number, :default => 5

  # General/Vendor-specific Sequel configuration options.
  #
  # An example of an optional connection pool configuration
  #    max_connections - The maximum number of connections the connection pool
  #
  # examples of vendor-specific options can be found in this
  # documentation page: https://github.com/jeremyevans/sequel/blob/master/doc/opening_databases.rdoc
  config :sequel_opts, :validate => :hash, :default => {}
end