Class: Fluent::DocumentdbOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_documentdb.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ DocumentdbOutput

Returns a new instance of DocumentdbOutput.



10
11
12
13
14
15
16
17
18
19
# File 'lib/fluent/plugin/out_documentdb.rb', line 10

# Builds the plugin instance and loads its dependencies.
#
# The libraries are required here, at construction time, rather than at file
# load. They are loaded in a fixed order: the three general-purpose libraries
# first, then the plugin's own DocumentDB helper files.
def initialize
  super

  %w[msgpack time securerandom].each { |library| require library }

  %w[client partitioned_coll_client header resource].each do |component|
    require "fluent/plugin/documentdb/#{component}"
  end
end

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (ConfigError)


37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/fluent/plugin/out_documentdb.rb', line 37

# Validates the plugin settings once the parent class has processed +conf+,
# then prepares the time formatter used by #format.
#
# Checks run in a fixed order and stop at the first failure:
# 1. the four connection settings must not be empty;
# 2. a field name must be given for each enabled add_*_field option;
# 3. in partitioned collection mode a partition key is mandatory, and when the
#    collection may be auto-created the offer throughput must not be below
#    AzureDocumentDB::PARTITIONED_COLL_MIN_THROUGHPUT.
#
# @param conf the configuration object, handed to the parent class via super
# @raise [ConfigError] when any of the checks above fails
# @return the TimeFormatter stored in @timef
def configure(conf)
  super

  [
    ['no docdb_endpoint', @docdb_endpoint],
    ['no docdb_account_key', @docdb_account_key],
    ['no docdb_database', @docdb_database],
    ['no docdb_collection', @docdb_collection]
  ].each do |message, setting|
    raise ConfigError, message if setting.empty?
  end

  if @add_time_field && @time_field_name.empty?
    raise ConfigError, 'time_field_name must be set if add_time_field is true'
  end
  if @add_tag_field && @tag_field_name.empty?
    raise ConfigError, 'tag_field_name must be set if add_tag_field is true'
  end

  if @partitioned_collection
    if @partition_key.empty?
      raise ConfigError, 'partition_key must be set in partitioned collection mode'
    end

    # The throughput floor only matters when this plugin creates the collection.
    throughput_too_low = @auto_create_collection &&
                         @offer_throughput < AzureDocumentDB::PARTITIONED_COLL_MIN_THROUGHPUT
    if throughput_too_low
      # sprintf, not format: this class defines its own #format(tag, time, record).
      message = sprintf("offer_throughput must be more than and equals to %s",
                        AzureDocumentDB::PARTITIONED_COLL_MIN_THROUGHPUT)
      raise ConfigError, message
    end
  end

  @timef = TimeFormatter.new(@time_format, @localtime)
end

#format(tag, time, record) ⇒ Object



107
108
109
110
111
112
113
114
115
116
# File 'lib/fluent/plugin/out_documentdb.rb', line 107

# Serializes one event into the MessagePack payload that #write later reads
# back from the buffer chunk.
#
# The payload is built from a merged copy of +record+, so the caller's hash is
# never modified. This keeps the generated id and the optional time/tag fields
# from leaking into other consumers of the same record object, and it lets a
# frozen record be formatted. An existing 'id' key in +record+ is overwritten
# in the payload by a freshly generated UUID.
#
# @param tag the event tag; stored under @tag_field_name when @add_tag_field is set
# @param time the event time; passed to @timef.format and stored under
#   @time_field_name when @add_time_field is set
# @param record [Hash] the event record (assumed to be a Hash, as it must
#   respond to merge)
# @return the result of to_msgpack on the enriched copy
def format(tag, time, record)
  document = record.merge('id' => SecureRandom.uuid)
  document[@time_field_name] = @timef.format(time) if @add_time_field
  document[@tag_field_name] = tag if @add_tag_field
  document.to_msgpack
end

#shutdown ⇒ Object



102
103
104
105
# File 'lib/fluent/plugin/out_documentdb.rb', line 102

# Shuts the plugin down by delegating to the parent class.
# No plugin-specific cleanup is performed in this method.
def shutdown
  super
  # destroy -- NOTE(review): looks like a placeholder for releasing the client; nothing is torn down here
end

#start ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/fluent/plugin/out_documentdb.rb', line 60

# Starts the plugin: creates the DocumentDB client and makes sure the
# configured database and collection exist, then caches the collection
# resource in @coll_resource for #write.
#
# A missing database or collection is created only when the matching
# auto_create_* option is enabled; otherwise it is treated as an error.
# In partitioned collection mode a new collection is created with
# "/<partition_key>" as its partition key path and @offer_throughput.
#
# Any StandardError raised during this setup is logged at fatal level and the
# process is terminated with exit!. Only StandardError is rescued, so signals
# and SystemExit propagate normally instead of being logged as setup failures.
def start
  super

  begin
    client_class =
      if @partitioned_collection
        AzureDocumentDB::PartitionedCollectionClient
      else
        AzureDocumentDB::Client
      end
    @client = client_class.new(@docdb_account_key, @docdb_endpoint)

    ## initial operations for database
    res = @client.find_databases_by_name(@docdb_database)
    if res[:body]["_count"].to_i == 0
      unless @auto_create_database
        raise "No database (#{@docdb_database}) exists! Enable auto_create_database or create it by useself"
      end
      # create new database as it doesn't exist
      @client.create_database(@docdb_database)
    end

    ## initial operations for collection
    database_resource = @client.get_database_resource(@docdb_database)
    res = @client.find_collections_by_name(database_resource, @docdb_collection)
    if res[:body]["_count"].to_i == 0
      unless @auto_create_collection
        raise "No collection (#{@docdb_collection}) exists! Enable auto_create_collection or create it by useself"
      end
      # create new collection as it doesn't exist
      if @partitioned_collection
        partition_key_paths = ["/#{@partition_key}"]
        @client.create_collection(database_resource,
                                  @docdb_collection, partition_key_paths, @offer_throughput)
      else
        @client.create_collection(database_resource, @docdb_collection)
      end
    end
    @coll_resource = @client.get_collection_resource(database_resource, @docdb_collection)
  rescue StandardError => ex
    $log.fatal "Error: '#{ex}'"
    # exit! ends the process immediately, without running exit handlers.
    exit!
  end
end

#write(chunk) ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/fluent/plugin/out_documentdb.rb', line 118

# Sends every record in +chunk+ to the collection as one document each.
#
# The document id is taken from the record's "id" field. In partitioned
# collection mode @partition_key is passed along to create_document.
#
# Failures of create_document are logged at fatal level per record and do not
# stop the remaining records from being processed:
# * RestClient::ExceptionWithResponse whose JSON body has code "Conflict" is
#   reported as a duplicate document;
# * any other RestClient::ExceptionWithResponse is reported with its response;
# * any other StandardError is reported as an unknown error.
# An error raised while parsing the RestClient response body is not rescued
# here and propagates to the caller.
#
# @param chunk the buffer chunk; must respond to msgpack_each
def write(chunk)
  chunk.msgpack_each do |record|
    unique_doc_identifier = record["id"]
    begin
      if @partitioned_collection
        @client.create_document(@coll_resource, unique_doc_identifier, record, @partition_key)
      else
        @client.create_document(@coll_resource, unique_doc_identifier, record)
      end
    rescue RestClient::ExceptionWithResponse => rcex
      exdict = JSON.parse(rcex.response)
      if exdict['code'] == 'Conflict'
        # Must use the local defined above; an undefined name here would raise
        # NameError from inside the rescue clause and abort the whole chunk.
        $log.fatal "Duplicate Error: document #{unique_doc_identifier} already exists, data=>" + record.to_json
      else
        $log.fatal "RestClient Error: '#{rcex.response}', data=>" + record.to_json
      end
    rescue => ex
      $log.fatal "UnknownError: '#{ex}', uniqueid=>#{unique_doc_identifier}, data=>" + record.to_json
    end
  end
end