Class: LogStash::Inputs::Sqlite

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/sqlite.rb

Overview

Read rows from an sqlite database.

This is most useful in cases where you are logging directly to a table. Any tables being watched must have an ‘id` column that is monotonically increasing.

All tables are read by default except:

  • ones matching ‘sqlite_%` - these are internal/adminstrative tables for sqlite

  • ‘since_table` - this is used by this plugin to track state.

Example

source,sql

% sqlite /tmp/example.db sqlite> CREATE TABLE weblogs (

id INTEGER PRIMARY KEY AUTOINCREMENT,
ip STRING,
request STRING,
response INTEGER);

sqlite> INSERT INTO weblogs (ip, request, response)

VALUES ("1.2.3.4", "/index.html", 200);

Then with this logstash config:

source,ruby

input {

sqlite {
  path => "/tmp/example.db"
  type => weblogs
}

} output {

stdout {
  debug => true
}

}

Sample output:

source,ruby

{

"@source"      => "sqlite://sadness/tmp/x.db",
"@tags"        => [],
"@fields"      => {
  "ip"       => "1.2.3.4",
  "request"  => "/index.html",
  "response" => 200
},
"@timestamp"   => "2013-05-29T06:16:30.850Z",
"@source_host" => "sadness",
"@source_path" => "/tmp/x.db",
"@message"     => "",
"@type"        => "foo"

}

Constant Summary collapse

SINCE_TABLE =
:since_table

Instance Method Summary collapse

Instance Method Details

#get_all_tables(db) ⇒ Object



116
117
118
# File 'lib/logstash/inputs/sqlite.rb', line 116

def get_all_tables(db)
  return db["SELECT * FROM sqlite_master WHERE type = 'table' AND tbl_name != '#{SINCE_TABLE}' AND tbl_name NOT LIKE 'sqlite_%'"].map { |t| t[:name] }.select { |n| !@exclude_tables.include?(n) }
end

#get_n_rows_from_table(db, table, offset, limit) ⇒ Object



121
122
123
124
# File 'lib/logstash/inputs/sqlite.rb', line 121

def get_n_rows_from_table(db, table, offset, limit)
  dataset = db["SELECT * FROM #{table}"]
  return db["SELECT * FROM #{table} WHERE (id > #{offset}) ORDER BY 'id' LIMIT #{limit}"].map { |row| row }
end

#get_placeholder(db, table) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
# File 'lib/logstash/inputs/sqlite.rb', line 89

def get_placeholder(db, table)
  since = db[SINCE_TABLE]
  x = since.where(:table => "#{table}")
  if x[:place].nil?
    init_placeholder(db, table) 
    return 0
  else
    @logger.debug("placeholder already exists, it is #{x[:place]}")
    return x[:place][:place]
  end
end

#init_placeholder(db, table) ⇒ Object



102
103
104
105
106
# File 'lib/logstash/inputs/sqlite.rb', line 102

def init_placeholder(db, table)
  @logger.debug("init placeholder for #{table}")
  since = db[SINCE_TABLE]
  since.insert(:table => table, :place => 0)
end

#init_placeholder_table(db) ⇒ Object



77
78
79
80
81
82
83
84
85
86
# File 'lib/logstash/inputs/sqlite.rb', line 77

def init_placeholder_table(db)
  begin
    db.create_table SINCE_TABLE do 
      String :table
      Int    :place
    end
  rescue
    @logger.debug("since tables already exists")
  end
end

#registerObject



127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/logstash/inputs/sqlite.rb', line 127

def register
  require "sequel"
  require "jdbc/sqlite3" 
  @host = Socket.gethostname
  @logger.info("Registering sqlite input", :database => @path)
  @db = Sequel.connect("jdbc:sqlite:#{@path}") 
  @tables = get_all_tables(@db)
  @table_data = {}
  @tables.each do |table|
    init_placeholder_table(@db)
    last_place = get_placeholder(@db, table)
    @table_data[table] = { :name => table, :place => last_place }
  end
end

#run(queue) ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/logstash/inputs/sqlite.rb', line 143

def run(queue)
  sleep_min = 0.01
  sleep_max = 5
  sleeptime = sleep_min

  begin
    @logger.debug("Tailing sqlite db", :path => @path)
    loop do
      count = 0
      @table_data.each do |k, table|
        table_name = table[:name]
        offset = table[:place]
        @logger.debug("offset is #{offset}", :k => k, :table => table_name)
        rows = get_n_rows_from_table(@db, table_name, offset, @batch)
        count += rows.count
        rows.each do |row| 
          event = LogStash::Event.new("host" => @host, "db" => @db)
          decorate(event)
          # store each column as a field in the event.
          row.each do |column, element|
            next if column == :id
            event[column.to_s] = element
          end
          queue << event
          @table_data[k][:place] = row[:id]
        end
        # Store the last-seen row in the database
        update_placeholder(@db, table_name, @table_data[k][:place])
      end

      if count == 0
        # nothing found in that iteration
        # sleep a bit
        @logger.debug("No new rows. Sleeping.", :time => sleeptime)
        sleeptime = [sleeptime * 2, sleep_max].min
        sleep(sleeptime)
      else
        sleeptime = sleep_min
      end
    end # loop
  end # begin/rescue
end

#update_placeholder(db, table, place) ⇒ Object



109
110
111
112
113
# File 'lib/logstash/inputs/sqlite.rb', line 109

def update_placeholder(db, table, place)
  @logger.debug("set placeholder to #{place}")
  since = db[SINCE_TABLE]
  since.where(:table => table).update(:place => place)
end