Class: Fluent::MongoOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
MongoUtil, SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_mongo.rb

Direct Known Subclasses

MongoOutputReplset, MongoOutputTagCollection

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from MongoUtil

#authenticate, included

Constructor Details

#initializeMongoOutput

Returns a new instance of MongoOutput.



33
34
35
36
37
38
39
40
41
# File 'lib/fluent/plugin/out_mongo.rb', line 33

def initialize
  super
  require 'mongo'
  require 'msgpack'

  @clients = {}
  @connection_options = {}
  @collection_options = {:capped => false}
end

Instance Attribute Details

#collection_optionsObject (readonly)

Returns the value of attribute collection_options.



27
28
29
# File 'lib/fluent/plugin/out_mongo.rb', line 27

def collection_options
  @collection_options
end

#connection_optionsObject (readonly)

Returns the value of attribute connection_options.



27
28
29
# File 'lib/fluent/plugin/out_mongo.rb', line 27

def connection_options
  @connection_options
end

Class Method Details

.format_nocache(time) ⇒ Object

MongoDB uses BSON’s Date for time.



70
71
72
# File 'lib/fluent/plugin/out_mongo.rb', line 70

def @timef.format_nocache(time)
  time
end

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (ConfigError)


43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/fluent/plugin/out_mongo.rb', line 43

def configure(conf)
  super

  if conf.has_key?('tag_mapped')
    @tag_mapped = true
    @disable_collection_check = true if @disable_collection_check.nil?
  else
    @disable_collection_check = false if @disable_collection_check.nil?
  end
  raise ConfigError, "normal mode requires collection parameter" if !@tag_mapped and !conf.has_key?('collection')

  if remove_tag_prefix = conf['remove_tag_prefix']
    @remove_tag_prefix = Regexp.new('^' + Regexp.escape(remove_tag_prefix))
  end

  @exclude_broken_fields = @exclude_broken_fields.split(',') if @exclude_broken_fields

  if conf.has_key?('capped')
    raise ConfigError, "'capped_size' parameter is required on <store> of Mongo output" unless conf.has_key?('capped_size')
    @collection_options[:capped] = true
    @collection_options[:size] = Config.size_value(conf['capped_size'])
    @collection_options[:max] = Config.size_value(conf['capped_max']) if conf.has_key?('capped_max')
  end

  @connection_options[:w] = @write_concern unless @write_concern.nil?

  # MongoDB uses BSON's Date for time.
  def @timef.format_nocache(time)
    time
  end

  $log.debug "Setup mongo configuration: mode = #{@tag_mapped ? 'tag mapped' : 'normal'}"
end

#emit(tag, es, chain) ⇒ Object



97
98
99
100
101
102
103
104
# File 'lib/fluent/plugin/out_mongo.rb', line 97

def emit(tag, es, chain)
  # TODO: Should replacement using eval in configure?
  if @tag_mapped
    super(tag, es, chain, tag)
  else
    super(tag, es, chain)
  end
end

#format(tag, time, record) ⇒ Object



93
94
95
# File 'lib/fluent/plugin/out_mongo.rb', line 93

def format(tag, time, record)
  [time, record].to_msgpack
end

#shutdownObject



87
88
89
90
91
# File 'lib/fluent/plugin/out_mongo.rb', line 87

def shutdown
  # Mongo::Connection checks alive or closed myself
  @clients.values.each { |client| client.db.connection.close }
  super
end

#startObject



77
78
79
80
81
82
83
84
85
# File 'lib/fluent/plugin/out_mongo.rb', line 77

def start
  # Non tag mapped mode, we can check collection configuration before server start.
  get_or_create_collection(@collection) unless @tag_mapped

  # From configure for avoding complex method dependency...
  @buffer.buffer_chunk_limit = available_buffer_chunk_limit

  super
end

#write(chunk) ⇒ Object



106
107
108
109
110
# File 'lib/fluent/plugin/out_mongo.rb', line 106

def write(chunk)
  # TODO: See emit comment
  collection_name = @tag_mapped ? chunk.key : @collection
  operate(get_or_create_collection(collection_name), collect_records(chunk))
end