Class: Fluent::Plugin::MongoTailInput
Instance Method Summary
collapse
#configure_logger, included
Methods included from MongoAuth
#authenticate
included
Constructor Details
#initialize ⇒ MongoTailInput
Returns a new instance of MongoTailInput.
50
51
52
53
54
55
|
# File 'lib/fluent/plugin/in_mongo_tail.rb', line 50
# Creates the plugin instance. After the parent initializer has run, both
# option hashes start out empty.
def initialize
  super
  @connection_options = {}
  @client_options = {}
end
|
Instance Method Details
#configure(conf) ⇒ Object
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
# File 'lib/fluent/plugin/in_mongo_tail.rb', line 57
# Validates the plugin settings and prepares the state used while tailing.
#
# @param conf the configuration object; handed to the parent class via +super+
# @raise [Fluent::ConfigError] if neither 'tag' nor 'tag_key' is set, if
#   'database' and 'url' are both set or both missing, or if a batch size is
#   given that is not positive
def configure(conf)
  super

  unless @tag || @tag_key
    raise Fluent::ConfigError, "'tag' or 'tag_key' option is required on mongo_tail input"
  end

  # Exactly one of 'database' / 'url' has to be provided.
  raise Fluent::ConfigError, "Both 'database' and 'url' can not be set" if @database && @url
  raise Fluent::ConfigError, "One of 'database' or 'url' must be specified" unless @database || @url

  # Resume from the stored id only when an id store file is configured;
  # otherwise the expression evaluates to nil and @last_id is reset.
  @last_id = (get_last_id if @id_store_file)
  @connection_options[:ssl] = @ssl

  # The batch size is optional, so it is only checked when present.
  raise Fluent::ConfigError, "Batch size must be positive." if @batch_size && @batch_size <= 0

  configure_logger(@mongo_log_level)
end
|
#run ⇒ Object
104
105
106
107
108
109
110
111
112
113
114
115
116
|
# File 'lib/fluent/plugin/in_mongo_tail.rb', line 104
# Performs one poll of the collection.
#
# Builds a filter selecting documents whose '_id' is greater than @last_id
# (an empty filter when @last_id is unset), caps the result at @batch_size when
# one is configured, and passes the result to +process_documents+ if it
# contains at least one document.
#
# Errors are never propagated to the caller: polling is best-effort. They used
# to be discarded silently; they are now reported as a warning through the
# plugin logger so that a failing poll is visible.
def run
  option = {}
  option['_id'] = { '$gt' => BSON::ObjectId(@last_id) } if @last_id
  documents = @collection.find(option)
  documents = documents.limit(@batch_size) if @batch_size
  process_documents(documents) if documents.count >= 1
rescue => e
  log.warn "mongo_tail: failed to fetch documents: #{e.class}: #{e.message}"
  nil
end
|
#shutdown ⇒ Object
93
94
95
96
97
98
99
100
101
102
|
# File 'lib/fluent/plugin/in_mongo_tail.rb', line 93
# Persists the tailing position and releases the resources held by the plugin.
#
# When an id store file is configured, the last id is saved and the store file
# handle is closed. The client is closed afterwards and the parent +shutdown+
# is invoked last; its result is returned.
#
# Cleanup is guarded with +ensure+: if saving the last id raises, the store
# file and the client are still closed before the error propagates (previously
# both were leaked). As before, the parent +shutdown+ is not reached when an
# error is raised.
def shutdown
  begin
    if @id_store_file
      begin
        save_last_id
      ensure
        # Guard against a handle that was never opened so a nil here cannot
        # mask the original error.
        @file.close if @file
      end
    end
  ensure
    @client.close if @client
  end
  super
end
|
#start ⇒ Object
82
83
84
85
86
87
88
89
90
91
|
# File 'lib/fluent/plugin/in_mongo_tail.rb', line 82
# Starts the plugin: opens the id store file when one is configured, obtains
# the collection, seeds @last_id and registers the periodic +run+ timer.
#
# Without an id store file, @last_id is seeded from +get_last_inserted_id+;
# it is left untouched when that lookup returns nothing. The lookup is made
# exactly once, so the value that is tested is the value that is stored
# (the previous code called it twice, once to test and once to assign).
def start
  super
  @file = get_id_store_file if @id_store_file
  @collection = get_collection

  unless @id_store_file
    last_inserted_id = get_last_inserted_id
    @last_id = last_inserted_id if last_inserted_id
  end

  timer_execute(:in_mongo_tail_watcher, @wait_time, &method(:run))
end
|