Class: Core::Ingestion::EsSink

Inherits:
Object
  • Object
show all
Defined in:
lib/core/ingestion/es_sink.rb

Instance Method Summary collapse

Constructor Details

#initialize(index_name, request_pipeline, bulk_queue = Utility::BulkQueue.new, max_allowed_document_size = Utility::Constants::DEFAULT_MAX_INGESTION_QUEUE_BYTES) ⇒ EsSink

Returns a new instance of EsSink.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/core/ingestion/es_sink.rb', line 24

# Sets up a sink that queues bulk operations for one index and tracks
# ingestion counters.
#
# @param index_name value written as '_index' on every queued operation
# @param request_pipeline value passed as :pipeline on each bulk request
# @param bulk_queue queue the operations are buffered in; a fresh
#   Utility::BulkQueue when omitted
# @param max_allowed_document_size upper bound for a serialized document in
#   bytes; a value that is not positive turns the size check in #ingest off
def initialize(index_name, request_pipeline, bulk_queue = Utility::BulkQueue.new, max_allowed_document_size = Utility::Constants::DEFAULT_MAX_INGESTION_QUEUE_BYTES)
  @client = Utility::EsClient.new(App::Config[:elasticsearch])
  @index_name = index_name
  @request_pipeline = request_pipeline
  @operation_queue = bulk_queue

  @max_allowed_document_size = max_allowed_document_size

  stat_names = [:indexed_document_count, :deleted_document_count, :indexed_document_volume]

  # Counters for operations that are queued but not yet sent.
  @queued = Hash[stat_names.map { |stat| [stat, 0] }]

  # Counters for operations that were already handed to a bulk request.
  @completed = Hash[stat_names.map { |stat| [stat, 0] }]
end

Instance Method Details

#delete(id) ⇒ Object



78
79
80
81
82
83
84
85
86
# File 'lib/core/ingestion/es_sink.rb', line 78

# Queues a bulk 'delete' action for the given document id.
#
# Does nothing when +id+ is nil. If the queue reports that the serialized
# action will not fit, the pending operations are flushed first.
#
# @param id identifier placed in the action's '_id' field
def delete(id)
  return if id.nil?

  action = { 'delete' => { '_index' => @index_name, '_id' => id } }
  serialized_action = serialize(action)

  # Make room before adding when the queue cannot take this action.
  @operation_queue.will_fit?(serialized_action) || flush

  @operation_queue.add(serialized_action)
  @queued[:deleted_document_count] += 1
end

#delete_multiple(ids) ⇒ Object



88
89
90
# File 'lib/core/ingestion/es_sink.rb', line 88

# Queues a delete action for each id by calling #delete once per element,
# in iteration order.
#
# @param ids collection responding to #each
# @return whatever +ids.each+ returns
def delete_multiple(ids)
  ids.each do |doc_id|
    delete(doc_id)
  end
end

#flush ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/core/ingestion/es_sink.rb', line 92

# Sends every queued operation to Elasticsearch in a single bulk request and
# moves the queued counters into the completed counters.
#
# Returns immediately when the queue is empty. When the bulk call raises, the
# error propagates to the caller; the popped operations are gone at that
# point, so the queued counters are reset without being credited to the
# completed counters. Otherwise a later successful flush would report the
# lost batch as ingested.
#
# NOTE(review): the bulk response is not inspected here, so per-item failures
# reported inside a successful response would still be counted as completed.
#
# @return [nil]
def flush
  batch = @operation_queue.pop_all
  return if batch.empty?

  begin
    @client.bulk(:body => batch, :pipeline => @request_pipeline)

    # Only reached when the bulk call did not raise.
    @queued.each { |stat, amount| @completed[stat] += amount }
  ensure
    # The queue was drained by pop_all either way, so nothing is pending.
    @queued.each_key { |stat| @queued[stat] = 0 }
  end

  nil
end

#ingest(document) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/core/ingestion/es_sink.rb', line 45

# Queues a bulk 'index' action plus the serialized document.
#
# The document is skipped with a warning when it is nil or empty, or when the
# size limit is positive and the serialized document is larger than it. If
# the queue reports that the action and document will not fit, the pending
# operations are flushed before they are added.
#
# @param document object responding to #empty? and #[]; its 'id' entry is
#   used as the action's '_id'
def ingest(document)
  nothing_to_ingest = document.nil? || document.empty?
  if nothing_to_ingest
    Utility::Logger.warn('Connector attempted to ingest an empty document, skipping')
    return
  end

  doc_id = document['id']
  payload = serialize(document)
  payload_bytes = payload.bytesize

  # A limit that is zero or negative means "no limit".
  over_limit = @max_allowed_document_size > 0 && payload_bytes > @max_allowed_document_size
  if over_limit
    Utility::Logger.warn("Connector attempted to ingest too large document with id=#{document['id']} [#{payload_bytes}/#{@max_allowed_document_size}], skipping the document.")
    return
  end

  action = serialize({ 'index' => { '_index' => @index_name, '_id' => doc_id } })

  # Make room before adding when the queue cannot take this pair.
  @operation_queue.will_fit?(action, payload) || flush

  @operation_queue.add(action, payload)

  @queued[:indexed_document_count] += 1
  @queued[:indexed_document_volume] += payload_bytes
end

#ingest_multiple(documents) ⇒ Object



74
75
76
# File 'lib/core/ingestion/es_sink.rb', line 74

# Ingests each document by calling #ingest once per element, in iteration
# order.
#
# @param documents collection responding to #each
# @return whatever +documents.each+ returns
def ingest_multiple(documents)
  documents.each do |document|
    ingest(document)
  end
end

#ingestion_stats ⇒ Object



107
108
109
# File 'lib/core/ingestion/es_sink.rb', line 107

# Returns a shallow copy of the completed counters, so callers can modify the
# result without touching the sink's own bookkeeping.
#
# @return [Hash] copy of the completed counters
def ingestion_stats
  snapshot = @completed.dup
  snapshot
end