Class: Fluent::Plugin::DocumentdbOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_documentdb.rb

Constant Summary collapse

DEFAULT_BUFFER_TYPE =
"memory"

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/fluent/plugin/out_documentdb.rb', line 42

# Validates the plugin settings and builds the time formatter.
#
# String settings are checked through +to_s+ so that a value left as nil is
# reported as a Fluent::ConfigError instead of failing with NoMethodError.
#
# @param conf [Object] configuration element; converted for buffer
#   compatibility and then handed to the parent implementation via +super+
# @return [Object] the Fluent::TimeFormatter stored in +@timef+
# @raise [Fluent::ConfigError] if an endpoint/key/database/collection setting
#   is blank, a field name is blank while its add_* flag is on, the
#   partitioned-collection settings are incomplete or below the minimum
#   throughput, or 'tag' is missing from the chunk keys
def configure(conf)
  compat_parameters_convert(conf, :buffer)
  super

  raise Fluent::ConfigError, 'no docdb_endpoint' if @docdb_endpoint.to_s.empty?
  raise Fluent::ConfigError, 'no docdb_account_key' if @docdb_account_key.to_s.empty?
  raise Fluent::ConfigError, 'no docdb_database' if @docdb_database.to_s.empty?
  raise Fluent::ConfigError, 'no docdb_collection' if @docdb_collection.to_s.empty?

  if @add_time_field && @time_field_name.to_s.empty?
    raise Fluent::ConfigError, 'time_field_name must be set if add_time_field is true'
  end
  if @add_tag_field && @tag_field_name.to_s.empty?
    raise Fluent::ConfigError, 'tag_field_name must be set if add_tag_field is true'
  end

  if @partitioned_collection
    if @partition_key.to_s.empty?
      raise Fluent::ConfigError, 'partition_key must be set in partitioned collection mode'
    end
    # The throughput floor only matters when this plugin creates the collection itself.
    if @auto_create_collection &&
       @offer_throughput < AzureDocumentDB::PARTITIONED_COLL_MIN_THROUGHPUT
      raise Fluent::ConfigError,
            sprintf('offer_throughput must be greater than or equal to %s',
                    AzureDocumentDB::PARTITIONED_COLL_MIN_THROUGHPUT)
    end
  end

  # NOTE(review): @chunk_key_tag is not assigned in this method; presumably the
  # parent class sets it from the buffer chunk keys during `super` -- confirm.
  raise Fluent::ConfigError, "'tag' in chunk_keys is required." unless @chunk_key_tag

  @timef = Fluent::TimeFormatter.new(@time_format, @localtime)
end

#format(tag, time, record) ⇒ Object



114
115
116
117
118
119
120
121
122
123
# File 'lib/fluent/plugin/out_documentdb.rb', line 114

# Prepares one event for buffering and serializes it with +to_msgpack+.
#
# The record is modified in place: its 'id' key is always replaced with a
# fresh UUID, and the time/tag fields are added when the corresponding
# add_* flags are enabled.
#
# @param tag [Object] event tag, stored under +@tag_field_name+ if enabled
# @param time [Object] event time, run through +@timef.format+ if enabled
# @param record [Hash] event payload (mutated)
# @return [Object] whatever +record.to_msgpack+ returns
def format(tag, time, record)
  record['id'] = SecureRandom.uuid
  record[@time_field_name] = @timef.format(time) if @add_time_field
  record[@tag_field_name] = tag if @add_tag_field
  record.to_msgpack
end

#formatted_to_msgpack_binary? ⇒ Boolean

Returns:

  • (Boolean)


125
126
127
# File 'lib/fluent/plugin/out_documentdb.rb', line 125

# Always answers true. #format in this plugin returns +record.to_msgpack+,
# so the buffered payload is msgpack data.
#
# @return [Boolean] always +true+
def formatted_to_msgpack_binary?
  true
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


129
130
131
# File 'lib/fluent/plugin/out_documentdb.rb', line 129

# Always answers true.
# NOTE(review): presumably this tells the framework the plugin may run under
# multiple workers -- confirm against the Fluentd plugin API.
#
# @return [Boolean] always +true+
def multi_workers_ready?
  true
end

#shutdown ⇒ Object



109
110
111
112
# File 'lib/fluent/plugin/out_documentdb.rb', line 109

# Shuts the plugin down by delegating to the parent implementation.
# No plugin-specific teardown is performed here.
#
# @return [Object] whatever the parent's +shutdown+ returns
def shutdown
  super
end

#start ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/fluent/plugin/out_documentdb.rb', line 67

# Builds the DocumentDB client and makes sure the configured database and
# collection are available, creating them when the auto_create_* flags allow.
#
# On success +@client+ holds the client and +@coll_resource+ the collection
# resource later used by #write. Any StandardError raised during setup is
# logged at fatal level and the process is terminated with +exit!+.
# Signals and SystemExit are deliberately not intercepted.
#
# @return [Object] the collection resource stored in +@coll_resource+
def start
  super

  begin
    client_class = if @partitioned_collection
                     AzureDocumentDB::PartitionedCollectionClient
                   else
                     AzureDocumentDB::Client
                   end
    @client = client_class.new(@docdb_account_key, @docdb_endpoint)

    ## initial operations for database
    res = @client.find_databases_by_name(@docdb_database)
    if res[:body]['_count'].to_i.zero?
      unless @auto_create_database
        raise "No database (#{@docdb_database}) exists! Enable auto_create_database or create it by yourself"
      end
      # create new database as it doesn't exist
      @client.create_database(@docdb_database)
    end

    ## initial operations for collection
    database_resource = @client.get_database_resource(@docdb_database)
    res = @client.find_collections_by_name(database_resource, @docdb_collection)
    if res[:body]['_count'].to_i.zero?
      unless @auto_create_collection
        raise "No collection (#{@docdb_collection}) exists! Enable auto_create_collection or create it by yourself"
      end
      # create new collection as it doesn't exist
      if @partitioned_collection
        partition_key_paths = ["/#{@partition_key}"]
        @client.create_collection(database_resource, @docdb_collection,
                                  partition_key_paths, @offer_throughput)
      else
        @client.create_collection(database_resource, @docdb_collection)
      end
    end
    @coll_resource = @client.get_collection_resource(database_resource, @docdb_collection)
  rescue StandardError => ex
    # Fail fast: without a usable collection the plugin cannot write anything.
    log.fatal "Error: '#{ex}'"
    exit!
  end
end

#write(chunk) ⇒ Object



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/fluent/plugin/out_documentdb.rb', line 133

# Sends every record of the chunk to the collection through
# +@client.create_document+, using the record's 'id' as document identifier.
#
# Failures are handled per record: the error is logged at fatal level and the
# loop moves on to the next record, so nothing is re-raised to the caller.
# An HTTP error whose body is missing, is not valid JSON, or is not a JSON
# object is reported as a generic RestClient error rather than aborting the
# rest of the chunk.
#
# @param chunk [#msgpack_each] buffer chunk yielding one record Hash at a time
# @return [Object] whatever +chunk.msgpack_each+ returns
def write(chunk)
  chunk.msgpack_each do |record|
    unique_doc_identifier = record['id']
    begin
      if @partitioned_collection
        @client.create_document(@coll_resource, unique_doc_identifier, record, @partition_key)
      else
        @client.create_document(@coll_resource, unique_doc_identifier, record)
      end
    rescue RestClient::ExceptionWithResponse => rcex
      # Parsing happens inside a rescue clause, so a failure here would escape
      # the sibling `rescue => ex`; contain it locally instead.
      error_code = begin
        parsed = JSON.parse(rcex.response.to_s)
        parsed['code'] if parsed.is_a?(Hash)
      rescue JSON::ParserError
        nil
      end

      if error_code == 'Conflict'
        log.fatal "Duplicate Error: document #{unique_doc_identifier} already exists, data=>" + record.to_json
      else
        log.fatal "RestClient Error: '#{rcex.response}', data=>" + record.to_json
      end
    rescue => ex
      log.fatal "UnknownError: '#{ex}', uniqueid=>#{unique_doc_identifier}, data=>" + record.to_json
    end
  end
end