Class: Fluent::Plugin::DocumentdbOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::DocumentdbOutput
- Defined in:
- lib/fluent/plugin/out_documentdb.rb
Constant Summary collapse
- DEFAULT_BUFFER_TYPE =
"memory"
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #formatted_to_msgpack_binary? ⇒ Boolean
- #multi_workers_ready? ⇒ Boolean
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/fluent/plugin/out_documentdb.rb', line 42

# Validates the plugin settings once the parent class has processed +conf+,
# then builds the formatter used for the optional time field.
#
# @param conf the configuration object handed to the plugin
# @raise [Fluent::ConfigError] when a required docdb_* setting is empty, when
#   the time/tag field is enabled without a field name, when partitioned mode
#   has no partition key or (with auto_create_collection) an offer_throughput
#   below AzureDocumentDB::PARTITIONED_COLL_MIN_THROUGHPUT, or when
#   @chunk_key_tag is not set
def configure(conf)
  compat_parameters_convert(conf, :buffer)
  super

  # Checked in this order so the first missing setting is the one reported.
  required_settings = {
    'docdb_endpoint' => @docdb_endpoint,
    'docdb_account_key' => @docdb_account_key,
    'docdb_database' => @docdb_database,
    'docdb_collection' => @docdb_collection
  }
  required_settings.each do |name, value|
    raise Fluent::ConfigError, "no #{name}" if value.empty?
  end

  if @add_time_field && @time_field_name.empty?
    raise Fluent::ConfigError, 'time_field_name must be set if add_time_field is true'
  end
  if @add_tag_field && @tag_field_name.empty?
    raise Fluent::ConfigError, 'tag_field_name must be set if add_tag_field is true'
  end

  if @partitioned_collection
    if @partition_key.empty?
      raise Fluent::ConfigError, 'partition_key must be set in partitioned collection mode'
    end
    # The throughput floor only matters when the plugin creates the collection itself.
    if @auto_create_collection
      minimum = AzureDocumentDB::PARTITIONED_COLL_MIN_THROUGHPUT
      if @offer_throughput < minimum
        raise Fluent::ConfigError, "offer_throughput must be more than and equals to #{minimum}"
      end
    end
  end

  raise Fluent::ConfigError, "'tag' in chunk_keys is required." unless @chunk_key_tag

  @timef = Fluent::TimeFormatter.new(@time_format, @localtime)
end
#format(tag, time, record) ⇒ Object
114 115 116 117 118 119 120 121 122 123 |
# File 'lib/fluent/plugin/out_documentdb.rb', line 114

# Serializes one event into the msgpack payload stored in the buffer chunk.
#
# A fresh UUID is stored under 'id'. When @add_time_field is set, the
# formatted event time is stored under @time_field_name; when @add_tag_field
# is set, the tag is stored under @tag_field_name.
#
# The extra fields are added to a copy of +record+, so the caller's hash is
# never modified (it may be frozen or shared with other consumers).
#
# @param tag the event tag
# @param time the event time, passed to @timef.format
# @param record [Hash] the event record
# @return the result of calling to_msgpack on the enriched copy
def format(tag, time, record)
  document = record.merge('id' => SecureRandom.uuid)
  document[@time_field_name] = @timef.format(time) if @add_time_field
  document[@tag_field_name] = tag if @add_tag_field
  document.to_msgpack
end
#formatted_to_msgpack_binary? ⇒ Boolean
125 126 127 |
# File 'lib/fluent/plugin/out_documentdb.rb', line 125

# Always answers true.
# NOTE(review): presumably signals that #format emits msgpack binary, which
# #write then reads back with msgpack_each - confirm against the Fluentd
# output plugin API.
#
# @return [Boolean] always true
def formatted_to_msgpack_binary?
  true
end
#multi_workers_ready? ⇒ Boolean
129 130 131 |
# File 'lib/fluent/plugin/out_documentdb.rb', line 129

# Always answers true.
# NOTE(review): presumably declares the plugin usable with multiple Fluentd
# workers - confirm against the Fluentd output plugin API.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end
#shutdown ⇒ Object
109 110 111 112 |
# File 'lib/fluent/plugin/out_documentdb.rb', line 109

# Shuts the plugin down by delegating entirely to the parent class; this
# plugin performs no teardown of its own.
def shutdown
  super
  # destroy
end
#start ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/fluent/plugin/out_documentdb.rb', line 67

# Builds the DocumentDB client and makes sure the configured database and
# collection exist, creating them when auto_create_database /
# auto_create_collection allow it. On success the collection resource is
# kept in @coll_resource.
#
# Any StandardError raised during this setup is logged at fatal level and the
# process is terminated immediately with exit!. Exceptions outside
# StandardError (Interrupt, SignalException, SystemExit, NoMemoryError, ...)
# are deliberately not rescued, so they propagate normally instead of being
# reported as a plugin error.
def start
  super

  begin
    @client = nil
    client_class =
      if @partitioned_collection
        AzureDocumentDB::PartitionedCollectionClient
      else
        AzureDocumentDB::Client
      end
    @client = client_class.new(@docdb_account_key, @docdb_endpoint)

    ## initial operations for database
    res = @client.find_databases_by_name(@docdb_database)
    if res[:body]['_count'].to_i == 0
      unless @auto_create_database
        raise "No database (#{@docdb_database}) exists! Enable auto_create_database or create it by useself"
      end
      # create new database as it doesn't exist
      @client.create_database(@docdb_database)
    end

    ## initial operations for collection
    database_resource = @client.get_database_resource(@docdb_database)
    res = @client.find_collections_by_name(database_resource, @docdb_collection)
    if res[:body]['_count'].to_i == 0
      unless @auto_create_collection
        raise "No collection (#{@docdb_collection}) exists! Enable auto_create_collection or create it by useself"
      end
      # create new collection as it doesn't exist
      if @partitioned_collection
        partition_key_paths = ["/#{@partition_key}"]
        @client.create_collection(database_resource, @docdb_collection,
                                  partition_key_paths, @offer_throughput)
      else
        @client.create_collection(database_resource, @docdb_collection)
      end
    end

    @coll_resource = @client.get_collection_resource(database_resource, @docdb_collection)
  rescue StandardError => ex
    log.fatal "Error: '#{ex}'"
    exit!
  end
end
#write(chunk) ⇒ Object
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/fluent/plugin/out_documentdb.rb', line 133

# Sends every record in +chunk+ to DocumentDB as an individual document,
# using the record's 'id' value as the document identifier. In partitioned
# collection mode @partition_key is passed along to the client as well.
#
# Failures are logged per record at fatal level and do not stop the
# remaining records from being processed:
# * a RestClient error whose response body is JSON with code 'Conflict' is
#   reported as a duplicate document;
# * any other RestClient error is reported with the raw response. This
#   includes responses that are missing or not valid JSON, which are handled
#   here so that a parse failure cannot escape the error handler;
# * any other StandardError is reported as an unknown error.
#
# @param chunk the buffer chunk; must respond to msgpack_each
def write(chunk)
  chunk.msgpack_each do |record|
    unique_doc_identifier = record['id']
    begin
      if @partitioned_collection
        @client.create_document(@coll_resource, unique_doc_identifier, record, @partition_key)
      else
        @client.create_document(@coll_resource, unique_doc_identifier, record)
      end
    rescue RestClient::ExceptionWithResponse => rcex
      # The response may be nil or a non-JSON body (e.g. an HTML error page);
      # an exception raised here would not be caught by the rescue below.
      error_code =
        begin
          parsed = JSON.parse(rcex.response.to_s)
          parsed['code'] if parsed.is_a?(Hash)
        rescue JSON::ParserError
          nil
        end

      if error_code == 'Conflict'
        log.fatal "Duplicate Error: document #{unique_doc_identifier} already exists, data=>#{record.to_json}"
      else
        log.fatal "RestClient Error: '#{rcex.response}', data=>#{record.to_json}"
      end
    rescue => ex
      log.fatal "UnknownError: '#{ex}', uniqueid=>#{unique_doc_identifier}, data=>#{record.to_json}"
    end
  end
end