Class: LogStash::Inputs::MongoDB

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/mongodb.rb

Overview

Reads documents from MongoDB collections and emits each one as a Logstash event, persisting a per-collection placeholder in SQLite so tailing resumes where it left off.

Constant Summary collapse

SINCE_TABLE =
:since_table

Instance Method Summary collapse

Instance Method Details

#flatten(my_hash) ⇒ Object



187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/logstash/inputs/mongodb.rb', line 187

# Recursively flatten a nested hash into a single-level hash whose keys
# are the original key path joined with underscores, e.g.
#   {"a" => {"b" => 1}}  =>  {"a_b" => 1}
# Non-hash values (including arrays) are kept as-is; top-level keys are
# always stringified. Anything that does not respond to :each yields an
# empty hash and a debug log entry.
def flatten(my_hash)
  flat = {}
  unless my_hash.respond_to? :each
    @logger.debug("Flatten [ERROR]: hash did not respond to :each")
    return flat
  end
  my_hash.each do |key, value|
    if value.is_a?(Hash)
      # Flatten the sub-hash first, then prefix each resulting key
      # with this level's key.
      flatten(value).each do |inner_key, inner_value|
        flat["#{key}_#{inner_key}"] = inner_value
      end
    else
      flat[key.to_s] = value
    end
  end
  flat
end

#get_all_tables(mongodb) ⇒ Object



115
116
117
# File 'lib/logstash/inputs/mongodb.rb', line 115

# Return the names of every collection in the database.
#
# mongodb - Mongo DB handle to query; falls back to the connection
#           created in #register when nil. (The previous version
#           ignored this parameter entirely and always used @mongodb.)
#
# Returns an Array of collection-name Strings.
def get_all_tables(mongodb)
  return (mongodb || @mongodb).collection_names
end

#get_collection_names(mongodb, collection) ⇒ Object



120
121
122
123
124
125
126
127
128
129
# File 'lib/logstash/inputs/mongodb.rb', line 120

# Return the collection names that match the configured collection
# pattern (treated as a regular expression).
#
# mongodb    - Mongo DB handle to query; falls back to @mongodb when
#              nil. (The previous version ignored this parameter and
#              always used @mongodb.)
# collection - String pattern; compiled once instead of per iteration.
#
# Returns an Array of matching collection-name Strings.
def get_collection_names(mongodb, collection)
  db = mongodb || @mongodb
  pattern = Regexp.new(collection)
  collection_names = []
  db.collection_names.each do |coll|
    if pattern =~ coll
      collection_names.push(coll)
      @logger.debug("Added #{coll} to the collection list as it matches our collection search")
    end
  end
  return collection_names
end

#get_cursor_for_collection(mongodb, mongo_collection_name, last_id_object, batch_size) ⇒ Object



132
133
134
135
136
137
# File 'lib/logstash/inputs/mongodb.rb', line 132

# Build a cursor over documents whose _id is at or after the given
# placeholder ObjectId, capped at batch_size documents.
#
# mongodb               - Mongo DB handle
# mongo_collection_name - name of the collection to read
# last_id_object        - BSON::ObjectId resume point (inclusive)
# batch_size            - maximum number of documents to fetch
#
# Returns the driver cursor from find(...).limit(...).
def get_cursor_for_collection(mongodb, mongo_collection_name, last_id_object, batch_size)
  # Need to make this sort by date in object id then get the first of the series
  # db.events_20150320.find().limit(1).sort({ts:1})
  selector = { :_id => { :$gte => last_id_object } }
  mongodb.collection(mongo_collection_name).find(selector).limit(batch_size)
end

#get_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/logstash/inputs/mongodb.rb', line 94

# Look up the stored resume point (last-seen ObjectId, as a String) for
# a watched collection; when no usable placeholder exists, seed one from
# the collection's oldest document via #init_placeholder.
#
# sqlitedb              - Sequel database handle holding SINCE_TABLE
# since_table           - prefix used to namespace rows in SINCE_TABLE
# mongodb               - Mongo DB handle (forwarded to init_placeholder)
# mongo_collection_name - collection whose placeholder is requested
#
# Returns the placeholder id as a String.
def get_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
  since = sqlitedb[SINCE_TABLE]
  # x is a Sequel dataset filtered to this collection's tracking row.
  x = since.where(:table => "#{since_table}_#{mongo_collection_name}")
  # NOTE(review): x[:place] appears to rely on Sequel's Dataset#[]
  # returning the first matching row (nil when absent) — hence the
  # double lookup x[:place][:place] below to pull the column out of the
  # row hash. Confirm against the Sequel version in use.
  if x[:place].nil? || x[:place] == 0
    first_entry_id = init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
    @logger.debug("FIRST ENTRY ID for #{mongo_collection_name} is #{first_entry_id}")
    return first_entry_id
  else
    @logger.debug("placeholder already exists, it is #{x[:place]}")
    return x[:place][:place]
  end
end

#init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name) ⇒ Object



83
84
85
86
87
88
89
90
91
# File 'lib/logstash/inputs/mongodb.rb', line 83

# Seed the resume placeholder for a collection: find the oldest document
# (lowest _id), persist its id in SINCE_TABLE, and return it.
#
# sqlitedb              - Sequel database handle holding SINCE_TABLE
# since_table           - prefix used to namespace rows in SINCE_TABLE
# mongodb               - Mongo DB handle
# mongo_collection_name - collection to inspect
#
# Returns the oldest document's _id as a String.
def init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
  @logger.debug("init placeholder for #{since_table}_#{mongo_collection_name}")
  placeholders = sqlitedb[SINCE_TABLE]
  # Oldest document first: ObjectIds sort chronologically.
  oldest = mongodb.collection(mongo_collection_name).find({}, :sort => ['_id', Mongo::ASCENDING], :limit => 1).first
  oldest_id = oldest['_id'].to_s
  placeholders.insert(:table => "#{since_table}_#{mongo_collection_name}", :place => oldest_id)
  oldest_id
end

#init_placeholder_table(sqlitedb) ⇒ Object



71
72
73
74
75
76
77
78
79
80
# File 'lib/logstash/inputs/mongodb.rb', line 71

# Ensure the SINCE_TABLE used to persist per-collection resume points
# exists. Sequel raises when the table already exists, which is the
# expected case on every run after the first.
#
# sqlitedb - Sequel database handle
def init_placeholder_table(sqlitedb)
  begin
    sqlitedb.create_table "#{SINCE_TABLE}" do
      String :table
      Int :place
    end
  rescue => e
    # Most likely "table already exists"; include the real message so
    # genuine failures (permissions, disk, bad path) are no longer
    # silently mislabeled.
    @logger.debug("since table already exists", :exception => e.message)
  end
end

#registerObject



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/logstash/inputs/mongodb.rb', line 154

# Plugin lifecycle hook called once before #run: loads the MongoDB and
# SQLite/Sequel libraries, opens the MongoDB connection described by
# @uri (applying any credentials embedded in the URI), connects to the
# SQLite placeholder database at @path, and builds the initial map of
# watched collections (@collection_data).
#
# NOTE(review): Mongo::URIParser, Connection#add_auth and
# #apply_saved_authentication are legacy (pre-2.x) mongo driver APIs —
# confirm the gem version this plugin pins before upgrading the driver.
def register
  require "mongo"
  require "jdbc/sqlite3"
  require "sequel"
  uriParsed = Mongo::URIParser.new(@uri)
  conn = uriParsed.connection({})
  # Register every credential present in the URI, then authenticate.
  if uriParsed.auths.length > 0
    uriParsed.auths.each do |auth|
      if !auth['db_name'].nil?
        conn.add_auth(auth['db_name'], auth['username'], auth['password'], nil)
      end
    end
    conn.apply_saved_authentication()
  end
  @host = Socket.gethostname
  @logger.info("Registering MongoDB input", :database => @path)
  #@mongodb = conn.db(@database)
  # Use the database named in the URI rather than a separate setting.
  @mongodb = conn.db(uriParsed.db_name)
  @sqlitedb = Sequel.connect("jdbc:sqlite:#{@path}")
  # Should check to see if there are new matching tables at a predefined interval or on some trigger
  @collection_data = update_watched_collections(@mongodb, @collection, @sqlitedb)
end

#run(queue) ⇒ Object



215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
# File 'lib/logstash/inputs/mongodb.rb', line 215

# Main input loop: repeatedly polls every watched collection for
# documents newer than the stored placeholder, converts each document
# into a LogStash::Event, pushes it onto the pipeline queue, and
# advances the per-collection placeholder in SQLite. Exits cleanly on
# LogStash::ShutdownSignal.
#
# queue - the pipeline queue events are pushed onto
def run(queue)
  sleep_min = 0.01
  sleep_max = 5
  sleeptime = sleep_min

  begin
    @logger.debug("Tailing MongoDB", :path => @path)
    @logger.debug("Collection data is: #{@collection_data}")
    loop do
      @collection_data.each do |index, collection|
        collection_name = collection[:name]
        @logger.debug("collection_data is: #{@collection_data}")
        last_id = @collection_data[index][:last_id]
        #@logger.debug("last_id is #{last_id}", :index => index, :collection => collection_name)
        # get batch of events starting at the last_place if it is set
        last_id_object = BSON::ObjectId(last_id)
        cursor = get_cursor_for_collection(@mongodb, collection_name, last_id_object, batch_size)
        cursor.each do |doc|
          logdate = DateTime.parse(doc['_id'].generation_time.to_s)
          event = LogStash::Event.new("host" => @host)
          decorate(event)
          event["logdate"] = logdate.iso8601
          # BUGFIX: stringify the BSON ObjectId on the *hash* before
          # serialising. The previous code called to_s first and then
          # used String#[]= on the result, which merely replaced the
          # substring "_id" with itself (a no-op) and left the raw
          # ObjectId representation in the log_entry field.
          log_entry = doc.to_h
          log_entry['_id'] = log_entry['_id'].to_s
          event["log_entry"] = log_entry.to_s
          event["mongo_id"] = doc['_id'].to_s
          @logger.debug("mongo_id: "+doc['_id'].to_s)
          #@logger.debug("EVENT looks like: "+event.to_s)
          #@logger.debug("Sent message: "+doc.to_h.to_s)
          #@logger.debug("EVENT looks like: "+event.to_s)
          # Extract the HOST_ID and PID from the MongoDB BSON::ObjectID
          if @unpack_mongo_id
            doc_obj_bin = doc['_id'].to_a.pack("C*").unpack("a4 a3 a2 a3")
            host_id = doc_obj_bin[1].unpack("S")
            process_id = doc_obj_bin[2].unpack("S")
            event['host_id'] = host_id.first.to_i
            event['process_id'] = process_id.first.to_i
          end

          if @parse_method == 'flatten'
            # Flatten the JSON so that the data is usable in Kibana
            flat_doc = flatten(doc)
            # Check for different types of expected values and add them to the event
            if flat_doc['info_message'] && (flat_doc['info_message']  =~ /collection stats: .+/)
              # Some custom stuff I'm having to do to fix formatting in past logs...
              sub_value = flat_doc['info_message'].sub("collection stats: ", "")
              JSON.parse(sub_value).each do |k1,v1|
                flat_doc["collection_stats_#{k1.to_s}"] = v1
              end
            end

            flat_doc.each do |k,v|
              # Floats first, then integers; everything else is
              # stringified except _id (already emitted as mongo_id)
              # and array-valued tags, which pass through unchanged.
              if /\A[-+]?\d+[.][\d]+\z/ === v
                event[k.to_s] = v.to_f
              elsif (/\A[-+]?\d+\z/ === v) || (v.is_a? Integer)
                event[k.to_s] = v.to_i
              else
                event[k.to_s] = v.to_s unless k.to_s == "_id" || k.to_s == "tags"
                if (k.to_s == "tags") && (v.is_a? Array)
                  event['tags'] = v
                end
              end
            end
          elsif @parse_method == 'dig'
            # Dig into the JSON and flatten select elements: one level
            # for keys in @dig_fields, two for keys in @dig_dig_fields.
            doc.each do |k, v|
              if k != "_id"
                if (@dig_fields.include? k) && (v.respond_to? :each)
                  v.each do |kk, vv|
                    if (@dig_dig_fields.include? kk) && (vv.respond_to? :each)
                      vv.each do |kkk, vvv|
                        if /\A[-+]?\d+\z/ === vvv
                          event["#{k}_#{kk}_#{kkk}"] = vvv.to_i
                        else
                          event["#{k}_#{kk}_#{kkk}"] = vvv.to_s
                        end
                      end
                    else
                      if /\A[-+]?\d+\z/ === vv
                        event["#{k}_#{kk}"] = vv.to_i
                      else
                        event["#{k}_#{kk}"] = vv.to_s
                      end
                    end
                  end
                else
                  if /\A[-+]?\d+\z/ === v
                    event[k] = v.to_i
                  else
                    event[k] = v.to_s
                  end
                end
              end
            end
          else
            # Should probably do some sanitization here and insert the doc as raw as possible for parsing in logstash
          end

          queue << event
          @collection_data[index][:last_id] = doc['_id'].to_s
        end
        # Store the last-seen doc in the database
        update_placeholder(@sqlitedb, since_table, collection_name, @collection_data[index][:last_id])
      end
      @logger.debug("Updating watch collections")
      @collection_data = update_watched_collections(@mongodb, @collection, @sqlitedb)

      # nothing found in that iteration
      # sleep a bit
      # NOTE(review): sleeptime is never reset after a productive pass
      # (the reset below is commented out), so polling always backs off
      # toward sleep_max — confirm this throttling is intentional.
      @logger.debug("No new rows. Sleeping.", :time => sleeptime)
      sleeptime = [sleeptime * 2, sleep_max].min
      sleep(sleeptime)
      #sleeptime = sleep_min
    end
  rescue LogStash::ShutdownSignal
    if @interrupted
      @logger.debug("Mongo Input shutting down")
    end
  end
end

#update_placeholder(sqlitedb, since_table, mongo_collection_name, place) ⇒ Object



108
109
110
111
112
# File 'lib/logstash/inputs/mongodb.rb', line 108

# Persist the most recently seen document id for a collection so the
# next run (or next polling pass) resumes from that point.
#
# sqlitedb              - Sequel database handle holding SINCE_TABLE
# since_table           - prefix used to namespace rows in SINCE_TABLE
# mongo_collection_name - collection whose placeholder is updated
# place                 - last-seen ObjectId, as a String
def update_placeholder(sqlitedb, since_table, mongo_collection_name, place)
  tracker_key = "#{since_table}_#{mongo_collection_name}"
  sqlitedb[SINCE_TABLE].where(:table => tracker_key).update(:place => place)
end

#update_watched_collections(mongodb, collection, sqlitedb) ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/logstash/inputs/mongodb.rb', line 140

# Refresh the set of watched collections: resolve the names matching
# the configured pattern and make sure each has a resume placeholder.
#
# mongodb    - Mongo DB handle
# collection - collection-name pattern (String, used as a regexp)
# sqlitedb   - Sequel database handle for the placeholder table
#
# Returns a Hash keyed by collection name, each value being
# { :name => <name>, :last_id => <placeholder id String> }.
def update_watched_collections(mongodb, collection, sqlitedb)
  collection_data = {}
  get_collection_names(mongodb, collection).each do |coll_name|
    init_placeholder_table(sqlitedb)
    last_id = get_placeholder(sqlitedb, since_table, mongodb, coll_name)
    collection_data[coll_name] ||= { :name => coll_name, :last_id => last_id }
  end
  collection_data
end