Class: Fluent::MongoOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
MongoUtil, SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_mongo.rb

Direct Known Subclasses

MongoOutputReplset, MongoOutputTagCollection

Constant Summary collapse

LIMIT_BEFORE_v1_8 =

The following limits are heuristic, because BSON is sometimes bigger than MessagePack and JSON.

2 * 1024 * 1024
LIMIT_AFTER_v1_8 =

Each limit is half of a larger size: 2MB = 4MB / 2 for LIMIT_BEFORE_v1_8, and 8MB = 16MB / 2 for this constant.

8 * 1024 * 1024

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from MongoUtil

#authenticate, included

Constructor Details

#initialize ⇒ MongoOutput

Returns a new instance of MongoOutput.



50
51
52
53
54
55
56
57
58
# File 'lib/fluent/plugin/out_mongo.rb', line 50

# Builds a new output instance with empty client and option registries.
#
# The mongo and msgpack libraries are required here, when an instance is
# constructed, rather than when this file is loaded.
def initialize
  super

  %w[mongo msgpack].each { |library| require library }

  @clients = {}
  @connection_options = {}
  # Collections are not capped unless the configuration asks for it.
  @collection_options = { capped: false }
end

Instance Attribute Details

#collection_options ⇒ Object (readonly)

Returns the value of attribute collection_options.



44
45
46
# File 'lib/fluent/plugin/out_mongo.rb', line 44

# Read-only accessor for the collection options.
#
# @return [Object] the current value of @collection_options
def collection_options
  @collection_options
end

#connection_options ⇒ Object (readonly)

Returns the value of attribute connection_options.



44
45
46
# File 'lib/fluent/plugin/out_mongo.rb', line 44

# Read-only accessor for the connection options.
#
# @return [Object] the current value of @connection_options
def connection_options
  @connection_options
end

Class Method Details

.format_nocache(time) ⇒ Object

MongoDB uses BSON’s Date for time.



123
124
125
# File 'lib/fluent/plugin/out_mongo.rb', line 123

# MongoDB uses BSON's Date for time, so format_nocache is defined directly
# on the object held in @timef to hand the time value back as-is.
#
# @param time [Object] the time value given to the formatter
# @return [Object] the same +time+ value, unchanged
def @timef.format_nocache(time)
  time
end

Instance Method Details

#configure(conf) ⇒ Object

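Defaults buffer_chunk_limit to a heuristic ceiling (8MB = 16MB / 2, or 2MB when mongodb_smaller_bson_limit is enabled), resets larger configured values to that ceiling, and then builds the collection and connection options from the configuration.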

Raises:

  • (ConfigError)


64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/fluent/plugin/out_mongo.rb', line 64

# Validates the plugin configuration and derives the collection and
# connection options from it.
#
# Before delegating to the parent class, +buffer_chunk_limit+ is defaulted
# to a heuristic ceiling ('8m', or '2m' when +mongodb_smaller_bson_limit+ is
# present and true); a configured value above that ceiling is reset to it
# with a warning.
#
# @param conf [#has_key?, #[], #[]=] configuration for this output; its
#   'buffer_chunk_limit' entry may be rewritten in place
# @return [Object] the result of the final debug log call
# @raise [ConfigError] if neither 'tag_mapped' nor 'collection' is given, or
#   if 'capped' is given without 'capped_size'
def configure(conf)
  # Decide once which heuristic ceiling applies instead of repeating the
  # flag check in every branch.
  smaller_limit = conf.has_key?('mongodb_smaller_bson_limit') &&
                  Config.bool_value(conf['mongodb_smaller_bson_limit'])
  limit_conf = smaller_limit ? '2m' : '8m'

  if conf.has_key?('buffer_chunk_limit')
    limit_size = smaller_limit ? LIMIT_BEFORE_v1_8 : LIMIT_AFTER_v1_8
    if Config.size_value(conf['buffer_chunk_limit']) > limit_size
      log.warn ":buffer_chunk_limit(#{conf['buffer_chunk_limit']}) is large. Reset :buffer_chunk_limit with #{limit_conf}"
      conf['buffer_chunk_limit'] = limit_conf
    end
  else
    conf['buffer_chunk_limit'] = limit_conf
  end

  super

  # The mere presence of 'tag_mapped' switches the mode on. The collection
  # check default follows the mode unless it was already set explicitly.
  tag_mapped_given = conf.has_key?('tag_mapped')
  @tag_mapped = true if tag_mapped_given
  @disable_collection_check = tag_mapped_given if @disable_collection_check.nil?

  unless @tag_mapped || conf.has_key?('collection')
    raise ConfigError, "normal mode requires collection parameter"
  end

  if remove_tag_prefix = conf['remove_tag_prefix']
    @remove_tag_prefix = Regexp.new('^' + Regexp.escape(remove_tag_prefix))
  end

  # A comma-separated string becomes a list of field names.
  @exclude_broken_fields = @exclude_broken_fields.split(',') if @exclude_broken_fields

  if conf.has_key?('capped')
    raise ConfigError, "'capped_size' parameter is required on <store> of Mongo output" unless conf.has_key?('capped_size')
    @collection_options[:capped] = true
    @collection_options[:size] = Config.size_value(conf['capped_size'])
    @collection_options[:max] = Config.size_value(conf['capped_max']) if conf.has_key?('capped_max')
  end

  # :w is only set when a write concern was configured; the other options
  # are always copied, even when nil.
  @connection_options[:w] = @write_concern unless @write_concern.nil?
  @connection_options[:j] = @journaled
  @connection_options[:pool_size] = @socket_pool_size

  @connection_options[:ssl] = @ssl

  if @ssl
    @connection_options[:ssl_cert] = @ssl_cert
    @connection_options[:ssl_key] = @ssl_key
    @connection_options[:ssl_key_pass_phrase] = @ssl_key_pass_phrase
    @connection_options[:ssl_verify] = @ssl_verify
    @connection_options[:ssl_ca_cert] = @ssl_ca_cert
  end

  # MongoDB uses BSON's Date for time.
  def @timef.format_nocache(time)
    time
  end

  # Use the same plugin-scoped logger as the warning above rather than the
  # global $log.
  log.debug "Setup mongo configuration: mode = #{@tag_mapped ? 'tag mapped' : 'normal'}"
end

#emit(tag, es, chain) ⇒ Object



147
148
149
150
151
152
153
154
# File 'lib/fluent/plugin/out_mongo.rb', line 147

# Hands the event stream to the parent implementation.
#
# In tag-mapped mode the tag is passed a second time as a fourth argument
# (presumably the buffer key, since #write reads chunk.key in that mode;
# confirm against the parent class). Otherwise the three arguments are
# forwarded unchanged.
def emit(tag, es, chain)
  # TODO: Should this branch be replaced by code generated with eval in configure?
  return super(tag, es, chain, tag) if @tag_mapped

  super(tag, es, chain)
end

#format(tag, time, record) ⇒ Object



143
144
145
# File 'lib/fluent/plugin/out_mongo.rb', line 143

# Serializes one event as a MessagePack-encoded [time, record] pair.
# The tag is accepted but is not part of the serialized payload.
def format(tag, time, record)
  entry = [time, record]
  entry.to_msgpack
end

#shutdown ⇒ Object



137
138
139
140
141
# File 'lib/fluent/plugin/out_mongo.rb', line 137

# Closes the connection behind every client held in @clients, then lets
# the parent class perform its own shutdown.
def shutdown
  # Mongo::Connection checks by itself whether it is alive or already closed.
  @clients.each_value do |client|
    client.db.connection.close
  end

  super
end

#start ⇒ Object



130
131
132
133
134
135
# File 'lib/fluent/plugin/out_mongo.rb', line 130

# Starts the output. Outside tag-mapped mode the configured collection is
# looked up or created first, then the parent class is started.
def start
  # In non-tag-mapped mode the collection configuration can be checked
  # before the server starts.
  unless @tag_mapped
    get_or_create_collection(@collection)
  end

  super
end

#write(chunk) ⇒ Object



156
157
158
159
160
# File 'lib/fluent/plugin/out_mongo.rb', line 156

# Writes one buffered chunk: resolves the target collection name (the
# chunk's key in tag-mapped mode, otherwise @collection), then passes the
# collection and the chunk's collected records to #operate.
def write(chunk)
  # TODO: See the comment in #emit.
  target_name =
    if @tag_mapped
      chunk.key
    else
      @collection
    end

  # The collection is resolved before the records are collected, as before.
  collection = get_or_create_collection(target_name)
  records = collect_records(chunk)
  operate(collection, records)
end