Class: LogStash::Inputs::Sqlite

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/sqlite.rb

Overview

Read rows from an sqlite database.

This is most useful in cases where you are logging directly to a table. Any tables being watched must have an ‘id` column that is monotonically increasing.

All tables are read by default except:

  • ones matching ‘sqlite_%` - these are internal/adminstrative tables for sqlite

  • ‘since_table` - this is used by this plugin to track state.

Example

source,sql

% sqlite /tmp/example.db sqlite> CREATE TABLE weblogs (

id INTEGER PRIMARY KEY AUTOINCREMENT,
ip STRING,
request STRING,
response INTEGER);

sqlite> INSERT INTO weblogs (ip, request, response)

VALUES ("1.2.3.4", "/index.html", 200);

Then with this logstash config:

source,ruby

input {

sqlite {
  path => "/tmp/example.db"
  type => weblogs
}

} output {

stdout {
  debug => true
}

}

Sample output:

source,ruby

{

"@source"      => "sqlite://sadness/tmp/x.db",
"@tags"        => [],
"@fields"      => {
  "ip"       => "1.2.3.4",
  "request"  => "/index.html",
  "response" => 200
},
"@timestamp"   => "2013-05-29T06:16:30.850Z",
"@source_host" => "sadness",
"@source_path" => "/tmp/x.db",
"@message"     => "",
"@type"        => "foo"

}

Constant Summary collapse

SINCE_TABLE =
:since_table

Instance Method Summary collapse

Instance Method Details

#get_all_tables(db) ⇒ Object



115
116
117
# File 'lib/logstash/inputs/sqlite.rb', line 115

def get_all_tables(db)
  return db["SELECT * FROM sqlite_master WHERE type = 'table' AND tbl_name != '#{SINCE_TABLE}' AND tbl_name NOT LIKE 'sqlite_%'"].map { |t| t[:name] }.select { |n| !@exclude_tables.include?(n) }
end

#get_n_rows_from_table(db, table, offset, limit) ⇒ Object



120
121
122
123
# File 'lib/logstash/inputs/sqlite.rb', line 120

def get_n_rows_from_table(db, table, offset, limit)
  dataset = db["SELECT * FROM #{table}"]
  return db["SELECT * FROM #{table} WHERE (id > #{offset}) ORDER BY 'id' LIMIT #{limit}"].map { |row| row }
end

#get_placeholder(db, table) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
# File 'lib/logstash/inputs/sqlite.rb', line 88

def get_placeholder(db, table)
  since = db[SINCE_TABLE]
  x = since.where(:table => "#{table}")
  if x[:place].nil?
    init_placeholder(db, table) 
    return 0
  else
    @logger.debug("placeholder already exists, it is #{x[:place]}")
    return x[:place][:place]
  end
end

#init_placeholder(db, table) ⇒ Object



101
102
103
104
105
# File 'lib/logstash/inputs/sqlite.rb', line 101

def init_placeholder(db, table)
  @logger.debug("init placeholder for #{table}")
  since = db[SINCE_TABLE]
  since.insert(:table => table, :place => 0)
end

#init_placeholder_table(db) ⇒ Object



76
77
78
79
80
81
82
83
84
85
# File 'lib/logstash/inputs/sqlite.rb', line 76

def init_placeholder_table(db)
  begin
    db.create_table SINCE_TABLE do 
      String :table
      Int    :place
    end
  rescue
    @logger.debug("since tables already exists")
  end
end

#registerObject



126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/logstash/inputs/sqlite.rb', line 126

def register
  require "sequel"
  require "jdbc/sqlite3" 
  @host = Socket.gethostname
  @logger.info("Registering sqlite input", :database => @path)
  @db = Sequel.connect("jdbc:sqlite:#{@path}") 
  @tables = get_all_tables(@db)
  @table_data = {}
  @tables.each do |table|
    init_placeholder_table(@db)
    last_place = get_placeholder(@db, table)
    @table_data[table] = { :name => table, :place => last_place }
  end
end

#run(queue) ⇒ Object



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/logstash/inputs/sqlite.rb', line 142

def run(queue)
  sleep_min = 0.01
  sleep_max = 5
  sleeptime = sleep_min

  begin
    @logger.debug("Tailing sqlite db", :path => @path)
    loop do
      count = 0
      @table_data.each do |k, table|
        table_name = table[:name]
        offset = table[:place]
        @logger.debug("offset is #{offset}", :k => k, :table => table_name)
        rows = get_n_rows_from_table(@db, table_name, offset, @batch)
        count += rows.count
        rows.each do |row| 
          event = LogStash::Event.new("host" => @host, "db" => @db)
          decorate(event)
          # store each column as a field in the event.
          row.each do |column, element|
            next if column == :id
            event[column.to_s] = element
          end
          queue << event
          @table_data[k][:place] = row[:id]
        end
        # Store the last-seen row in the database
        update_placeholder(@db, table_name, @table_data[k][:place])
      end

      if count == 0
        # nothing found in that iteration
        # sleep a bit
        @logger.debug("No new rows. Sleeping.", :time => sleeptime)
        sleeptime = [sleeptime * 2, sleep_max].min
        sleep(sleeptime)
      else
        sleeptime = sleep_min
      end
    end # loop
  end # begin/rescue
end

#update_placeholder(db, table, place) ⇒ Object



108
109
110
111
112
# File 'lib/logstash/inputs/sqlite.rb', line 108

def update_placeholder(db, table, place)
  @logger.debug("set placeholder to #{place}")
  since = db[SINCE_TABLE]
  since.where(:table => table).update(:place => place)
end