Class: Core::OutputSink::EsSink

Inherits:
BaseSink
  • Object
show all
Defined in:
lib/core/output_sink/es_sink.rb

Instance Method Summary collapse

Constructor Details

#initialize(index_name, request_pipeline, flush_threshold = 50) ⇒ EsSink

Returns a new instance of EsSink.



17
18
19
20
21
22
23
24
# File 'lib/core/output_sink/es_sink.rb', line 17

def initialize(index_name, request_pipeline, flush_threshold = 50)
  super()
  @client = Utility::EsClient.new(App::Config[:elasticsearch])
  @index_name = index_name
  @request_pipeline = request_pipeline
  @operation_queue = []
  @flush_threshold = flush_threshold
end

Instance Method Details

#delete(doc_id) ⇒ Object



33
34
35
36
37
38
# File 'lib/core/output_sink/es_sink.rb', line 33

def delete(doc_id)
  return if doc_id.nil?

  @operation_queue << { :delete => { :_index => index_name, :_id => doc_id } }
  flush if ready_to_flush?
end

#delete_multiple(ids) ⇒ Object



54
55
56
57
# File 'lib/core/output_sink/es_sink.rb', line 54

def delete_multiple(ids)
  Utility::Logger.debug "Enqueueing #{ids&.size} ids to delete from the index #{index_name}."
  ids.each { |id| delete(id) }
end

#flush(size: nil) ⇒ Object



40
41
42
43
44
45
46
47
# File 'lib/core/output_sink/es_sink.rb', line 40

def flush(size: nil)
  flush_size = size || @flush_threshold

  while @operation_queue.any?
    data_to_flush = @operation_queue.pop(flush_size)
    send_data(data_to_flush)
  end
end

#ingest(document) ⇒ Object



26
27
28
29
30
31
# File 'lib/core/output_sink/es_sink.rb', line 26

def ingest(document)
  return if document.blank?

  @operation_queue << { :index => { :_index => index_name, :_id => document[:id], :data => document } }
  flush if ready_to_flush?
end

#ingest_multiple(documents) ⇒ Object



49
50
51
52
# File 'lib/core/output_sink/es_sink.rb', line 49

def ingest_multiple(documents)
  Utility::Logger.debug "Enqueueing #{documents&.size} documents to the index #{index_name}."
  documents.each { |doc| ingest(doc) }
end