Class: Fluent::MongoTailInput

Inherits:
Input
  • Object
show all
Includes:
LoggerSupport, MongoAuth, MongoAuthParams
Defined in:
lib/fluent/plugin/in_mongo_tail.rb

Instance Method Summary collapse

Methods included from LoggerSupport

#configure_logger, included

Methods included from MongoAuth

#authenticate

Methods included from MongoAuthParams

included

Constructor Details

#initialize ⇒ MongoTailInput

Returns a new instance of MongoTailInput.



51
52
53
54
55
56
57
58
# File 'lib/fluent/plugin/in_mongo_tail.rb', line 51

# Builds the plugin instance.
#
# After the parent initializer has run, the 'mongo' and 'bson' libraries are
# loaded (in that order) at construction time rather than at file load, and
# the two option hashes are created empty. configure later stores the :ssl
# setting in @connection_options.
def initialize
  super

  # Same load order as before: 'mongo' first, then 'bson'.
  %w[mongo bson].each { |library| require library }

  @client_options = {}
  @connection_options = {}
end

Instance Method Details

#configure(conf) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/fluent/plugin/in_mongo_tail.rb', line 60

# Validates the plugin configuration and derives the initial tailing state.
#
# @param conf the configuration object; it is handed to the parent
#   implementation through the implicit +super+ call
# @raise [ConfigError] when neither 'tag' nor 'tag_key' is set, when both
#   'database' and 'url' are set, or when neither of them is set
def configure(conf)
  super

  # A record needs a tag from somewhere: a fixed one or a per-document key.
  unless @tag || @tag_key
    raise ConfigError, "'tag' or 'tag_key' option is required on mongo_tail input"
  end

  # 'database' and 'url' are mutually exclusive, and exactly one is required.
  raise ConfigError, "Both 'database' and 'url' can not be set" if @database && @url
  raise ConfigError, "One of 'database' or 'url' must be specified" unless @database || @url

  # Start from the stored id when an id store file is configured; otherwise
  # the starting id is explicitly reset to nil.
  @last_id = (get_last_id if @id_store_file)
  @connection_options[:ssl] = @ssl

  configure_logger(@mongo_log_level)
end

#run ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/fluent/plugin/in_mongo_tail.rb', line 105

# Worker-thread body: polls the collection for new documents until @stop is
# set.
#
# Each pass queries @collection (restricted to _id values greater than
# @last_id when one is known), hands any results to process_documents, and
# otherwise sleeps for @wait_time before polling again.
#
# Errors raised while polling stay non-fatal so that tailing survives them,
# but they are no longer swallowed silently: the error is reported through
# +log+ when the host object provides one, and the loop waits @wait_time
# before retrying instead of spinning at full speed on a persistent failure.
#
# @return [nil] once @stop is observed
def run
  loop do
    option = {}
    begin
      loop do
        return if @stop

        option['_id'] = { '$gt' => BSON::ObjectId(@last_id) } if @last_id
        documents = @collection.find(option)
        if documents.count >= 1
          process_documents(documents)
        else
          sleep @wait_time
        end
      end
    rescue => e
      # +log+ is not defined in this method's visible scope, so only use it
      # when the receiver actually responds to it (private methods included).
      if respond_to?(:log, true)
        log.warn "mongo_tail: polling failed, retrying: #{e.class}: #{e.message}"
      end
      # Back off before the outer loop retries; skip the wait when shutting down.
      sleep @wait_time unless @stop
    end
  end
end

#shutdown ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/fluent/plugin/in_mongo_tail.rb', line 92

# Stops the tailing thread and releases the plugin's resources.
#
# Order matters: the worker is asked to stop and joined BEFORE the last id is
# saved and the store file is closed. Previously the id was saved and the
# file closed while the worker could still be inside process_documents, so
# the saved position could be stale.
# NOTE(review): process_documents is not visible here; this assumes it is
# what advances @last_id on the worker thread - confirm.
#
# Each resource is nil-guarded so shutdown is safe even when start never
# created it.
def shutdown
  @stop = true
  @thread.join if @thread

  if @id_store_file
    save_last_id
    @file.close if @file
  end

  @client.close if @client

  super
end

#start ⇒ Object



81
82
83
84
85
86
87
88
89
90
# File 'lib/fluent/plugin/in_mongo_tail.rb', line 81

# Opens the resources needed for tailing and launches the worker thread.
#
# The id store file is opened only when one is configured. Without one,
# tailing resumes from the id returned by get_last_inserted_id, when that
# returns a value.
#
# get_last_inserted_id is called exactly once. It used to be called twice,
# once as the condition and once for the value, which doubled the work and
# let the two results disagree.
def start
  super

  @file = get_id_store_file if @id_store_file
  @collection = get_collection
  # Resume tailing from last inserted id.
  # Because tailable option is obsoleted since mongo driver 2.0.
  unless @id_store_file
    newest_id = get_last_inserted_id
    @last_id = newest_id if newest_id
  end
  @thread = Thread.new(&method(:run))
end