Class: Embulk::Output::Azuresearch
- Inherits:
-
OutputPlugin
- Object
- OutputPlugin
- Embulk::Output::Azuresearch
- Defined in:
- lib/embulk/output/azuresearch.rb
Class Method Summary collapse
Instance Method Summary collapse
- #abort ⇒ Object
-
#add(page) ⇒ Object
called for each page in each task.
- #add_documents_to_azuresearch(documents) ⇒ Object
- #close ⇒ Object
- #commit ⇒ Object
- #finish ⇒ Object
-
#init ⇒ Object
init is called in initialize(task, schema, index).
Class Method Details
.transaction(config, schema, count, &control) ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/embulk/output/azuresearch.rb', line 11
# Reads and validates the plugin configuration, hands the resulting task
# hash to the given block, logs the task reports the block returns, and
# returns an empty next-config diff.
#
# @param config  configuration object; values are read with +config.param+
# @param schema  not used by this method
# @param count   not used by this method
# @param control block that receives the task hash (invoked via +yield+)
# @return [Hash] always an empty hash
# @raise [ConfigError] if endpoint, api_key, search_index or column_names
#   is an empty string
def self.transaction(config, schema, count, &control)
  Embulk.logger.info("Azuresearch output transaction start")

  # Mandatory string settings, in the order they are read and validated.
  required = %w[endpoint api_key search_index column_names]

  task = {}
  required.each { |name| task[name] = config.param(name, :string) }
  # key_names is the only optional setting; it defaults to nil.
  task['key_names'] = config.param('key_names', :string, :default => nil)

  required.each do |name|
    raise ConfigError, "no #{name}" if task[name].empty?
  end

  # Non-resumable output: the block is run directly instead of going
  # through resume(task, schema, count, &control).
  task_reports = yield(task)
  Embulk.logger.info("Azuresearch output finished. Task reports = #{task_reports.to_json}")

  {}
end
Instance Method Details
#abort ⇒ Object
90 91 |
# File 'lib/embulk/output/azuresearch.rb', line 90
# Does nothing.
#
# @return [nil]
def abort
  nil
end
#add(page) ⇒ Object
called for each page in each task
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/embulk/output/azuresearch.rb', line 63
# Turns every record of the page into a document and uploads the documents
# in batches; called for each page in each task.
#
# For each record, the value stored under @key_names[i] (looked up by schema
# column name) is written to the document under @column_names[i]. A batch is
# sent as soon as it holds AzureSearch::MAX_DOCS_PER_INDEX_UPLOAD documents,
# and whatever is left after the last record is sent at the end.
# @recordnum is incremented once per record.
#
# @param page collection of records; each record is zipped with schema.names
def add(page)
  batch = []

  page.each do |record|
    row = schema.names.zip(record).to_h

    document = {}
    @key_names.zip(@column_names) { |key, column| document[column] = row[key] }

    batch << document
    @recordnum += 1
    next if batch.length < AzureSearch::MAX_DOCS_PER_INDEX_UPLOAD

    # Batch is full: send it and start a new one.
    add_documents_to_azuresearch(batch)
    batch = []
  end

  add_documents_to_azuresearch(batch) unless batch.empty?
end
#add_documents_to_azuresearch(documents) ⇒ Object
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/embulk/output/azuresearch.rb', line 105
# Uploads one batch of documents to @search_index and updates @successnum.
#
# A response whose code is 200 counts every document in the batch as a
# success. For any other code the response is parsed as JSON and each entry
# of its 'value' array is inspected: entries with a truthy 'status' are
# counted as successes, the others are logged with their key, status code
# and error message. A parsed response without a 'value' key is logged and
# nothing is counted.
#
# Errors raised while uploading or parsing are logged and not re-raised.
#
# @param documents [Array<Hash>] documents to upload
# @return [void]
def add_documents_to_azuresearch(documents)
  res = @client.add_documents(@search_index, documents)
  if res.code == 200
    # All documents were inserted/updated successfully.
    @successnum += documents.length
    return
  end

  # Only for non-200 responses: parse the body to find which documents failed.
  resdict = JSON.parse(res)
  unless resdict.key?('value')
    Embulk.logger.error { "Unknown Reponse format, documents=>" + documents.to_json }
    return
  end

  resdict['value'].each do |docstatus|
    if docstatus['status']
      @successnum += 1
    else
      Embulk.logger.error { "Add document failure, dockey: #{docstatus['key']}, code: #{docstatus['statusCode']}, errmsg: #{docstatus['errorMessage']}" }
    end
  end
rescue StandardError => ex
  # StandardError rather than Exception, so that SignalException (Ctrl-C),
  # SystemExit and NoMemoryError are not swallowed and still reach the caller.
  Embulk.logger.error { "UnknownError: '#{ex}', documents=>" + documents.to_json }
end
#close ⇒ Object
59 60 |
# File 'lib/embulk/output/azuresearch.rb', line 59
# Does nothing.
#
# @return [nil]
def close
  nil
end
#commit ⇒ Object
93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/embulk/output/azuresearch.rb', line 93
# Builds the task report from the counters and timestamps kept on the
# instance.
#
# @return [Hash] with the keys "total_records", "success", "skip_or_error"
#   (total minus success) and "elapsed_time" (@finish_time - @start_time)
def commit
  Embulk.logger.info("AzureSearch output commit")

  duration = @finish_time - @start_time
  {
    "total_records" => @recordnum,
    "success" => @successnum,
    "skip_or_error" => @recordnum - @successnum,
    "elapsed_time" => duration,
  }
end
#finish ⇒ Object
85 86 87 88 |
# File 'lib/embulk/output/azuresearch.rb', line 85
# Logs the finish event and stores the current time in @finish_time, which
# #commit subtracts @start_time from to report the elapsed time.
#
# @return [Time] the stored finish time
def finish
  Embulk.logger.info("AzureSearch output finish")
  @finish_time = Time.now
end
#init ⇒ Object
init is called in initialize(task, schema, index)
44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/embulk/output/azuresearch.rb', line 44
# Per-task initialization; init is called in initialize(task, schema, index).
#
# Resets the counters, records the start time, splits the comma-separated
# 'column_names' and 'key_names' settings into arrays, and creates the
# AzureSearch client from 'endpoint' and 'api_key'. When 'key_names' is not
# set, the column names are used as key names.
#
# Whitespace around each comma-separated name is stripped, so
# "id, title" yields ["id", "title"] rather than ["id", " title"].
#
# @return the newly created client
# @raise [ConfigError] if column_names and key_names differ in length
def init
  Embulk.logger.info "Azuresearch output init"
  @start_time = Time.now
  @recordnum = 0
  @successnum = 0
  @search_index = task['search_index']

  @column_names = task['column_names'].split(',').map(&:strip)
  key_names = task['key_names']
  @key_names = key_names.nil? ? @column_names : key_names.split(',').map(&:strip)

  # Keys and columns are paired by position, so the lists must line up.
  if @key_names.length != @column_names.length
    raise ConfigError, 'NOT match keys number: column_names and key_names'
  end

  @client = AzureSearch::Client.new(task['endpoint'], task['api_key'])
end