Class: Fluent::Plugin::MongoTailInput

Inherits:
Input
  • Object
show all
Includes:
LoggerSupport, MongoAuth, MongoAuthParams
Defined in:
lib/fluent/plugin/in_mongo_tail.rb

Instance Method Summary collapse

Methods included from LoggerSupport

#configure_logger, included

Methods included from MongoAuth

#authenticate

Methods included from MongoAuthParams

included

Constructor Details

#initializeMongoTailInput

Returns a new instance of MongoTailInput.



50
51
52
53
54
55
# File 'lib/fluent/plugin/in_mongo_tail.rb', line 50

def initialize
  super

  @client_options = {}
  @connection_options = {}
end

Instance Method Details

#configure(conf) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/fluent/plugin/in_mongo_tail.rb', line 57

def configure(conf)
  super

  if !@tag and !@tag_key
    raise Fluent::ConfigError, "'tag' or 'tag_key' option is required on mongo_tail input"
  end

  if @database && @url
    raise Fluent::ConfigError, "Both 'database' and 'url' can not be set"
  end

  if !@database && !@url
    raise Fluent::ConfigError, "One of 'database' or 'url' must be specified"
  end

  @last_id = @id_store_file ? get_last_id : nil
  @connection_options[:ssl] = @ssl

  if @batch_size && @batch_size <= 0
    raise Fluent::ConfigError, "Batch size must be positive."
  end

  configure_logger(@mongo_log_level)
end

#runObject



104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/fluent/plugin/in_mongo_tail.rb', line 104

def run
  option = {}
  begin
    option['_id'] = {'$gt' => BSON::ObjectId(@last_id)} if @last_id
    documents = @collection.find(option)
    documents = documents.limit(@batch_size) if @batch_size
    if documents.count >= 1
      process_documents(documents)
    end
  rescue
    # ignore Exceptions
  end
end

#shutdownObject



93
94
95
96
97
98
99
100
101
102
# File 'lib/fluent/plugin/in_mongo_tail.rb', line 93

def shutdown
  if @id_store_file
    save_last_id
    @file.close
  end

  @client.close

  super
end

#startObject



82
83
84
85
86
87
88
89
90
91
# File 'lib/fluent/plugin/in_mongo_tail.rb', line 82

def start
  super

  @file = get_id_store_file if @id_store_file
  @collection = get_collection
  # Resume tailing from last inserted id.
  # Because tailable option is obsoleted since mongo driver 2.0.
  @last_id = get_last_inserted_id if !@id_store_file and get_last_inserted_id
  timer_execute(:in_mongo_tail_watcher, @wait_time, &method(:run))
end