Class: Fluent::MongoTailInput
Instance Method Summary
collapse
#configure_logger, included
Methods included from MongoAuth
#authenticate
included
Constructor Details
Returns a new instance of MongoTailInput.
51
52
53
54
55
56
57
58
|
# File 'lib/fluent/plugin/in_mongo_tail.rb', line 51
def initialize
super
require 'mongo'
require 'bson'
@client_options = {}
@connection_options = {}
end
|
Instance Method Details
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
# File 'lib/fluent/plugin/in_mongo_tail.rb', line 60
def configure(conf)
super
if !@tag and !@tag_key
raise ConfigError, "'tag' or 'tag_key' option is required on mongo_tail input"
end
if @database && @url
raise ConfigError, "Both 'database' and 'url' can not be set"
end
if !@database && !@url
raise ConfigError, "One of 'database' or 'url' must be specified"
end
@last_id = @id_store_file ? get_last_id : nil
@connection_options[:ssl] = @ssl
configure_logger(@mongo_log_level)
end
|
#run ⇒ Object
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
# File 'lib/fluent/plugin/in_mongo_tail.rb', line 105
def run
loop {
option = {}
begin
loop {
return if @stop
option['_id'] = {'$gt' => BSON::ObjectId(@last_id)} if @last_id
documents = @collection.find(option)
if documents.count >= 1
process_documents(documents)
else
sleep @wait_time
end
}
rescue
end
}
end
|
#shutdown ⇒ Object
92
93
94
95
96
97
98
99
100
101
102
103
|
# File 'lib/fluent/plugin/in_mongo_tail.rb', line 92
def shutdown
if @id_store_file
save_last_id
@file.close
end
@stop = true
@thread.join
@client.close
super
end
|
#start ⇒ Object
81
82
83
84
85
86
87
88
89
90
|
# File 'lib/fluent/plugin/in_mongo_tail.rb', line 81
def start
super
@file = get_id_store_file if @id_store_file
@collection = get_collection
@last_id = get_last_inserted_id if !@id_store_file and get_last_inserted_id
@thread = Thread.new(&method(:run))
end
|