Class: Fluent::MongoOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::MongoOutput
- Includes:
- MongoUtil, SetTagKeyMixin, SetTimeKeyMixin
- Defined in:
- lib/fluent/plugin/out_mongo.rb
Direct Known Subclasses
Constant Summary collapse
- LIMIT_BEFORE_v1_8 =
The following limits are heuristic, because BSON is sometimes bigger than MessagePack and JSON. 2MB = 4MB / 2.
2 * 1024 * 1024
- LIMIT_AFTER_v1_8 =
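8MB = 16MB / 2. (The "2MB = 4MB / 2" note originally extracted here describes LIMIT_BEFORE_v1_8, whose value is 2 * 1024 * 1024.)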
8 * 1024 * 1024
Instance Attribute Summary collapse
-
#collection_options ⇒ Object
readonly
Returns the value of attribute collection_options.
-
#connection_options ⇒ Object
readonly
Returns the value of attribute connection_options.
Class Method Summary collapse
-
.format_nocache(time) ⇒ Object
MongoDB uses BSON’s Date for time.
Instance Method Summary collapse
-
#configure(conf) ⇒ Object
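Caps buffer_chunk_limit at the heuristic limit (8m by default, 2m when mongodb_smaller_bson_limit is enabled), then sets up tag-mapped mode, capped-collection options and connection options. (The "8MB = 16MB / 2" note originally extracted here describes LIMIT_AFTER_v1_8.)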
- #emit(tag, es, chain) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ MongoOutput
constructor
A new instance of MongoOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Methods included from MongoUtil
Constructor Details
#initialize ⇒ MongoOutput
Returns a new instance of MongoOutput.
50 51 52 53 54 55 56 57 58 |
# File 'lib/fluent/plugin/out_mongo.rb', line 50
#
# Builds a new MongoOutput with an empty client cache and default options.
#
# The driver libraries are required at construction time rather than at
# file load. Collections start out non-capped; #configure fills in the
# remaining collection and connection options.
def initialize
  super

  %w[mongo msgpack].each { |library| require library }

  @clients = {}
  @collection_options = { :capped => false }
  @connection_options = {}
end
Instance Attribute Details
#collection_options ⇒ Object (readonly)
Returns the value of attribute collection_options.
44 45 46 |
# File 'lib/fluent/plugin/out_mongo.rb', line 44
#
# Read-only accessor for the collection options.
#
# @return [Object] the value of @collection_options (a Hash seeded with
#   :capped => false in #initialize and extended with :capped, :size and
#   :max by #configure)
def collection_options
  @collection_options
end
#connection_options ⇒ Object (readonly)
Returns the value of attribute connection_options.
44 45 46 |
# File 'lib/fluent/plugin/out_mongo.rb', line 44
#
# Read-only accessor for the connection options.
#
# @return [Object] the value of @connection_options (a Hash that starts
#   empty in #initialize and receives :w, :j, :pool_size and the :ssl*
#   entries in #configure)
def connection_options
  @connection_options
end
Class Method Details
.format_nocache(time) ⇒ Object
MongoDB uses BSON’s Date for time.
123 124 125 |
# File 'lib/fluent/plugin/out_mongo.rb', line 123
#
# MongoDB uses BSON's Date for time, so the time value is handed back
# as-is. Defined as a singleton method on the @timef object only.
#
# @param time [Object] the time value to "format"
# @return [Object] +time+, unchanged
def @timef.format_nocache(time)
  time
end
Instance Method Details
#configure(conf) ⇒ Object
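Caps buffer_chunk_limit at the heuristic limit (8m by default, 2m when mongodb_smaller_bson_limit is enabled), then sets up tag-mapped mode, capped-collection options and connection options. (The "8MB = 16MB / 2" note originally extracted here describes LIMIT_AFTER_v1_8.)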
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/fluent/plugin/out_mongo.rb', line 64
#
# Applies the plugin configuration.
#
# Before delegating to +super+, 'buffer_chunk_limit' is defaulted or capped
# at a heuristic limit: '8m' (LIMIT_AFTER_v1_8), or '2m' (LIMIT_BEFORE_v1_8)
# when 'mongodb_smaller_bson_limit' is set to a true value. After +super+,
# the tag-mapped flag, tag-prefix pattern, capped-collection options and
# connection options are derived from +conf+ and the instance variables.
#
# @param conf [#has_key?, #[]] configuration element; it is mutated
#   ('buffer_chunk_limit' may be set or overwritten)
# @raise [ConfigError] when not in tag-mapped mode and 'collection' is absent
# @raise [ConfigError] when 'capped' is present without 'capped_size'
def configure(conf)
  # Pick the heuristic limit once; both branches below need the same pair.
  smaller_bson = conf.has_key?('mongodb_smaller_bson_limit') &&
                 Config.bool_value(conf['mongodb_smaller_bson_limit'])
  if smaller_bson
    estimated_limit_size = LIMIT_BEFORE_v1_8
    estimated_limit_size_conf = '2m'
  else
    estimated_limit_size = LIMIT_AFTER_v1_8
    estimated_limit_size_conf = '8m'
  end

  if conf.has_key?('buffer_chunk_limit')
    configured_chunk_limit_size = Config.size_value(conf['buffer_chunk_limit'])
    if configured_chunk_limit_size > estimated_limit_size
      log.warn ":buffer_chunk_limit(#{conf['buffer_chunk_limit']}) is large. Reset :buffer_chunk_limit with #{estimated_limit_size_conf}"
      conf['buffer_chunk_limit'] = estimated_limit_size_conf
    end
  else
    conf['buffer_chunk_limit'] = estimated_limit_size_conf
  end

  super

  # The mere presence of 'tag_mapped' enables tag-mapped mode; its value is
  # not inspected. @disable_collection_check is only defaulted when unset.
  if conf.has_key?('tag_mapped')
    @tag_mapped = true
    @disable_collection_check = true if @disable_collection_check.nil?
  else
    @disable_collection_check = false if @disable_collection_check.nil?
  end
  raise ConfigError, "normal mode requires collection parameter" if !@tag_mapped && !conf.has_key?('collection')

  if remove_tag_prefix = conf['remove_tag_prefix']
    @remove_tag_prefix = Regexp.new('^' + Regexp.escape(remove_tag_prefix))
  end

  # Turn the comma-separated setting into a list of field names.
  @exclude_broken_fields = @exclude_broken_fields.split(',') if @exclude_broken_fields

  # As with 'tag_mapped', only the presence of 'capped' is checked.
  if conf.has_key?('capped')
    raise ConfigError, "'capped_size' parameter is required on <store> of Mongo output" unless conf.has_key?('capped_size')
    @collection_options[:capped] = true
    @collection_options[:size] = Config.size_value(conf['capped_size'])
    @collection_options[:max] = Config.size_value(conf['capped_max']) if conf.has_key?('capped_max')
  end

  @connection_options[:w] = @write_concern unless @write_concern.nil?
  @connection_options[:j] = @journaled
  @connection_options[:pool_size] = @socket_pool_size
  @connection_options[:ssl] = @ssl

  if @ssl
    @connection_options[:ssl_cert] = @ssl_cert
    @connection_options[:ssl_key] = @ssl_key
    @connection_options[:ssl_key_pass_phrase] = @ssl_key_pass_phrase
    @connection_options[:ssl_verify] = @ssl_verify
    @connection_options[:ssl_ca_cert] = @ssl_ca_cert
  end

  # MongoDB uses BSON's Date for time.
  # Singleton override on @timef: return the time value unformatted.
  def @timef.format_nocache(time)
    time
  end

  # Logged through the plugin's own logger, like the warning above.
  log.debug "Setup mongo configuration: mode = #{@tag_mapped ? 'tag mapped' : 'normal'}"
end
#emit(tag, es, chain) ⇒ Object
147 148 149 150 151 152 153 154 |
# File 'lib/fluent/plugin/out_mongo.rb', line 147
#
# Forwards the event stream to the parent implementation. In tag-mapped
# mode the tag is passed along as a fourth argument; #write later reads
# the chunk's key as the collection name in that mode.
#
# @param tag [Object] tag of the event stream
# @param es [Object] event stream
# @param chain [Object] output chain passed through to +super+
def emit(tag, es, chain)
  # TODO: Should replacement using eval in configure?
  return super(tag, es, chain, tag) if @tag_mapped

  super(tag, es, chain)
end
#format(tag, time, record) ⇒ Object
143 144 145 |
# File 'lib/fluent/plugin/out_mongo.rb', line 143
#
# Serializes one event as a MessagePack-encoded [time, record] pair.
# The tag is accepted but not used here.
#
# @param tag [Object] unused
# @param time [Object] event time
# @param record [Object] event record
# @return [Object] result of calling +to_msgpack+ on the pair
def format(tag, time, record)
  entry = [time, record]
  entry.to_msgpack
end
#shutdown ⇒ Object
137 138 139 140 141 |
# File 'lib/fluent/plugin/out_mongo.rb', line 137
#
# Closes the connection behind every cached client, then lets the parent
# class finish shutting down.
def shutdown
  # Mongo::Connection itself checks whether it is alive or already closed.
  cached_clients = @clients.values
  cached_clients.each do |client|
    client.db.connection.close
  end

  super
end
#start ⇒ Object
130 131 132 133 134 135 |
# File 'lib/fluent/plugin/out_mongo.rb', line 130
#
# Starts the output. Outside tag-mapped mode the configured collection is
# fetched or created first, so its configuration is checked before the
# parent class starts.
def start
  unless @tag_mapped
    # The collection name is fixed in this mode, so it can be checked up front.
    get_or_create_collection(@collection)
  end

  super
end
#write(chunk) ⇒ Object
156 157 158 159 160 |
# File 'lib/fluent/plugin/out_mongo.rb', line 156
#
# Writes one buffered chunk. The target collection is named by the chunk's
# key in tag-mapped mode and by @collection otherwise.
#
# @param chunk [Object] buffer chunk; +chunk.key+ is read in tag-mapped mode
# @return [Object] whatever +operate+ returns
def write(chunk)
  # TODO: See emit comment
  target_name = if @tag_mapped
                  chunk.key
                else
                  @collection
                end

  # Resolve the collection before collecting records, as the original does.
  collection = get_or_create_collection(target_name)
  records = collect_records(chunk)
  operate(collection, records)
end