Class: Fluent::DocumentdbOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_documentdb.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ DocumentdbOutput

Returns a new instance of DocumentdbOutput.



7
8
9
10
11
12
13
# File 'lib/fluent/plugin/out_documentdb.rb', line 7

# Builds the plugin instance and loads the libraries it relies on.
#
# The parent initializer runs first; the libraries are then required in a
# fixed order at construction time rather than when the file is loaded.
def initialize
    super
    %w[documentdb msgpack time securerandom].each { |library| require library }
end

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (ConfigError)


28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/fluent/plugin/out_documentdb.rb', line 28

# Validates the plugin settings and prepares the time formatter.
#
# After the parent class has processed the configuration, the four
# connection settings are checked in order, followed by the field names
# needed by the optional time and tag fields. On success a TimeFormatter
# built from @time_format and @localtime is stored in @timef.
#
# @param conf the configuration element, passed through to the parent class
# @raise [ConfigError] if a connection setting is empty, or if a field is
#   enabled while its field name is empty
def configure(conf)
    super

    # Each pair is a setting value and the complaint raised when it is empty.
    mandatory = [
        [@docdb_endpoint,    'no docdb_endpoint'],
        [@docdb_account_key, 'no docdb_account_key'],
        [@docdb_database,    'no docdb_database'],
        [@docdb_collection,  'no docdb_collection']
    ]
    mandatory.each do |value, complaint|
        raise ConfigError, complaint if value.empty?
    end

    # An enabled field needs a name to be stored under.
    time_name_missing = @add_time_field && @time_field_name.empty?
    raise ConfigError, 'time_field_name is needed if add_time_field is true' if time_name_missing

    tag_name_missing = @add_tag_field && @tag_field_name.empty?
    raise ConfigError, 'tag_field_name is needed if add_tag_field is true' if tag_name_missing

    @timef = TimeFormatter.new(@time_format, @localtime)
end

#format(tag, time, record) ⇒ Object



87
88
89
90
91
92
93
94
95
96
# File 'lib/fluent/plugin/out_documentdb.rb', line 87

# Serializes one event into the MessagePack payload that is buffered for
# the write step.
#
# The payload is built from a shallow copy of the record so the caller's
# hash is not modified; the added fields therefore never show up for any
# other holder of the same record object. The copy receives a freshly
# generated UUID under 'id' (replacing any existing 'id') and, when the
# corresponding options are enabled, the formatted event time and the tag
# under their configured field names.
#
# @param tag the event tag, stored as-is when @add_tag_field is set
# @param time the event time, passed to @timef.format when @add_time_field is set
# @param record the event record; must respond to dup, []= and the copy to to_msgpack
# @return the result of to_msgpack on the decorated copy
def format(tag, time, record)
    document = record.dup
    document['id'] = SecureRandom.uuid
    document[@time_field_name] = @timef.format(time) if @add_time_field
    document[@tag_field_name] = tag if @add_tag_field
    document.to_msgpack
end

#shutdown ⇒ Object



82
83
84
85
# File 'lib/fluent/plugin/out_documentdb.rb', line 82

# Stops the plugin by delegating to the parent class.
#
# No plugin-specific teardown happens here.
#
# @return whatever the parent implementation returns
def shutdown
    super
end

#start ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/fluent/plugin/out_documentdb.rb', line 44

# Connects to DocumentDB and prepares the document resource used for writes.
#
# The configured database and collection are looked up by id. When one is
# missing it is created if the matching auto_create_* option is set;
# otherwise setup fails. On success the document resource for the collection
# is stored in @docdb.
#
# Any StandardError raised during setup is logged at fatal level and the
# process is terminated with exit!. Signals and SystemExit are deliberately
# not rescued, so they propagate normally instead of being reported as a
# setup failure.
def start
    super

    begin
        context = Azure::DocumentDB::Context.new @docdb_endpoint, @docdb_account_key

        ## initial operations for database
        database = Azure::DocumentDB::Database.new context, RestClient
        qreq = Azure::DocumentDB::QueryRequest.new "SELECT * FROM root r WHERE r.id=@id"
        qreq.parameters.add "@id", @docdb_database
        qres = database.query.execute qreq
        if qres[:body]["_count"].to_i.zero?
            unless @auto_create_database
                raise "No database (#{@docdb_database}) exists! Enable auto_create_database or create it by yourself"
            end
            # create the database as it does not exist yet
            database.create @docdb_database
        end

        ## initial operations for collection
        collection = database.collection_for_name @docdb_database
        qreq = Azure::DocumentDB::QueryRequest.new "SELECT * FROM root r WHERE r.id=@id"
        qreq.parameters.add "@id", @docdb_collection
        qres = collection.query.execute qreq
        if qres[:body]["_count"].to_i.zero?
            unless @auto_create_collection
                raise "No collection (#{@docdb_collection}) exists! Enable auto_create_collection or create it by yourself"
            end
            # create the collection as it does not exist yet
            collection.create @docdb_collection
        end

        @docdb = collection.document_for_name @docdb_collection
    rescue => ex
        # Without a usable database and collection the plugin cannot work,
        # so report the failure and stop the process.
        $log.fatal "Error: '#{ex}'"
        exit!
    end
end

#write(chunk) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/fluent/plugin/out_documentdb.rb', line 98

# Sends every record of a buffered chunk to DocumentDB, one document each.
#
# Each record is serialized to JSON and created under the value of its 'id'
# key. A failure for one record is logged at fatal level together with its id
# and payload; the remaining records of the chunk are still processed.
#
# @param chunk the buffer chunk; must respond to msgpack_each and yield records
# @return whatever chunk.msgpack_each returns
def write(chunk)
    chunk.msgpack_each do |record|
        unique_doc_identifier = record["id"]
        docdata = record.to_json
        begin
            @docdb.create unique_doc_identifier, docdata
        rescue => ex
            # Keep the message in one expression: a continuation line that
            # begins with `+` is parsed as a separate statement, so the id
            # and payload would never reach the log.
            $log.fatal "UnknownError: '#{ex}', uniqueid=>#{unique_doc_identifier}, data=>#{docdata}"
        end
    end
end