Class: Forklift::Connection::Elasticsearch

Inherits:
Base::Connection show all
Defined in:
lib/forklift/transports/elasticsearch.rb

Instance Method Summary collapse

Methods inherited from Base::Connection

#client, #exec, #exec!, #exec_ruby, #exec_script, #pipe

Constructor Details

#initialize(config, forklift) ⇒ Elasticsearch

Returns a new instance of Elasticsearch.



7
8
9
10
# File 'lib/forklift/transports/elasticsearch.rb', line 7

# Remembers the connection settings and the owning forklift object. No client
# is built here; that happens in #connect.
#
# @param config [Object] settings later passed unchanged to
#   ::Elasticsearch::Client.new by #connect
# @param forklift [Object] object whose #logger and #config are used by the
#   other methods of this connection
def initialize(config, forklift)
  @config = config
  @forklift = forklift
end

Instance Method Details

#config ⇒ Object



12
13
14
# File 'lib/forklift/transports/elasticsearch.rb', line 12

# Reader for the connection settings.
#
# @return [Object] the value stored in @config (assigned by the constructor)
def config
  @config
end

#connect ⇒ Object



20
21
22
# File 'lib/forklift/transports/elasticsearch.rb', line 20

# Builds a new ::Elasticsearch::Client from #config and keeps it in @client.
# NOTE(review): the inherited #client accessor presumably returns @client —
# confirm against Base::Connection.
#
# @return [Object] the freshly created client
def connect
  @client = ::Elasticsearch::Client.new(config)
end

#delete_index(index) ⇒ Object



75
76
77
78
# File 'lib/forklift/transports/elasticsearch.rb', line 75

# Removes +index+ when it is present; does nothing beyond logging otherwise.
#
# The existence check and the delete are two separate requests to the client.
#
# @param index [Object] index name, forwarded as `index:` to the client
# @return [Object, nil] whatever client.indices.delete returns, or nil when
#   the index was not there
def delete_index(index)
  forklift.logger.debug "    ELASTICSEARCH (delete index): #{index}"
  return unless client.indices.exists({ index: index })

  client.indices.delete({ index: index })
end

#disconnect ⇒ Object



24
25
26
# File 'lib/forklift/transports/elasticsearch.rb', line 24

# Drops the reference held in @client by setting it to nil. No close or
# shutdown method is called on the client object itself.
#
# @return [nil]
def disconnect
  @client = nil
end

#forklift ⇒ Object



16
17
18
# File 'lib/forklift/transports/elasticsearch.rb', line 16

# Reader for the owning forklift object.
#
# @return [Object] the value stored in @forklift (assigned by the constructor)
def forklift
  @forklift
end

#read(index, query, looping = true, from = 0, size = forklift.config[:batch_size]) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/forklift/transports/elasticsearch.rb', line 28

# Runs +query+ against +index+ and hands back the `_source` of every hit, one
# page at a time.
#
# With a block, each page is yielded as an Array of Hashes whose keys have been
# symbolized. When +looping+ is exactly +true+, paging continues until a page
# comes back with no hits; that final empty page is yielded too. Otherwise a
# single page is fetched. Without a block, only the first page is returned,
# whatever +looping+ is.
#
# The caller's +query+ is left untouched: the paging keys are merged into a
# per-request copy rather than written into the hash that was passed in.
#
# @param index [Object] index name, forwarded as `index:` to client.search
# @param query [Hash] search body; `:from` and `:size` are overridden per page
# @param looping [Boolean] keep fetching pages only when exactly +true+
# @param from [Integer] offset of the first hit of the first page
# @param size [Integer] page size; defaults to forklift.config[:batch_size]
# @yieldparam data [Array<Hash>] one page of symbolized `_source` documents
# @return [Array<Hash>, nil] the first page when no block is given, else nil
def read(index, query, looping=true, from=0, size=forklift.config[:batch_size])
  offset = 0
  loop_count = 0

  while (looping == true || loop_count == 0)
    # merge returns a new hash, so the :from/:size set for this page never
    # leak into the caller's query object
    prepared_query = query.merge(from: from + offset, size: size)

    forklift.logger.debug "    ELASTICSEARCH: #{prepared_query.to_json}"
    results = client.search( { index: index, body: prepared_query } )
    hits = results["hits"]["hits"]

    data = hits.map { |hit| hit["_source"] }
    # symbolize_keys! rewrites each document's keys in place
    # (presumably ActiveSupport's Hash extension — confirm it is loaded)
    data.each(&:symbolize_keys!)

    return data unless block_given?

    yield data

    # an empty page marks the end; it has already been yielded above
    looping = false if hits.empty?
    offset += size
    loop_count += 1
  end
end

#write(data, index, update = false, type = 'forklift', primary_key = :id) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/forklift/transports/elasticsearch.rb', line 58

# Indexes every record of +data+ into +index+ as its own document, then asks
# the client to refresh that index.
#
# Each record's keys are symbolized in place first, so the caller's hashes are
# modified (symbolize_keys! is presumably ActiveSupport's Hash extension —
# confirm it is loaded). A document `:id` is sent only when +update+ is exactly
# +true+ and the record has a non-nil value under +primary_key+.
#
# @param data [Array<Hash>] records to store
# @param index [Object] index name, forwarded as `index:` to the client
# @param update [Boolean] when exactly +true+, reuse the record's primary key
#   as the document id
# @param type [String] value sent as the document `type:`
# @param primary_key [Symbol] record key holding the document id
# @return [Object] whatever client.indices.refresh returns
def write(data, index, update=false, type='forklift', primary_key=:id)
  data.each(&:symbolize_keys!)

  data.each do |row|
    request = { index: index, body: row, type: type }
    key_value = row[primary_key]
    request[:id] = key_value if update == true && !key_value.nil?

    forklift.logger.debug "    ELASTICSEARCH (store): #{request.to_json}"
    client.index(request)
  end

  client.indices.refresh({ index: index })
end