Class: Scalastic::Partition

Inherits:
Object
  • Object
show all
Defined in:
lib/scalastic/partition.rb

Defined Under Namespace

Classes: Endpoint, Endpoints

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(es_client, config, id) ⇒ Partition

Returns a new instance of Partition.

Raises:

  • (ArgumentError)


14
15
16
17
18
19
20
21
22
# File 'lib/scalastic/partition.rb', line 14

# Builds a partition bound to an ES client, a configuration and an id.
#
# @param es_client [Object] client used for every Elasticsearch call
# @param config [Object] configuration that resolves endpoint names
# @param id [Object] partition identifier; its string form must be non-empty
# @raise [ArgumentError] if +es_client+ or +config+ is nil, or if +id+ is
#   nil or renders as an empty string
def initialize(es_client, config, id)
  raise ArgumentError, 'ES client is nil!' if es_client.nil?
  raise ArgumentError, 'config is nil!' if config.nil?
  # nil.to_s is "", so this single check covers both nil and blank ids.
  raise ArgumentError, 'id is empty!' if id.to_s.empty?

  @id = id
  @config = config
  @es_client = es_client
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



11
12
13
# File 'lib/scalastic/partition.rb', line 11

# Read-only accessor for the configuration given to the constructor.
#
# @return [Object] the value of the +@config+ instance variable
def config
  @config
end

#es_client ⇒ Object (readonly)

Returns the value of attribute es_client.



10
11
12
# File 'lib/scalastic/partition.rb', line 10

# Read-only accessor for the ES client given to the constructor.
#
# @return [Object] the value of the +@es_client+ instance variable
def es_client
  @es_client
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



12
13
14
# File 'lib/scalastic/partition.rb', line 12

# Read-only accessor for the partition id given to the constructor.
#
# @return [Object] the value of the +@id+ instance variable
def id
  @id
end

Instance Method Details

#bulk(args) ⇒ Object



76
77
78
79
80
81
82
83
# File 'lib/scalastic/partition.rb', line 76

# Rewrites every entry of a bulk request body for this partition and sends
# the request through the ES client.
#
# Each entry is first tagged with the result of +operation_name+, then passed
# to +update_entry+ together with the list of entries already processed. The
# values returned by +update_entry+ replace +args[:body]+, so the caller's
# hash is modified in place.
#
# @param args [Hash] bulk arguments; +:body+ must hold the list of entries
# @return [Object] whatever the ES client's +bulk+ returns
# @raise [ArgumentError] if +args[:body]+ is nil or false
def bulk(args)
  body = args[:body]
  raise ArgumentError, 'Missing required argument :body' unless body

  # Tag all entries before rewriting any of them, as the rewrite step receives
  # the [operation, entry] pairs processed so far.
  tagged = body.map { |entry| [operation_name(entry), entry] }
  rewritten = tagged.each_with_object([]) do |(operation, entry), done|
    done << [operation, update_entry(done, operation, entry)]
  end
  args[:body] = rewritten.map(&:last)

  es_client.bulk(args)
end

#create(args) ⇒ Object



58
59
60
61
62
63
# File 'lib/scalastic/partition.rb', line 58

# Creates a document through this partition's index endpoint.
#
# The document body is passed to +selector.apply_to+ before the request is
# sent. When +args+ has no +:body+, an empty hash is stored there first, so
# the caller's hash is modified in place.
#
# @param args [Hash] arguments for the client's +create+ call; any +:index+
#   entry is replaced with the partition's index endpoint
# @return [Object] whatever the ES client's +create+ returns
def create(args)
  document = (args[:body] ||= {})
  selector.apply_to(document)
  request = args.merge(index: config.index_endpoint(id))
  es_client.create(request)
end

#delete(args = {}) ⇒ Object



65
66
67
68
# File 'lib/scalastic/partition.rb', line 65

# Calls the ES client's +delete+ against this partition's search endpoint.
#
# @param args [Hash] arguments for the client's +delete+ call; any +:index+
#   entry is replaced with the partition's search endpoint
# @return [Object] whatever the ES client's +delete+ returns
def delete(args = {})
  es_client.delete(args.merge(index: config.search_endpoint(id)))
end

#delete_by_query(args) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
# File 'lib/scalastic/partition.rb', line 85

# Deletes every document matched by a query, page by page.
#
# Starts a search on the partition's search endpoint with a one-minute scroll
# and a page size of 500, then keeps scrolling: each page of hits is turned
# into delete operations via +delete_op+ and sent with the client's +bulk+.
# The loop ends at the first page without hits.
#
# NOTE(review): +search_type: 'scan'+ is tied to older Elasticsearch
# releases; confirm it is supported by the targeted server version.
#
# @param args [Hash] search arguments; +:index+, +:search_type+, +:scroll+,
#   +:size+ and +:fields+ are overridden
# @return [nil]
def delete_by_query(args)
  query = args.merge(
    index: config.search_endpoint(id),
    search_type: 'scan',
    scroll: '1m',
    size: 500,
    fields: []
  )
  page = es_client.search(query)
  loop do
    # The initial response is only used for its scroll id; hits are read from
    # the scroll responses.
    page = es_client.scroll(scroll_id: page['_scroll_id'], scroll: '1m')
    deletions = page['hits']['hits'].map { |hit| delete_op(hit) }
    break if deletions.empty?
    es_client.bulk(body: deletions)
  end
end

#exists? ⇒ Boolean

Returns:

  • (Boolean)


70
71
72
73
74
# File 'lib/scalastic/partition.rb', line 70

# Tells whether any index carries one of this partition's aliases.
#
# Asks the client for the partition's search and index endpoint names (as a
# comma-separated list) and checks whether any returned index reports a
# non-empty +'aliases'+ collection.
#
# @return [Boolean] true if at least one returned index has aliases
def exists?
  alias_names = [config.search_endpoint(id), config.index_endpoint(id)].join(',')
  found = es_client.indices.get_aliases(name: alias_names)
  found.values.any? { |data| data['aliases'].any? }
end

#extend_to(args) ⇒ Object

Raises:

  • (ArgumentError)


24
25
26
27
28
29
30
31
32
33
34
# File 'lib/scalastic/partition.rb', line 24

# Points this partition at a new index.
#
# Removes the partition's index alias from every index that currently reports
# aliases for it, then adds a fresh index alias and a fresh search alias built
# by +EsActionsGenerator+ from +args+ plus the partition id. All actions are
# submitted in a single +update_aliases+ call.
#
# @param args [Hash] must contain a non-empty +:index+; the whole hash (plus
#   +:id+) is handed to the alias generators
# @return [Object] whatever the client's +indices.update_aliases+ returns
# @raise [ArgumentError] if +args[:index]+ is nil or renders as an empty string
def extend_to(args)
  # nil.to_s is "", so this covers both a missing and a blank index.
  raise ArgumentError, 'Missing required argument :index' if args[:index].to_s.empty?

  index_alias = config.index_endpoint(id)
  current = es_client.indices.get_aliases(name: index_alias)
  holders = current.select { |_name, data| data['aliases'].any? }.keys

  removals = holders.map { |name| {remove: {index: name, alias: index_alias}} }
  additions = [
    {add: EsActionsGenerator.new_index_alias(config, args.merge(id: id))},
    {add: EsActionsGenerator.new_search_alias(config, args.merge(id: id))}
  ]
  es_client.indices.update_aliases(body: {actions: removals + additions})
end

#get(args) ⇒ Object



41
42
43
44
# File 'lib/scalastic/partition.rb', line 41

# Calls the ES client's +get+ against this partition's search endpoint.
#
# @param args [Hash] arguments for the client's +get+ call; any +:index+
#   entry is replaced with the partition's search endpoint
# @return [Object] whatever the ES client's +get+ returns
def get(args)
  es_client.get(args.merge(index: config.search_endpoint(id)))
end

#get_endpoints ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
# File 'lib/scalastic/partition.rb', line 101

# Looks up the indices behind this partition's index and search aliases.
#
# For every index returned by the alias lookup, an +Endpoint+ is built from
# the index name and the alias's +'index_routing'+ (index alias) or
# +'search_routing'+ (search alias). The result holds the first index
# endpoint (or nil when there is none) and the list of all search endpoints;
# the endpoints, the list and the result itself are frozen.
#
# @return [Endpoints] frozen pair of index endpoint and search endpoints
def get_endpoints
  search_alias = config.search_endpoint(id)
  index_alias = config.index_endpoint(id)
  by_index = es_client.indices.get_aliases(name: [search_alias, index_alias].join(','))

  index_endpoints = []
  search_endpoints = []
  by_index.each do |index_name, data|
    index_def = data['aliases'][index_alias]
    search_def = data['aliases'][search_alias]
    index_endpoints << Endpoint.new(index_name, index_def['index_routing']).freeze unless index_def.nil?
    search_endpoints << Endpoint.new(index_name, search_def['search_routing']).freeze unless search_def.nil?
  end

  Endpoints.new(index_endpoints.first, search_endpoints.freeze).freeze
end

#index(args) ⇒ Object



51
52
53
54
55
56
# File 'lib/scalastic/partition.rb', line 51

# Indexes a document through this partition's index endpoint.
#
# The document body is passed to +selector.apply_to+ before the request is
# sent. When +args+ has no +:body+, an empty hash is stored there first, so
# the caller's hash is modified in place.
#
# @param args [Hash] arguments for the client's +index+ call; any +:index+
#   entry is replaced with the partition's index endpoint
# @return [Object] whatever the ES client's +index+ returns
def index(args)
  document = (args[:body] ||= {})
  selector.apply_to(document)
  request = args.merge(index: config.index_endpoint(id))
  es_client.index(request)
end

#index_to(args) ⇒ Object



113
114
115
116
117
118
119
120
121
# File 'lib/scalastic/partition.rb', line 113

# Redirects this partition's index alias.
#
# Removes the index alias from the index that currently holds it (if any)
# and, unless +args+ is nil, adds a new index alias built by
# +EsActionsGenerator+ from +args+ plus the partition id. Passing nil
# therefore only removes the current index alias.
#
# @param args [Hash, nil] arguments for the new index alias, or nil to add none
# @return [Object, nil] the result of the client's +indices.update_aliases+,
#   or nil when there was nothing to change
def index_to(args)
  index_alias = config.index_endpoint(id)
  current = get_endpoints.index

  actions = []
  actions << {remove: {index: current.index, alias: index_alias}} if current
  actions << {add: EsActionsGenerator.new_index_alias(config, args.merge(id: id))} unless args.nil?

  # TODO: log a warning when there is nothing to update
  return if actions.empty?

  es_client.indices.update_aliases(body: {actions: actions})
end

#inspect ⇒ Object



97
98
99
# File 'lib/scalastic/partition.rb', line 97

# Short human-readable description of the partition.
#
# @return [String] the text "ES partition " followed by the interpolated id
def inspect
  "ES partition #{id}"
end

#mget(args) ⇒ Object



46
47
48
49
# File 'lib/scalastic/partition.rb', line 46

# Calls the ES client's +mget+ against this partition's search endpoint.
#
# @param args [Hash] arguments for the client's +mget+ call; any +:index+
#   entry is replaced with the partition's search endpoint
# @return [Object] whatever the ES client's +mget+ returns
def mget(args)
  es_client.mget(args.merge(index: config.search_endpoint(id)))
end

#readonly? ⇒ Boolean

Returns:

  • (Boolean)


123
124
125
# File 'lib/scalastic/partition.rb', line 123

# Tells whether the partition currently has no index endpoint.
#
# @return [Boolean] true when +get_endpoints+ reports a nil index endpoint
def readonly?
  index_endpoint = get_endpoints.index
  index_endpoint.nil?
end

#scroll(args) ⇒ Object



127
128
129
130
# File 'lib/scalastic/partition.rb', line 127

# Builds a +Scroller+ over this partition's search endpoint.
#
# @param args [Hash] scroll arguments; any +:index+ entry is replaced with
#   the partition's search endpoint
# @return [Scroller] a scroller created from the ES client and the
#   partition-scoped arguments
def scroll(args)
  scoped_args = args.merge(index: config.search_endpoint(id))
  Scroller.new(es_client, scoped_args)
end

#search(args = {}) ⇒ Object



36
37
38
39
# File 'lib/scalastic/partition.rb', line 36

# Calls the ES client's +search+ against this partition's search endpoint.
#
# @param args [Hash] arguments for the client's +search+ call; any +:index+
#   entry is replaced with the partition's search endpoint
# @return [Object] whatever the ES client's +search+ returns
def search(args = {})
  es_client.search(args.merge(index: config.search_endpoint(id)))
end