Class: Embulk::Output::Documentdb

Inherits:
OutputPlugin
  • Object
Defined in:
lib/embulk/output/documentdb.rb

Class Method Summary

Instance Method Summary

Class Method Details

.transaction(config, schema, count, &control) ⇒ Object

Raises:

  • (ConfigError)


Source lines 14–53:
# File 'lib/embulk/output/documentdb.rb', line 14

# Validates the plugin configuration, builds the task hash shared by every
# output task, and hands it to Embulk by yielding it.
#
# @param config [Object] plugin configuration; must respond to #param
# @param schema [Object] input schema (not used here)
# @param count [Integer] number of output tasks (not used here)
# @yieldparam task [Hash] the validated task settings
# @return [Hash] an empty next-config diff (this output is not resumable)
# @raise [ConfigError] when a required setting is empty, when partitioned mode
#   has no partition_key, or when auto-creating a partitioned collection with
#   offer_throughput below AzureDocumentDB::PARTITIONED_COLL_MIN_THROUGHPUT
def self.transaction(config, schema, count, &control)
  # configuration code:
  task = {
    'docdb_endpoint'         => config.param('docdb_endpoint',        :string),
    'docdb_account_key'      => config.param('docdb_account_key',     :string),
    'docdb_database'         => config.param('docdb_database',        :string),
    'docdb_collection'       => config.param('docdb_collection',      :string),
    'auto_create_database'   => config.param('auto_create_database',  :bool,  :default => true),
    'auto_create_collection' => config.param('auto_create_collection',:bool,  :default => true),
    'partitioned_collection' => config.param('partitioned_collection',:bool,  :default => false),
    'partition_key'          => config.param('partition_key',         :string, :default => nil),
    'offer_throughput'       => config.param('offer_throughput',      :integer, :default => AzureDocumentDB::PARTITIONED_COLL_MIN_THROUGHPUT),
    'key_column'             => config.param('key_column',            :string),
  }
  Embulk.logger.info "transaction start"

  # param validation: every required string setting must be non-empty
  %w[docdb_endpoint docdb_account_key docdb_database docdb_collection key_column].each do |name|
    raise ConfigError, "no #{name}" if task[name].to_s.empty?
  end

  if task['partitioned_collection']
    # partition_key defaults to nil, so guard against both nil and ""
    partition_key = task['partition_key']
    if partition_key.nil? || partition_key.empty?
      raise ConfigError, 'partition_key must be set in partitioned collection mode'
    end
    # throughput only matters when this plugin creates the collection itself
    min_throughput = AzureDocumentDB::PARTITIONED_COLL_MIN_THROUGHPUT
    if task['auto_create_collection'] && task['offer_throughput'] < min_throughput
      raise ConfigError, sprintf("offer_throughput must be more than and equals to %s",
                                 min_throughput)
    end
  end

  # resumable output:
  # resume(task, schema, count, &control)

  # non-resumable output:
  task_reports = yield(task)
  Embulk.logger.info "Documentdb output finished. Task reports = #{task_reports.to_json}"

  # no config diff: nothing is carried over to the next run
  {}
end

Instance Method Details

#abort ⇒ Object

Source lines 155–156:
# File 'lib/embulk/output/documentdb.rb', line 155

# Abort hook. Intentionally a no-op: this plugin performs no rollback here.
def abort
  nil
end

#add(page) ⇒ Object

Called for each page in each task.

Source lines 114–148:
# File 'lib/embulk/output/documentdb.rb', line 114

# Writes every record of +page+ to the DocumentDB collection, one document per
# record.
#
# The value of the configured key_column becomes the document "id" (always
# stringified); the original column is removed unless it is already "id".
# Records with no key value (column absent or nil) are skipped with a warning.
# Per-record API failures are logged, not raised, so one bad record does not
# abort the page. @recordnum counts every record seen; @successnum counts only
# the records the client accepted.
#
# @param page [Enumerable<Array>] rows whose values line up with schema.names
def add(page)
  key_column = @task['key_column']
  # output code:
  page.each do |record|
    hash = schema.names.zip(record).to_h
    @recordnum += 1
    # a missing column or a nil value cannot yield a meaningful document id
    if hash[key_column].nil?
      Embulk.logger.warn { "Skip Invalid Record: no key_column, data=>" + hash.to_json }
      next
    end
    unique_doc_id = hash[key_column].to_s
    hash.delete(key_column) unless key_column == 'id'
    # force primary key to be both named "id" and "string" type
    hash['id'] = unique_doc_id

    begin
      if @task['partitioned_collection']
        @client.create_document(@coll_resource, unique_doc_id, hash, @task['partition_key'])
      else
        @client.create_document(@coll_resource, unique_doc_id, hash)
      end
      @successnum += 1
    rescue RestClient::ExceptionWithResponse => rcex
      # The response may be nil (e.g. on timeouts) or not JSON. Parsing it
      # must never raise out of this handler and abort the rest of the page.
      error_code =
        begin
          parsed = JSON.parse(rcex.response.to_s)
          parsed['code'] if parsed.is_a?(Hash)
        rescue StandardError
          nil
        end
      if error_code == 'Conflict'
        Embulk.logger.error { "Duplicate Error: doc id (#{unique_doc_id}) already exists, data=>" + hash.to_json }
      else
        Embulk.logger.error { "RestClient Error: '#{rcex.response}', data=>" + hash.to_json }
      end
    rescue => ex
      Embulk.logger.error { "UnknownError: '#{ex}', doc id=>#{unique_doc_id}, data=>" + hash.to_json }
    end
  end
end

#close ⇒ Object

Source lines 110–111:
# File 'lib/embulk/output/documentdb.rb', line 110

# Close hook. Intentionally a no-op: nothing is released here.
def close
  nil
end

#commit ⇒ Object

Source lines 158–168:
# File 'lib/embulk/output/documentdb.rb', line 158

# Builds this task's report.
#
# @return [Hash] total/success/skip-or-error record counts, plus the time
#   elapsed between @start_time and @finish_time (Time subtraction, seconds)
def commit
  Embulk.logger.info "Documentdb output commit"
  elapsed = @finish_time - @start_time
  failed = @recordnum - @successnum
  {
    "total_records" => @recordnum,
    "success" => @successnum,
    "skip_or_error" => failed,
    "elapsed_time" => elapsed,
  }
end

#finish ⇒ Object

Source lines 150–153:
# File 'lib/embulk/output/documentdb.rb', line 150

# Marks the end of this task's output by stamping @finish_time with the
# current wall-clock time; returns that Time.
def finish
  logger = Embulk.logger
  logger.info("Documentdb output finish")
  @finish_time = Time.now
end

#init ⇒ Object

`init` is called from `initialize(task, schema, index)`.

Source lines 64–107:
# File 'lib/embulk/output/documentdb.rb', line 64

# Per-task setup. It resets the record counters and builds the DocumentDB
# client: a PartitionedCollectionClient when partitioned_collection is set,
# otherwise a plain Client. It then ensures the target database and
# collection exist, creating each one only when the matching auto_create_*
# option is enabled.
#
# @raise [RuntimeError] when the database/collection is missing and
#   auto-creation is disabled; any setup error is logged and re-raised
def init
  Embulk.logger.info "Documentdb output init"
  @start_time = Time.now
  # initialization code:
  @recordnum = 0
  @successnum = 0

  begin
    client_class =
      if task['partitioned_collection']
        AzureDocumentDB::PartitionedCollectionClient
      else
        AzureDocumentDB::Client
      end
    @client = client_class.new(task['docdb_account_key'], task['docdb_endpoint'])

    # initial operations for database
    database_name = task['docdb_database']
    res = @client.find_databases_by_name(database_name)
    if res[:body]["_count"].to_i.zero?
      raise "No database (#{database_name})! Enable auto_create_database or create it by yourself" unless task['auto_create_database']
      # create new database as it doesn't exist
      @client.create_database(database_name)
    end

    # initial operations for collection
    collection_name = task['docdb_collection']
    database_resource = @client.get_database_resource(database_name)
    res = @client.find_collections_by_name(database_resource, collection_name)
    if res[:body]["_count"].to_i.zero?
      raise "No collection (#{collection_name})! Enable auto_create_collection or create it by yourself" unless task['auto_create_collection']
      # create new collection as it doesn't exist
      if task['partitioned_collection']
        partition_key_paths = ["/#{task['partition_key']}"]
        @client.create_collection(database_resource, collection_name,
                                  partition_key_paths, task['offer_throughput'])
      else
        @client.create_collection(database_resource, collection_name)
      end
    end
    @coll_resource = @client.get_collection_resource(database_resource, collection_name)
  rescue Exception => ex
    # Log every failure (including non-StandardError ones), then re-raise rather
    # than calling exit!. exit! would skip ensure blocks and at_exit handlers
    # and hide the cause from Embulk.
    Embulk.logger.error { "Error: init: '#{ex}'" }
    raise
  end
end