Module: Chewy::Type::Import::ClassMethods
- Defined in:
- lib/chewy/type/import.rb
Instance Method Summary collapse
-
#bulk(options = {}) ⇒ Object
Wraps elasticsearch-ruby client indices bulk method.
-
#import(*args) ⇒ Object
Perform import operation for specified documents.
-
#import!(*args) ⇒ Object
Perform import operation for specified documents.
- #journal? ⇒ Boolean
Instance Method Details
#bulk(options = {}) ⇒ Object
Wraps elasticsearch-ruby client indices bulk method. Adds ‘:suffix` option to bulk import to index with specified suffix.
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/chewy/type/import.rb', line 69 def bulk( = {}) suffix = .delete(:suffix) bulk_size = .delete(:bulk_size) body = .delete(:body) journal = .delete(:journal) header = { index: index.build_index_name(suffix: suffix), type: type_name } bodies = if bulk_size bulk_size -= 1.kilobyte # 1 kilobyte for request header and newlines raise ArgumentError, 'Import `:bulk_size` can\'t be less than 1 kilobyte' if bulk_size <= 0 entries = body.each_with_object(['']) do |entry, result| operation, = entry.to_a.first data = .delete(:data) entry = [{ operation => }, data].compact.map(&:to_json).join("\n") raise ArgumentError, 'Import `:bulk_size` seems to be less than entry size' if entry.bytesize > bulk_size if result.last.bytesize + entry.bytesize > bulk_size result.push(entry) else result[-1] = [result[-1], entry].delete_if(&:blank?).join("\n") end end entries.map { |entry| entry + "\n" } else [body] end if journal.any_records? Chewy::Journal.create bodies += [journal.bulk_body] end items = bodies.map do |item_body| result = client.bulk .merge(header).merge(body: item_body) result.try(:[], 'items') || [] end.flatten Chewy.wait_for_status extract_errors items end |
#import(*args) ⇒ Object
Perform import operation for specified documents. Returns true or false depending on success.
UsersIndex::User.import # imports default data set
UsersIndex::User.import User.active # imports active users
UsersIndex::User.import [1, 2, 3] # imports users with specified ids
UsersIndex::User.import users # imports users collection
UsersIndex::User.import suffix: Time.now.to_i # imports data to index with specified suffix if such exists
UsersIndex::User.import refresh: false # to disable index refreshing after import
UsersIndex::User.import journal: true # import will record all the actions into special journal index
UsersIndex::User.import batch_size: 300 # import batch size
UsersIndex::User.import bulk_size: 10.megabytes # import ElasticSearch bulk size in bytes
UsersIndex::User.import consistency: :quorum # explicit write consistency setting for the operation (one, quorum, all)
UsersIndex::User.import replication: :async # explicitly set the replication type (sync, async)
See adapters documentation for more details.
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/chewy/type/import.rb', line 26 def import(*args) = args. .reverse_merge! = .reject { |k, _| !BULK_OPTIONS.include?(k) }.reverse_merge!(refresh: true) index.create!(.slice(:suffix)) unless index.exists? ActiveSupport::Notifications.instrument 'import_objects.chewy', type: self do |payload| adapter.import(*args, ) do |action_objects| journal = Chewy::Journal.new(self) journal.add(action_objects) if .fetch(:journal) { journal? } indexed_objects = build_root.parent_id && fetch_indexed_objects(action_objects.values.flatten) body = bulk_body(action_objects, indexed_objects) errors = bulk(.merge(body: body, journal: journal)) if body.present? fill_payload_import payload, action_objects fill_payload_errors payload, errors if errors.present? !errors.present? end end end |
#import!(*args) ⇒ Object
Perform import operation for specified documents. Raises Chewy::ImportFailed exception in case of import errors. Options are completely the same as for ‘import` method See adapters documentation for more details.
55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/chewy/type/import.rb', line 55 def import!(*args) errors = nil subscriber = ActiveSupport::Notifications.subscribe('import_objects.chewy') do |*notification_args| errors = notification_args.last[:errors] end import(*args) raise Chewy::ImportFailed.new(self, errors) if errors.present? true ensure ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber end |
#journal? ⇒ Boolean
112 113 114 |
# File 'lib/chewy/type/import.rb', line 112 def journal? .fetch(:journal) { Chewy.configuration[:journal] } end |