Class: Fluent::MongoOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::MongoOutput
- Includes:
- MongoUtil, SetTagKeyMixin, SetTimeKeyMixin
- Defined in:
- lib/fluent/plugin/out_mongo.rb
Direct Known Subclasses
Constant Summary collapse
- LIMIT_BEFORE_v1_8 =
Following limits are heuristic. BSON is sometimes bigger than MessagePack and JSON.
2 * 1024 * 1024
- LIMIT_AFTER_v1_8 =
2MB = 4MB / 2
8 * 1024 * 1024
Instance Attribute Summary collapse
-
#collection_options ⇒ Object
readonly
Returns the value of attribute collection_options.
-
#connection_options ⇒ Object
readonly
Returns the value of attribute connection_options.
Class Method Summary collapse
-
.format_nocache(time) ⇒ Object
MongoDB uses BSON’s Date for time.
Instance Method Summary collapse
-
#configure(conf) ⇒ Object
8MB = 16MB / 2.
- #emit(tag, es, chain) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ MongoOutput
constructor
A new instance of MongoOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Methods included from MongoUtil
Constructor Details
#initialize ⇒ MongoOutput
Returns a new instance of MongoOutput.
51 52 53 54 55 56 57 58 59 |
# File 'lib/fluent/plugin/out_mongo.rb', line 51 def initialize super require 'mongo' require 'msgpack' @clients = {} @connection_options = {} @collection_options = {:capped => false} end |
Instance Attribute Details
#collection_options ⇒ Object (readonly)
Returns the value of attribute collection_options.
45 46 47 |
# File 'lib/fluent/plugin/out_mongo.rb', line 45 def @collection_options end |
#connection_options ⇒ Object (readonly)
Returns the value of attribute connection_options.
45 46 47 |
# File 'lib/fluent/plugin/out_mongo.rb', line 45 def @connection_options end |
Class Method Details
.format_nocache(time) ⇒ Object
MongoDB uses BSON’s Date for time.
128 129 130 |
# File 'lib/fluent/plugin/out_mongo.rb', line 128 def @timef.format_nocache(time) time end |
Instance Method Details
#configure(conf) ⇒ Object
8MB = 16MB / 2
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/fluent/plugin/out_mongo.rb', line 65 def configure(conf) if conf.has_key?('buffer_chunk_limit') configured_chunk_limit_size = Config.size_value(conf['buffer_chunk_limit']) estimated_limit_size = LIMIT_AFTER_v1_8 estimated_limit_size_conf = '8m' if conf.has_key?('mongodb_smaller_bson_limit') && Config.bool_value(conf['mongodb_smaller_bson_limit']) estimated_limit_size = LIMIT_BEFORE_v1_8 estimated_limit_size_conf = '2m' end if configured_chunk_limit_size > estimated_limit_size log.warn ":buffer_chunk_limit(#{conf['buffer_chunk_limit']}) is large. Reset :buffer_chunk_limit with #{estimated_limit_size_conf}" conf['buffer_chunk_limit'] = estimated_limit_size_conf end else if conf.has_key?('mongodb_smaller_bson_limit') && Config.bool_value(conf['mongodb_smaller_bson_limit']) conf['buffer_chunk_limit'] = '2m' else conf['buffer_chunk_limit'] = '8m' end end super unless @ignore_invalid_record log.warn "Since v0.8, invalid record detection will be removed because mongo driver v2.x and API spec don't provide it. You may lose invalid records, so you should not send such records to mongo plugin" end if conf.has_key?('tag_mapped') @tag_mapped = true @disable_collection_check = true if @disable_collection_check.nil? else @disable_collection_check = false if @disable_collection_check.nil? end raise ConfigError, "normal mode requires collection parameter" if !@tag_mapped and !conf.has_key?('collection') if remove_tag_prefix = conf['remove_tag_prefix'] @remove_tag_prefix = Regexp.new('^' + Regexp.escape(remove_tag_prefix)) end @exclude_broken_fields = @exclude_broken_fields.split(',') if @exclude_broken_fields if conf.has_key?('capped') raise ConfigError, "'capped_size' parameter is required on <store> of Mongo output" unless conf.has_key?('capped_size') @collection_options[:capped] = true @collection_options[:size] = Config.size_value(conf['capped_size']) @collection_options[:max] = Config.size_value(conf['capped_max']) if conf.has_key?('capped_max') end @connection_options[:w] = @write_concern unless @write_concern.nil? @connection_options[:j] = @journaled @connection_options[:pool_size] = @socket_pool_size @connection_options[:ssl] = @ssl if @ssl @connection_options[:ssl_cert] = @ssl_cert @connection_options[:ssl_key] = @ssl_key @connection_options[:ssl_key_pass_phrase] = @ssl_key_pass_phrase @connection_options[:ssl_verify] = @ssl_verify @connection_options[:ssl_ca_cert] = @ssl_ca_cert end # MongoDB uses BSON's Date for time. def @timef.format_nocache(time) time end $log.debug "Setup mongo configuration: mode = #{@tag_mapped ? 'tag mapped' : 'normal'}" end |
#emit(tag, es, chain) ⇒ Object
152 153 154 155 156 157 158 159 |
# File 'lib/fluent/plugin/out_mongo.rb', line 152 def emit(tag, es, chain) # TODO: Should replacement using eval in configure? if @tag_mapped super(tag, es, chain, tag) else super(tag, es, chain) end end |
#format(tag, time, record) ⇒ Object
148 149 150 |
# File 'lib/fluent/plugin/out_mongo.rb', line 148 def format(tag, time, record) [time, record].to_msgpack end |
#shutdown ⇒ Object
142 143 144 145 146 |
# File 'lib/fluent/plugin/out_mongo.rb', line 142 def shutdown # Mongo::Connection checks alive or closed myself @clients.values.each { |client| client.db.connection.close } super end |
#start ⇒ Object
135 136 137 138 139 140 |
# File 'lib/fluent/plugin/out_mongo.rb', line 135 def start # Non tag mapped mode, we can check collection configuration before server start. get_or_create_collection(@collection) unless @tag_mapped super end |
#write(chunk) ⇒ Object
161 162 163 164 165 |
# File 'lib/fluent/plugin/out_mongo.rb', line 161 def write(chunk) # TODO: See emit comment collection_name = @tag_mapped ? chunk.key : @collection operate(get_or_create_collection(collection_name), collect_records(chunk)) end |