Class: LogStash::Outputs::Jdbc

Inherits:
Base
  • Object
show all
Includes:
Stud::Buffer
Defined in:
lib/logstash/outputs/jdbc.rb

Instance Method Summary collapse

Instance Method Details

#flush(events, teardown = false) ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/logstash/outputs/jdbc.rb', line 103

def flush(events, teardown=false)
  statement = @connection.prepareStatement(@statement[0])

  events.each do |event|
    next if @statement.length < 2

    @statement[1..-1].each_with_index do |i, idx|
      case event[i]
      when Time, LogStash::Timestamp
        # Most reliable solution, cross JDBC driver
        statement.setString(idx + 1, event[i].iso8601())
      when Fixnum, Integer
        statement.setInt(idx + 1, event[i])
      when Float
        statement.setFloat(idx + 1, event[i])
      when String
        statement.setString(idx + 1, event[i])
      when true
        statement.setBoolean(idx + 1, true)
      when false
        statement.setBoolean(idx + 1, false)
      when nil
        statement.setString(idx + 1, nil)
      else
        statement.setString(idx + 1, event.sprintf(i))
      end
    end

    statement.addBatch()
  end

  begin
    @logger.debug("JDBC - Sending SQL", :sql => statement.toString())
    statement.executeBatch()
  rescue => e
    # Raising an exception will incur a retry from Stud::Buffer.
    # Since the exceutebatch failed this should mean any events failed to be
    # inserted will be re-run. We're going to log it for the lols anyway.
    @logger.warn("JDBC - Exception. Will automatically retry", :exception => e)
    if e.getNextException() != nil
     @logger.warn("JDBC - Exception. Will automatically retry", :exception => e.getNextException())
	  end
  end

  statement.close()
end

#on_flush_error(e) ⇒ Object



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/logstash/outputs/jdbc.rb', line 150

def on_flush_error(e)
  return if @max_repeat_exceptions < 1

  if @last_exception == e.to_s
    @repeat_exception_count += 1
  else
    @repeat_exception_count = 0
  end

  if (@repeat_exception_count >= @max_repeat_exceptions) and (Time.now - @last_exception_time) < @max_repeat_exceptions_time
    @logger.error("JDBC - Exception repeated more than the maximum configured", :exception => e, :max_repeat_exceptions => @max_repeat_exceptions, :max_repeat_exceptions_time => @max_repeat_exceptions_time)
    raise e
  end

  @last_exception_time = Time.now
  @last_exception = e.to_s
end

#receive(event) ⇒ Object



96
97
98
99
100
101
# File 'lib/logstash/outputs/jdbc.rb', line 96

def receive(event)
  return unless output?(event)
  return unless @statement.length > 0

  buffer_receive(event)
end

#registerObject

Raises:

  • (Exception)


51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/logstash/outputs/jdbc.rb', line 51

def register

  @logger.info("JDBC - Starting up")

  if ENV['LOGSTASH_HOME']
    jarpath = File.join(ENV['LOGSTASH_HOME'], "/vendor/jar/jdbc/*.jar")
  else
    jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/jdbc/*.jar")
  end

  @logger.debug("JDBC - jarpath", path: jarpath)

  jars = Dir[jarpath]
  raise Exception.new("JDBC - No jars found in jarpath. Have you read the README?") if jars.empty?

  jars.each do |jar|
    @logger.debug("JDBC - Loaded jar", :jar => jar)
    require jar
  end

  import @driver_class

  driver = Object.const_get(@driver_class[@driver_class.rindex('.') + 1, @driver_class.length]).new
  @connection = driver.connect(@connection_string, java.util.Properties.new)

  @logger.debug("JDBC - Created connection", :driver => driver, :connection => @connection)

  if (@flush_size > 1000)
    @logger.warn("JDBC - Flush size is set to > 1000. May have performance penalties, depending on your SQL engine.")
  end

  @repeat_exception_count = 0
  @last_exception_time = Time.now

  if (@max_repeat_exceptions > 0) and ((@idle_flush_time * @max_repeat_exceptions) > @max_repeat_exceptions_time)
    @logger.warn("JDBC - max_repeat_exceptions_time is set such that it may still permit a looping exception. You probably changed idle_flush_time. Considering increasing max_repeat_exceptions_time.")
  end

  buffer_initialize(
    :max_items => @flush_size,
    :max_interval => @idle_flush_time,
    :logger => @logger
  )
end

#teardownObject



168
169
170
171
172
# File 'lib/logstash/outputs/jdbc.rb', line 168

def teardown
  buffer_flush(:final => true)
  @connection.close()
  super
end