Class: Embulk::Output::Documentdb

Inherits:
OutputPlugin
  • Object
show all
Defined in:
lib/embulk/output/documentdb.rb

Class Method Summary

Instance Method Summary

Class Method Details

.transaction(config, schema, count, &control) ⇒ Object

Raises:

  • (ConfigError)


14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/embulk/output/documentdb.rb', line 14

# Builds the task hash from the plugin configuration, validates it, runs the
# per-task output via the block, and returns an (empty) config diff.
#
# @param config  Embulk config source; read through #param
# @param schema  input schema (unused here)
# @param count   number of tasks (unused here)
# @yieldparam task [Hash] validated task settings
# @yieldreturn task reports (logged as JSON)
# @return [Hash] next config diff (always empty)
# @raise [ConfigError] when a required setting is empty, when partition_key is
#   missing in partitioned mode, or when offer_throughput is too low for an
#   auto-created partitioned collection
def self.transaction(config, schema, count, &control)
  # configuration code:
  task = {
    'docdb_endpoint'         => config.param('docdb_endpoint',        :string),
    'docdb_account_key'      => config.param('docdb_account_key',     :string),
    'docdb_database'         => config.param('docdb_database',        :string),
    'docdb_collection'       => config.param('docdb_collection',      :string),
    'auto_create_database'   => config.param('auto_create_database',  :bool,  :default => true),
    'auto_create_collection' => config.param('auto_create_collection',:bool,  :default => true),
    'partitioned_collection' => config.param('partitioned_collection',:bool,  :default => false),
    'partition_key'          => config.param('partition_key',         :string, :default => nil),
    'offer_throughput'       => config.param('offer_throughput',      :integer, :default => AzureDocumentDB::PARTITIONED_COLL_MIN_THROUGHPUT),
    'key_column'             => config.param('key_column',            :string),
  }
  Embulk.logger.info "transaction start"

  # param validation: each required string setting must be non-empty
  %w[docdb_endpoint docdb_account_key docdb_database docdb_collection key_column].each do |name|
    raise ConfigError, "no #{name}" if task[name].to_s.empty?
  end

  if task['partitioned_collection']
    # partition_key defaults to nil, so check the task value (not an unset
    # instance variable) and treat nil the same as an empty string.
    if task['partition_key'].to_s.empty?
      raise ConfigError, 'partition_key must be set in partitioned collection mode'
    end
    if task['auto_create_collection'] && task['offer_throughput'] < AzureDocumentDB::PARTITIONED_COLL_MIN_THROUGHPUT
      raise ConfigError, sprintf("offer_throughput must be more than and equals to %s",
                                 AzureDocumentDB::PARTITIONED_COLL_MIN_THROUGHPUT)
    end
  end

  # resumable output:
  # resume(task, schema, count, &control)

  # non-resumable output:
  Embulk.logger.info "Documentdb output start"
  task_reports = yield(task)
  Embulk.logger.info "Documentdb output finished. Task reports = #{task_reports.to_json}"

  # next config diff
  {}
end

Instance Method Details

#abort ⇒ Object



152
153
# File 'lib/embulk/output/documentdb.rb', line 152

# Abort hook of the output-plugin lifecycle. Intentionally a no-op; it
# performs no cleanup and returns nil.
def abort
  nil
end

#add(page) ⇒ Object

Called for each page in each task.



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/embulk/output/documentdb.rb', line 113

# Writes every record of +page+ to the target collection as one document.
#
# The value of the configured key_column becomes the document "id", always
# stored as a String. When key_column is not itself "id", the original column
# is removed from the document. Records whose key value is nil are skipped
# with a warning. Per-record write failures are logged and left out of the
# success count (see #commit) instead of aborting the page.
#
# @param page [Enumerable<Array>] rows whose values are ordered like schema.names
def add(page)
  key_column = @task['key_column']

  page.each do |record|
    hash = Hash[schema.names.zip(record)]
    @recordnum += 1

    # zip puts every schema column into +hash+ (nil-padded), so test the value,
    # not key?: a nil key would otherwise turn into an empty-string id.
    if hash[key_column].nil?
      Embulk.logger.warn { "Skip Invalid Record: no key_column, data=>" + hash.to_json }
      next
    end

    # force primary key to be both named "id" and "string" type
    unique_doc_id = hash[key_column].to_s
    hash.delete(key_column) unless key_column == 'id'
    hash['id'] = unique_doc_id

    begin
      if @task['partitioned_collection']
        @client.create_document(@coll_resource, unique_doc_id, hash, @task['partition_key'])
      else
        @client.create_document(@coll_resource, unique_doc_id, hash)
      end
      @successnum += 1
    rescue RestClient::ExceptionWithResponse => rcex
      # The error body may be non-JSON (e.g. an HTML proxy page) or absent.
      # A parse failure here would escape this rescue clause and abort the
      # whole page, so treat it as "no error code".
      error_code =
        begin
          parsed = JSON.parse(rcex.response.to_s)
          parsed.is_a?(Hash) ? parsed['code'] : nil
        rescue JSON::ParserError
          nil
        end
      if error_code == 'Conflict'
        Embulk.logger.error { "Duplicate Error: doc id (#{unique_doc_id}) already exists, data=>" + hash.to_json }
      else
        Embulk.logger.error { "RestClient Error: '#{rcex.response}', data=>" + hash.to_json }
      end
    rescue => ex
      Embulk.logger.error { "UnknownError: '#{ex}', doc id=>#{unique_doc_id}, data=>" + hash.to_json }
    end
  end
end

#close ⇒ Object



109
110
# File 'lib/embulk/output/documentdb.rb', line 109

# Close hook of the output-plugin lifecycle. Intentionally a no-op that
# returns nil; no resources are released here.
def close
  nil
end

#commit ⇒ Object



155
156
157
158
159
160
161
162
# File 'lib/embulk/output/documentdb.rb', line 155

# Builds this task's report from the counters maintained by #add.
#
# @return [Hash] "total_records" (records seen), "success" (documents
#   written) and "skip_or_error" (the difference between the two)
def commit
  seen = @recordnum
  written = @successnum
  {
    "total_records" => seen,
    "success" => written,
    "skip_or_error" => seen - written,
  }
end

#finish ⇒ Object



149
150
# File 'lib/embulk/output/documentdb.rb', line 149

# Finish hook of the output-plugin lifecycle. Intentionally a no-op that
# returns nil; documents are already written one by one in #add.
def finish
  nil
end

#init ⇒ Object

Called from initialize(task, schema, index).



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/embulk/output/documentdb.rb', line 65

# Per-task setup: resets the record counters, creates the DocumentDB client,
# and makes sure the target database and collection exist. Either may be
# auto-created when the corresponding auto_create_* setting allows it. The
# result is stored in @client and @coll_resource for #add.
#
# @raise [RuntimeError] when the database/collection is missing and
#   auto-creation is disabled. This and any other StandardError from the
#   client is logged and then re-raised so the framework can fail the task
#   normally.
def init
  # initialization code:
  @recordnum = 0
  @successnum = 0

  begin
    @client =
      if task['partitioned_collection']
        AzureDocumentDB::PartitionedCollectionClient.new(task['docdb_account_key'], task['docdb_endpoint'])
      else
        AzureDocumentDB::Client.new(task['docdb_account_key'], task['docdb_endpoint'])
      end

    # initial operations for database
    res = @client.find_databases_by_name(task['docdb_database'])
    if res[:body]["_count"].to_i == 0
      unless task['auto_create_database']
        raise "No database (#{task['docdb_database']})! Enable auto_create_database or create it by yourself"
      end
      # create new database as it doesn't exist
      @client.create_database(task['docdb_database'])
    end

    # initial operations for collection
    database_resource = @client.get_database_resource(task['docdb_database'])
    res = @client.find_collections_by_name(database_resource, task['docdb_collection'])
    if res[:body]["_count"].to_i == 0
      unless task['auto_create_collection']
        raise "No collection (#{task['docdb_collection']})! Enable auto_create_collection or create it by yourself"
      end
      # create new collection as it doesn't exist
      if task['partitioned_collection']
        partition_key_paths = ["/#{task['partition_key']}"]
        @client.create_collection(database_resource, task['docdb_collection'],
                                  partition_key_paths, task['offer_throughput'])
      else
        @client.create_collection(database_resource, task['docdb_collection'])
      end
    end
    @coll_resource = @client.get_collection_resource(database_resource, task['docdb_collection'])
  rescue StandardError => ex
    # Log with context, then re-raise. `exit!` would skip ensure/at_exit
    # handlers, and rescuing Exception would also swallow signals.
    Embulk.logger.error { "Error: init: '#{ex}'" }
    raise
  end
end