Class: Scalastic::Partition
- Inherits:
-
Object
- Object
- Scalastic::Partition
- Defined in:
- lib/scalastic/partition.rb
Defined Under Namespace
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#es_client ⇒ Object
readonly
Returns the value of attribute es_client.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
Instance Method Summary collapse
- #bulk(args) ⇒ Object
- #create(args) ⇒ Object
- #delete(args = {}) ⇒ Object
- #delete_by_query(args) ⇒ Object
- #exists? ⇒ Boolean
- #extend_to(args) ⇒ Object
- #get(args) ⇒ Object
- #get_endpoints ⇒ Object
- #index(args) ⇒ Object
- #index_to(args) ⇒ Object
-
#initialize(es_client, config, id) ⇒ Partition
constructor
A new instance of Partition.
- #inspect ⇒ Object
- #mget(args) ⇒ Object
- #readonly? ⇒ Boolean
- #scroll(args) ⇒ Object
- #search(args = {}) ⇒ Object
Constructor Details
#initialize(es_client, config, id) ⇒ Partition
Returns a new instance of Partition.
14 15 16 17 18 19 20 21 22 |
# File 'lib/scalastic/partition.rb', line 14 def initialize(es_client, config, id) raise(ArgumentError, 'ES client is nil!') if es_client.nil? raise(ArgumentError, 'config is nil!') if config.nil? raise(ArgumentError, 'id is empty!') if id.nil? || id.to_s.empty? @es_client = es_client @config = config @id = id end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
11 12 13 |
# File 'lib/scalastic/partition.rb', line 11 def config @config end |
#es_client ⇒ Object (readonly)
Returns the value of attribute es_client.
10 11 12 |
# File 'lib/scalastic/partition.rb', line 10 def es_client @es_client end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
12 13 14 |
# File 'lib/scalastic/partition.rb', line 12 def id @id end |
Instance Method Details
#bulk(args) ⇒ Object
76 77 78 79 80 81 82 83 |
# File 'lib/scalastic/partition.rb', line 76 def bulk(args) body = args.clone[:body] || raise(ArgumentError, 'Missing required argument :body') new_ops = body.map{|entry| [operation_name(entry), entry]}.reduce([]){|acc, op| acc << [op.first, update_entry(acc, *op)]; acc} args[:body] = new_ops.map{|_op, entry| entry} es_client.bulk(args) end |
#create(args) ⇒ Object
58 59 60 61 62 63 |
# File 'lib/scalastic/partition.rb', line 58 def create(args) args[:body] ||= {} selector.apply_to(args[:body]) args = args.merge(index: config.index_endpoint(id)) es_client.create(args) end |
#delete(args = {}) ⇒ Object
65 66 67 68 |
# File 'lib/scalastic/partition.rb', line 65 def delete(args = {}) args = args.merge(index: config.search_endpoint(id)) es_client.delete(args) end |
#delete_by_query(args) ⇒ Object
85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/scalastic/partition.rb', line 85 def delete_by_query(args) args = args.merge(index: config.search_endpoint(id), search_type: 'scan', scroll: '1m', size: 500, fields: []) results = es_client.search(args) loop do scroll_id = results['_scroll_id'] results = es_client.scroll(scroll_id: scroll_id, scroll: '1m') ops = results['hits']['hits'].map{|h| delete_op(h)} break if ops.empty? es_client.bulk(body: ops) end end |
#exists? ⇒ Boolean
70 71 72 73 74 |
# File 'lib/scalastic/partition.rb', line 70 def exists? names = [config.search_endpoint(id), config.index_endpoint(id)] all_aliases = es_client.indices.get_aliases name: names.join(',') all_aliases.any?{|_index, data| data['aliases'].any?} end |
#extend_to(args) ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/scalastic/partition.rb', line 24 def extend_to(args) index = args[:index] raise(ArgumentError, 'Missing required argument :index') if index.nil? || index.to_s.empty? index_alias = config.index_endpoint(id) indices = es_client.indices.get_aliases(name: index_alias).select{|i, d| d['aliases'].any?}.keys actions = indices.map{|i| {remove: {index: i, alias: index_alias}}} actions << {add: EsActionsGenerator.new_index_alias(config, args.merge(id: id))} actions << {add: EsActionsGenerator.new_search_alias(config, args.merge(id: id))} es_client.indices.update_aliases(body: {actions: actions}) end |
#get(args) ⇒ Object
41 42 43 44 |
# File 'lib/scalastic/partition.rb', line 41 def get(args) args = args.merge(index: config.search_endpoint(id)) es_client.get(args) end |
#get_endpoints ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/scalastic/partition.rb', line 101 def get_endpoints sa = config.search_endpoint(id) ia = config.index_endpoint(id) aliases = es_client.indices.get_aliases name: [sa, ia].join(',') sas = aliases.map{|i, d| [i, d['aliases'][sa]]}.reject{|_i, sa| sa.nil?} ias = aliases.map{|i, d| [i, d['aliases'][ia]]}.reject{|_i, ia| ia.nil?} Endpoints.new( ias.map{|i, ia| Endpoint.new(i, ia['index_routing']).freeze}.first, sas.map{|i, sa| Endpoint.new(i, sa['search_routing']).freeze}.freeze ).freeze end |
#index(args) ⇒ Object
51 52 53 54 55 56 |
# File 'lib/scalastic/partition.rb', line 51 def index(args) args[:body] ||= {} selector.apply_to(args[:body]) args = args.merge(index: config.index_endpoint(id)) es_client.index(args) end |
#index_to(args) ⇒ Object
113 114 115 116 117 118 119 120 121 |
# File 'lib/scalastic/partition.rb', line 113 def index_to(args) ie = config.index_endpoint(id) eps = get_endpoints actions = [] actions << {remove: {index: eps.index.index, alias: ie}} if eps.index actions << {add: EsActionsGenerator.new_index_alias(config, args.merge(id: id))} unless args.nil? #TODO: log a warning if there're no updates es_client.indices.update_aliases(body: {actions: actions}) if actions.any? end |
#inspect ⇒ Object
97 98 99 |
# File 'lib/scalastic/partition.rb', line 97 def inspect "ES partition #{id}" end |
#mget(args) ⇒ Object
46 47 48 49 |
# File 'lib/scalastic/partition.rb', line 46 def mget(args) args = args.merge(index: config.search_endpoint(id)) es_client.mget(args) end |
#readonly? ⇒ Boolean
123 124 125 |
# File 'lib/scalastic/partition.rb', line 123 def readonly? get_endpoints.index.nil? end |
#scroll(args) ⇒ Object
127 128 129 130 |
# File 'lib/scalastic/partition.rb', line 127 def scroll(args) args = args.merge(index: config.search_endpoint(id)) Scroller.new(es_client, args) end |
#search(args = {}) ⇒ Object
36 37 38 39 |
# File 'lib/scalastic/partition.rb', line 36 def search(args = {}) args = args.merge(index: config.search_endpoint(id)) es_client.search(args) end |