Class: LogStash::Inputs::Sqlite

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/sqlite.rb

Overview

Read rows from an sqlite database.

This is most useful in cases where you are logging directly to a table. Any tables being watched must have an `id` column that is monotonically increasing.

All tables are read by default except:

  • ones matching `sqlite_%` - these are internal/administrative tables for sqlite

  • `since_table` - this is used by this plugin to track state.

Example

source,sql

% sqlite /tmp/example.db
sqlite> CREATE TABLE weblogs (

id INTEGER PRIMARY KEY AUTOINCREMENT,
ip STRING,
request STRING,
response INTEGER);

sqlite> INSERT INTO weblogs (ip, request, response)

VALUES ("1.2.3.4", "/index.html", 200);

Then with this logstash config:

source,ruby

input {

sqlite {
  path => "/tmp/example.db"
  type => weblogs
}

}

output {

stdout {
  debug => true
}

}

Sample output:

source,ruby

{

"@source"      => "sqlite://sadness/tmp/x.db",
"@tags"        => [],
"@fields"      => {
  "ip"       => "1.2.3.4",
  "request"  => "/index.html",
  "response" => 200
},
"@timestamp"   => "2013-05-29T06:16:30.850Z",
"@source_host" => "sadness",
"@source_path" => "/tmp/x.db",
"@message"     => "",
"@type"        => "foo"

}

Constant Summary collapse

SINCE_TABLE =
:since_table

Instance Method Summary collapse

Instance Method Details

#get_all_tables(db) ⇒ Object



117
118
119
# File 'lib/logstash/inputs/sqlite.rb', line 117

# Lists the names of the tables this input should read.
#
# Queries sqlite_master for every table except the SINCE_TABLE bookkeeping
# table and tables whose name matches 'sqlite_%', then drops every name
# that appears in @exclude_tables.
#
# db - object answering #[] with an SQL string (#register passes the
#      Sequel connection).
#
# Returns an Array holding the :name value of each remaining row.
def get_all_tables(db)
  query = "SELECT * FROM sqlite_master WHERE type = 'table' AND tbl_name != '#{SINCE_TABLE}' AND tbl_name NOT LIKE 'sqlite_%'"
  table_names = db[query].map { |entry| entry[:name] }
  table_names.reject { |table_name| @exclude_tables.include?(table_name) }
end

#get_n_rows_from_table(db, table, offset, limit) ⇒ Object



122
123
124
125
# File 'lib/logstash/inputs/sqlite.rb', line 122

# Fetches the next batch of rows from +table+ whose id is greater than
# +offset+, ordered by id and capped at +limit+ rows.
#
# db     - object answering #[] with an SQL string (#run passes the Sequel
#          connection).
# table  - name of the table to read; interpolated into the SQL verbatim.
# offset - id of the last row already consumed.
# limit  - maximum number of rows to return.
#
# Returns an Array of the rows yielded by the query.
def get_n_rows_from_table(db, table, offset, limit)
  # ORDER BY must reference the id column itself. The previous quoted form
  # ('id') is a string literal, i.e. a constant sort key, which leaves the
  # row order unspecified; combined with LIMIT that could hand back a batch
  # that is not the lowest ids above the offset.
  db["SELECT * FROM #{table} WHERE (id > #{offset}) ORDER BY id LIMIT #{limit}"].to_a
end

#get_placeholder(db, table) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
# File 'lib/logstash/inputs/sqlite.rb', line 90

# Looks up the stored read position for +table+ in SINCE_TABLE.
#
# When no row exists for the table, one is created through
# #init_placeholder and 0 is returned.
#
# db    - database handle; db[SINCE_TABLE] must answer #where.
# table - table name; converted to a String for the lookup.
#
# Returns the stored :place value, or 0 when the row had to be created.
def get_placeholder(db, table)
  # Fetch the row itself rather than indexing the dataset with :place.
  # Dataset#[] with a symbol adds a truthiness filter on that column, so a
  # stored place of 0 looked like "no row" and a duplicate row was inserted
  # on every call.
  row = db[SINCE_TABLE].where(:table => table.to_s).first
  if row.nil?
    init_placeholder(db, table)
    return 0
  end

  place = row[:place]
  @logger.debug("placeholder already exists, it is #{place}")
  place
end

#init_placeholder(db, table) ⇒ Object



103
104
105
106
107
# File 'lib/logstash/inputs/sqlite.rb', line 103

# Inserts the initial bookkeeping row for +table+ into SINCE_TABLE, with
# its :place set to 0.
#
# db    - database handle; db[SINCE_TABLE] must answer #insert.
# table - table name stored in the :table column.
#
# Returns whatever the insert call returns.
def init_placeholder(db, table)
  @logger.debug("init placeholder for #{table}")
  db[SINCE_TABLE].insert(table: table, place: 0)
end

#init_placeholder_table(db) ⇒ Object



78
79
80
81
82
83
84
85
86
87
# File 'lib/logstash/inputs/sqlite.rb', line 78

# Creates the SINCE_TABLE bookkeeping table with a String :table column
# and an Int :place column.
#
# Any StandardError raised while creating it is swallowed and reported at
# debug level as the table already existing.
#
# db - database handle answering #create_table.
def init_placeholder_table(db)
  db.create_table(SINCE_TABLE) do
    String :table
    Int    :place
  end
rescue
  @logger.debug("since tables already exists")
end

#register ⇒ Object



128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/logstash/inputs/sqlite.rb', line 128

# Prepares the input: loads the database libraries, connects to the
# sqlite file at @path, discovers the tables to read and loads each
# table's stored read position into @table_data.
#
# Sets @host, @db, @tables and @table_data.
def register
  # Loaded here rather than at file load time, as in the original.
  require "sequel"
  require "jdbc/sqlite3"
  @host = Socket.gethostname
  @logger.info("Registering sqlite input", :database => @path)
  @db = Sequel.connect("jdbc:sqlite:#{@path}")
  @tables = get_all_tables(@db)

  # The bookkeeping table only needs creating once. Doing it inside the
  # loop made every iteration after the first raise and rescue a
  # "table already exists" error. It is now also created when no tables
  # were found.
  init_placeholder_table(@db)

  @table_data = {}
  @tables.each do |table|
    @table_data[table] = { :name => table, :place => get_placeholder(@db, table) }
  end
end

#run(queue) ⇒ Object



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/logstash/inputs/sqlite.rb', line 144

# Polls every table in @table_data for rows newer than its stored
# position and pushes one event per row onto +queue+ until stop? is true.
#
# Each event is created with "host" and "db" fields, decorated, and given
# one field per row column except :id. After a batch, the table's position
# is advanced to the id of the last row read and saved through
# #update_placeholder.
#
# When a full pass over all tables yields no rows, the loop sleeps with a
# delay that doubles on each idle pass, capped at sleep_max; any pass that
# reads rows resets the delay to sleep_min.
#
# queue - object answering #<<, receiving the generated events.
def run(queue)
  sleep_min = 0.01
  sleep_max = 5
  sleeptime = sleep_min

  @logger.debug("Tailing sqlite db", :path => @path)
  until stop?
    count = 0
    @table_data.each do |k, table|
      table_name = table[:name]
      offset = table[:place]
      @logger.debug("offset is #{offset}", :k => k, :table => table_name)
      rows = get_n_rows_from_table(@db, table_name, offset, @batch)
      # Nothing new for this table: the stored position is already
      # current, so skip the write instead of issuing an UPDATE on every
      # idle poll.
      next if rows.empty?

      count += rows.size
      rows.each do |row|
        event = LogStash::Event.new("host" => @host, "db" => @db)
        decorate(event)
        # store each column as a field in the event.
        row.each do |column, element|
          next if column == :id
          event.set(column.to_s, element)
        end
        queue << event
        @table_data[k][:place] = row[:id]
      end
      # Store the last-seen row in the database
      update_placeholder(@db, table_name, @table_data[k][:place])
    end

    if count == 0
      # nothing found in that iteration
      # sleep a bit
      @logger.debug("No new rows. Sleeping.", :time => sleeptime)
      sleeptime = [sleeptime * 2, sleep_max].min

      Stud.stoppable_sleep(sleeptime) { stop? }
    else
      sleeptime = sleep_min
    end
  end # loop
end

#update_placeholder(db, table, place) ⇒ Object



110
111
112
113
114
# File 'lib/logstash/inputs/sqlite.rb', line 110

# Saves +place+ as the read position for +table+ in SINCE_TABLE.
#
# db    - database handle; db[SINCE_TABLE] must answer #where.
# table - table name whose bookkeeping row(s) are updated.
# place - new value for the :place column.
#
# Returns whatever the update call returns.
def update_placeholder(db, table, place)
  @logger.debug("set placeholder to #{place}")
  db[SINCE_TABLE].where(table: table).update(place: place)
end