Class: ESReindex

Inherits:
Object
  • Object
show all
Defined in:
lib/es-reindex.rb,
lib/es-reindex/version.rb,
lib/es-reindex/args_parser.rb

Defined Under Namespace

Classes: ArgsParser

Constant Summary collapse

# Base Elasticsearch URL that #go! falls back to when the source or the
# destination is given as a bare index name (i.e. contains no "/").
DEFAULT_URL =
'http://10.203.175.32:9200'
# Version string of this library (presumably the released gem version --
# confirm against lib/es-reindex/version.rb).
VERSION =
'0.1.0'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(src, dst, options = {}) ⇒ ESReindex

Returns a new instance of ESReindex.



10
11
12
13
14
15
16
17
18
19
20
# File 'lib/es-reindex.rb', line 10

# Builds a reindexer for one source/destination pair.
#
# @param src [String, nil] source location; +nil+ is stored as ''
# @param dst [String, nil] destination location; +nil+ is stored as ''
# @param options [Hash, nil] overrides for the defaults below; +nil+ is
#   treated like an empty hash, mirroring the nil-tolerance of +src+/+dst+.
#   The caller's hash is never modified.
# @option options [Boolean] :remove (false) remove the index in the new
#   location first
# @option options [Boolean] :update (false) update existing documents
#   (default: only create non-existing)
# @option options [Integer] :frame (1000) frame size to be obtained with one
#   fetch during scrolling
def initialize(src, dst, options = {})
  @src     = src || ''
  @dst     = dst || ''
  @options = {
    remove: false, # remove the index in the new location first
    update: false, # update existing documents (default: only create non-existing)
    frame:  1000   # specify frame size to be obtained with one fetch during scrolling
  }.merge(options || {})

  # Running count of documents handled so far.
  @done = 0
end

Instance Attribute Details

#done ⇒ Object

Returns the value of attribute done.



8
9
10
# File 'lib/es-reindex.rb', line 8

# Reader for +@done+, the running document counter: the constructor sets it
# to 0 and #go! adds one for every document it puts into a bulk request.
#
# @return [Integer] documents handled so far
def done; @done; end

#dst ⇒ Object

Returns the value of attribute dst.



8
9
10
# File 'lib/es-reindex.rb', line 8

# Reader for +@dst+, the destination location handed to the constructor
# (stored as '' when +nil+ was given).
#
# @return [String] destination location
def dst; @dst; end

#options ⇒ Object

Returns the value of attribute options.



8
9
10
# File 'lib/es-reindex.rb', line 8

# Reader for +@options+, the hash the constructor builds by merging the
# caller's options over the defaults (+:remove+, +:update+, +:frame+).
#
# @return [Hash] effective options
def options; @options; end

#src ⇒ Object

Returns the value of attribute src.



8
9
10
# File 'lib/es-reindex.rb', line 8

# Reader for +@src+, the source location handed to the constructor
# (stored as '' when +nil+ was given).
#
# @return [String] source location
def src; @src; end

#start_time ⇒ Object

Returns the value of attribute start_time.



8
9
10
# File 'lib/es-reindex.rb', line 8

# Reader for +@start_time+, which #go! sets to +Time.now+ right before it
# starts copying documents; the constructor does not assign it.
#
# @return [Time, nil] when copying started, or +nil+ if it has not yet
def start_time; @start_time; end

Instance Method Details

#go! ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/es-reindex.rb', line 22

# Copies the source index to the destination.
#
# Steps, in order:
# 1. Split +src+ and +dst+ into "<url>/<index>"; a value without any "/" is
#    taken as an index name on DEFAULT_URL.
# 2. Print what is about to happen and block until a line is read from $stdin.
# 3. If +remove?+ is set and the destination index exists, delete it.
# 4. If the destination index does not exist, create it from the source
#    settings and copy every type mapping.
# 5. Read the source with scan/scroll and write each page to the destination
#    through the bulk API ('index' operations when +update?+, else 'create').
# 6. Poll both document counts for up to 60 seconds and print the comparison.
#
# The method ends the process itself: +exit 1+ when settings or mappings
# cannot be fetched or applied, +exit 0+ after the count check (also when the
# counts differ).
#
# Relies on helpers defined outside this method: +retried_request+ (used here
# as returning the response body, or a falsy value on failure), +remove?+,
# +update?+, +frame+ and +tm_len+.
#
# @return [void] does not return normally; the process exits
def go!
  MultiJson.load_options = {mode: :compat}
  MultiJson.dump_options = {mode: :compat}

  # Everything before the last "/" is the server URL, the rest the index name.
  (surl, sidx), (durl, didx) = [src, dst].map do |location|
    if location =~ %r{^(.*)/(.*?)$}
      [Regexp.last_match(1), Regexp.last_match(2)]
    else
      [DEFAULT_URL, location]
    end
  end

  mode_note =
    if remove?
      ' with rewriting destination mapping!'
    elsif update?
      ' with updating existing documents!'
    else
      '.'
    end
  printf "Copying '%s/%s' to '%s/%s'%s\n  Confirm or hit Ctrl-c to abort...\n",
    surl, sidx, durl, didx, mode_note

  $stdin.readline

  # remove old index in case of remove=true
  retried_request(:delete, "#{durl}/#{didx}") if remove? && retried_request(:get, "#{durl}/#{didx}/_status")

  # (re)create destination index
  unless retried_request(:get, "#{durl}/#{didx}/_status")
    # obtain the original index settings first
    settings = retried_request(:get, "#{surl}/#{sidx}/_settings")
    unless settings
      warn "Failed to obtain original index '#{surl}/#{sidx}' settings!"
      exit 1
    end
    settings = MultiJson.load settings
    # The response is keyed by the index name the server reports; use that
    # name for the source from here on.
    sidx = settings.keys[0]
    settings[sidx].delete 'index.version.created'
    printf 'Creating \'%s/%s\' index with settings from \'%s/%s\'... ', durl, didx, surl, sidx
    if retried_request(:post, "#{durl}/#{didx}", MultiJson.dump(settings[sidx]))
      puts 'OK.'
    else
      puts 'FAILED!'
      exit 1
    end

    mappings = retried_request(:get, "#{surl}/#{sidx}/_mapping")
    unless mappings
      warn "Failed to obtain original index '#{surl}/#{sidx}' mappings!"
      exit 1
    end
    mappings = MultiJson.load mappings
    mappings = mappings[sidx]
    # Some responses nest the per-type mappings under a 'mappings' key.
    mappings = mappings['mappings'] if mappings.is_a?(Hash) && mappings.has_key?('mappings')
    mappings.each_pair do |type, mapping|
      printf 'Copying mapping \'%s/%s/%s\'... ', durl, didx, type
      if retried_request(:put, "#{durl}/#{didx}/#{type}/_mapping", MultiJson.dump(type => mapping))
        puts 'OK.'
      else
        puts 'FAILED!'
        exit 1
      end
    end
  end

  printf "Copying '%s/%s' to '%s/%s'... \n", surl, sidx, durl, didx
  @start_time = Time.now
  shards = retried_request :get, "#{surl}/#{sidx}/_count?q=*"
  shards = MultiJson.load(shards)['_shards']['total'].to_i
  # The frame is divided by the shard count because a scan search applies
  # `size` per shard. Keep the result at 1 or more: a frame smaller than the
  # shard count would otherwise request size=0, making every scroll page empty
  # so that nothing gets copied (and a shard count of 0 would divide by zero).
  per_shard = [frame / [shards, 1].max, 1].max
  scan = retried_request :get, "#{surl}/#{sidx}/_search?search_type=scan&scroll=10m&size=#{per_shard}"
  scan = MultiJson.load scan
  scroll_id = scan['_scroll_id']
  total = scan['hits']['total']
  printf "    %u/%u (%.1f%%) done.\r", done, total, 0

  bulk_op = update? ? 'index' : 'create'

  loop do
    data = retried_request :get, "#{surl}/_search/scroll?scroll=10m&scroll_id=#{scroll_id}"
    data = MultiJson.load data
    hits = data['hits']['hits']
    break if hits.empty?

    # Each page hands back the scroll id to use for the next fetch.
    scroll_id = data['_scroll_id']
    bulk = ''
    hits.each do |doc|
      ### === implement possible modifications to the document
      ### === end modifications to the document
      base = {'_index' => didx, '_id' => doc['_id'], '_type' => doc['_type']}
      %w[_timestamp _ttl].each do |doc_arg|
        base[doc_arg] = doc[doc_arg] if doc.key? doc_arg
      end
      # One action line followed by one source line per document.
      bulk << MultiJson.dump(bulk_op => base) << "\n"
      bulk << MultiJson.dump(doc['_source']) << "\n"
      @done += 1
    end
    unless bulk.empty?
      bulk << "\n" # empty line in the end required
      retried_request :post, "#{durl}/_bulk", bulk
    end

    # Linear extrapolation from the rate so far; done > 0 here because this
    # page was not empty.
    eta = total * (Time.now - start_time) / done
    printf "    %u/%u (%.1f%%) done in %s, E.T.A.: %s.\r",
      done, total, 100.0 * done / total, tm_len, start_time + eta
  end

  printf "#{' ' * 80}\r    %u/%u done in %s.\n", done, total, tm_len

  # no point for large reindexation with data still being stored in index
  printf 'Checking document count... '
  scount, dcount = 1, 0
  begin
    # Retry once a second until the counts agree, giving up after 60 seconds.
    Timeout.timeout(60) do
      loop do
        scount = retried_request :get, "#{surl}/#{sidx}/_count?q=*"
        dcount = retried_request :get, "#{durl}/#{didx}/_count?q=*"
        scount = MultiJson.load(scount)['count'].to_i
        dcount = MultiJson.load(dcount)['count'].to_i
        break if scount == dcount
        sleep 1
      end
    end
  rescue Timeout::Error
    # Deliberate: fall through and report the last counts that were read.
  end
  printf "%u == %u (%s\n", scount, dcount, scount == dcount ? 'equals).' : 'NOT EQUAL)!'

  exit 0
end