Class: Fluent::MongoOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::MongoOutput
- Includes:
- LoggerSupport, MongoAuth, MongoAuthParams, SetTagKeyMixin, SetTimeKeyMixin
- Defined in:
- lib/fluent/plugin/out_mongo.rb
Direct Known Subclasses
Constant Summary collapse
- LIMIT_BEFORE_v1_8 =
Following limits are heuristic. BSON is sometimes bigger than MessagePack and JSON. 2MB = 4MB / 2.
2 * 1024 * 1024
- LIMIT_AFTER_v1_8 =
8MB = 16MB / 2.
8 * 1024 * 1024
Instance Attribute Summary collapse
-
#client_options ⇒ Object
readonly
Returns the value of attribute client_options.
-
#collection_options ⇒ Object
readonly
Returns the value of attribute collection_options.
Class Method Summary collapse
-
.format_nocache(time) ⇒ Object
MongoDB uses BSON’s Date for time.
Instance Method Summary collapse
-
#configure(conf) ⇒ Object
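Caps buffer_chunk_limit at a BSON-safe size (8m by default, i.e. 8MB = 16MB / 2; 2m when mongodb_smaller_bson_limit is enabled), validates required parameters, and prepares the client and collection options.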
- #emit(tag, es, chain) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ MongoOutput
constructor
A new instance of MongoOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Methods included from LoggerSupport
Methods included from MongoAuth
Methods included from MongoAuthParams
Constructor Details
#initialize ⇒ MongoOutput
Returns a new instance of MongoOutput.
58 59 60 61 62 63 64 65 66 67 |
# File 'lib/fluent/plugin/out_mongo.rb', line 58 def initialize super require 'mongo' require 'msgpack' @nodes = nil @client_options = {} @collection_options = {capped: false} end |
Instance Attribute Details
#client_options ⇒ Object (readonly)
Returns the value of attribute client_options.
56 57 58 |
# File 'lib/fluent/plugin/out_mongo.rb', line 56 def client_options @client_options end |
#collection_options ⇒ Object (readonly)
Returns the value of attribute collection_options.
56 57 58 |
# File 'lib/fluent/plugin/out_mongo.rb', line 56 def collection_options @collection_options end |
Class Method Details
.format_nocache(time) ⇒ Object
MongoDB uses BSON’s Date for time.
134 135 136 |
# File 'lib/fluent/plugin/out_mongo.rb', line 134 def @timef.format_nocache(time) time end |
Instance Method Details
#configure(conf) ⇒ Object
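Caps buffer_chunk_limit at a BSON-safe size (8m by default, i.e. 8MB = 16MB / 2; 2m when mongodb_smaller_bson_limit is enabled), raises ConfigError when required parameters are missing, and fills in the client and collection options.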
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/fluent/plugin/out_mongo.rb', line 73 def configure(conf) if conf.has_key?('buffer_chunk_limit') configured_chunk_limit_size = Config.size_value(conf['buffer_chunk_limit']) estimated_limit_size = LIMIT_AFTER_v1_8 estimated_limit_size_conf = '8m' if conf.has_key?('mongodb_smaller_bson_limit') && Config.bool_value(conf['mongodb_smaller_bson_limit']) estimated_limit_size = LIMIT_BEFORE_v1_8 estimated_limit_size_conf = '2m' end if configured_chunk_limit_size > estimated_limit_size log.warn ":buffer_chunk_limit(#{conf['buffer_chunk_limit']}) is large. Reset :buffer_chunk_limit with #{estimated_limit_size_conf}" conf['buffer_chunk_limit'] = estimated_limit_size_conf end else if conf.has_key?('mongodb_smaller_bson_limit') && Config.bool_value(conf['mongodb_smaller_bson_limit']) conf['buffer_chunk_limit'] = '2m' else conf['buffer_chunk_limit'] = '8m' end end super if @connection_string.nil? && @database.nil? raise Fluent::ConfigError, "connection_string or database parameter is required" end unless @ignore_invalid_record log.warn "Since v0.8, invalid record detection will be removed because mongo driver v2.x and API spec don't provide it. You may lose invalid records, so you should not send such records to mongo plugin" end if conf.has_key?('tag_mapped') @tag_mapped = true end raise ConfigError, "normal mode requires collection parameter" if !@tag_mapped and !conf.has_key?('collection') if conf.has_key?('capped') raise ConfigError, "'capped_size' parameter is required on <store> of Mongo output" unless conf.has_key?('capped_size') @collection_options[:capped] = true @collection_options[:size] = Config.size_value(conf['capped_size']) @collection_options[:max] = Config.size_value(conf['capped_max']) if conf.has_key?('capped_max') end if remove_tag_prefix = conf['remove_tag_prefix'] @remove_tag_prefix = Regexp.new('^' + Regexp.escape(remove_tag_prefix)) end @client_options[:write] = {j: @journaled} @client_options[:write].merge!({w: @write_concern}) unless @write_concern.nil? 
@client_options[:ssl] = @ssl if @ssl @client_options[:ssl_cert] = @ssl_cert @client_options[:ssl_key] = @ssl_key @client_options[:ssl_key_pass_phrase] = @ssl_key_pass_phrase @client_options[:ssl_verify] = @ssl_verify @client_options[:ssl_ca_cert] = @ssl_ca_cert end @nodes = ["#{@host}:#{@port}"] if @nodes.nil? # MongoDB uses BSON's Date for time. def @timef.format_nocache(time) time end configure_logger(@mongo_log_level) log.debug "Setup mongo configuration: mode = #{@tag_mapped ? 'tag mapped' : 'normal'}" end |
#emit(tag, es, chain) ⇒ Object
154 155 156 157 158 159 160 |
# File 'lib/fluent/plugin/out_mongo.rb', line 154 def emit(tag, es, chain) if @tag_mapped super(tag, es, chain, tag) else super(tag, es, chain) end end |
#format(tag, time, record) ⇒ Object
162 163 164 |
# File 'lib/fluent/plugin/out_mongo.rb', line 162 def format(tag, time, record) [time, record].to_msgpack end |
#shutdown ⇒ Object
149 150 151 152 |
# File 'lib/fluent/plugin/out_mongo.rb', line 149 def shutdown @client.close super end |
#start ⇒ Object
143 144 145 146 147 |
# File 'lib/fluent/plugin/out_mongo.rb', line 143 def start @client = client @client = authenticate(@client) super end |
#write(chunk) ⇒ Object
166 167 168 169 |
# File 'lib/fluent/plugin/out_mongo.rb', line 166 def write(chunk) collection_name = @tag_mapped ? chunk.key : @collection operate(format_collection_name(collection_name), collect_records(chunk)) end |