Class: Core::Ingestion::EsSink
- Inherits: Object (ancestry: Object → Core::Ingestion::EsSink)
- Defined in:
- lib/core/ingestion/es_sink.rb
Instance Method Summary
- #delete(id) ⇒ Object
- #delete_multiple(ids) ⇒ Object
- #flush ⇒ Object
- #ingest(document) ⇒ Object
- #ingest_multiple(documents) ⇒ Object
- #ingestion_stats ⇒ Object
- #initialize(index_name, request_pipeline, bulk_queue = Utility::BulkQueue.new, max_allowed_document_size = Utility::Constants::DEFAULT_MAX_INGESTION_QUEUE_BYTES) ⇒ EsSink (constructor): returns a new instance of EsSink.
Constructor Details
#initialize(index_name, request_pipeline, bulk_queue = Utility::BulkQueue.new, max_allowed_document_size = Utility::Constants::DEFAULT_MAX_INGESTION_QUEUE_BYTES) ⇒ EsSink
Returns a new instance of EsSink.
Source lines 24-43 of lib/core/ingestion/es_sink.rb:
# File 'lib/core/ingestion/es_sink.rb', line 24
# Creates a sink that batches index/delete operations for a single index and
# tracks how many of them are pending versus already flushed.
#
# @param index_name value written as +_index+ in every queued operation
# @param request_pipeline passed as the +:pipeline+ option of each bulk request
# @param bulk_queue queue that buffers serialized operations between flushes
# @param max_allowed_document_size byte limit for a single serialized document;
#   larger documents are skipped by #ingest, and a value of 0 or less disables the check
def initialize(index_name, request_pipeline, bulk_queue = Utility::BulkQueue.new, max_allowed_document_size = Utility::Constants::DEFAULT_MAX_INGESTION_QUEUE_BYTES)
  @client = Utility::EsClient.new(App::Config[:elasticsearch])
  @index_name = index_name
  @request_pipeline = request_pipeline
  @operation_queue = bulk_queue
  @max_allowed_document_size = max_allowed_document_size

  counter_names = [:indexed_document_count, :deleted_document_count, :indexed_document_volume]
  # Two independent hashes: counts for operations still sitting in the queue,
  # and running totals for operations that have been sent by #flush.
  @queued = counter_names.each_with_object({}) { |name, counters| counters[name] = 0 }
  @completed = counter_names.each_with_object({}) { |name, counters| counters[name] = 0 }
end
Instance Method Details
#delete(id) ⇒ Object
Source lines 78-86 of lib/core/ingestion/es_sink.rb:
# File 'lib/core/ingestion/es_sink.rb', line 78
# Queues a bulk "delete" operation for the document with the given id.
#
# A nil id is ignored without logging. If the queue reports that the new
# operation will not fit, the queue is flushed first.
#
# @param id id of the document to remove from the sink's index
# @return [Integer, nil] the updated pending-delete count, or nil when id is nil
def delete(id)
  return if id.nil?

  metadata = { '_index' => @index_name, '_id' => id }
  operation = serialize({ 'delete' => metadata })

  # Make room before adding when the queue says this operation will not fit.
  flush unless @operation_queue.will_fit?(operation)
  @operation_queue.add(operation)
  @queued[:deleted_document_count] += 1
end
#delete_multiple(ids) ⇒ Object
Source lines 88-90 of lib/core/ingestion/es_sink.rb:
# File 'lib/core/ingestion/es_sink.rb', line 88
# Queues a delete for each id, in order, by delegating to #delete.
#
# @param ids enumerable of document ids
# @return the +ids+ receiver, as returned by +each+
def delete_multiple(ids)
  ids.each(&method(:delete))
end
#flush ⇒ Object
Source lines 92-105 of lib/core/ingestion/es_sink.rb:
# File 'lib/core/ingestion/es_sink.rb', line 92
# Sends every queued operation to Elasticsearch in one bulk request, then
# credits the counters of the flushed operations to the completed totals.
#
# The queued counters are detached and zeroed *before* the request is made.
# By then +pop_all+ has already drained the queue, so if +bulk+ raises, those
# operations are gone. Leaving their counts in +@queued+ would let the next
# successful flush report them as completed even though they were never sent.
#
# NOTE(review): the bulk response is not inspected. If Utility::EsClient#bulk
# reports per-item failures in its response instead of raising, those items
# are still counted as completed here - confirm against the client.
#
# @return [void] nil when there was nothing to flush
def flush
  data = @operation_queue.pop_all
  return if data.empty?

  # Snapshot and reset first: these counts belong to +data+, which has
  # already left the queue whether or not the request below succeeds.
  flushed = @queued.dup
  @queued.keys.each { |key| @queued[key] = 0 }

  @client.bulk(:body => data, :pipeline => @request_pipeline)

  flushed.each { |key, count| @completed[key] += count }
end
#ingest(document) ⇒ Object
Source lines 45-72 of lib/core/ingestion/es_sink.rb:
# File 'lib/core/ingestion/es_sink.rb', line 45
# Queues a bulk "index" operation (metadata line plus document body) for one document.
#
# Nil or empty documents are skipped with a warning. A document whose
# serialized size exceeds the configured limit is also skipped with a warning,
# unless the limit is 0 or less. The queue is flushed first if it reports
# that the new pair will not fit.
#
# @param document document to index; its 'id' entry becomes the bulk +_id+
# @return [Integer, nil] the updated pending-volume total, or nil when skipped
def ingest(document)
  if document.nil? || document.empty?
    Utility::Logger.warn('Connector attempted to ingest an empty document, skipping')
    return
  end

  id = document['id']
  body = serialize(document)
  body_size = body.bytesize
  size_limit = @max_allowed_document_size

  # A non-positive limit disables the size check entirely.
  if size_limit > 0 && body_size > size_limit
    Utility::Logger.warn("Connector attempted to ingest too large document with id=#{document['id']} [#{body_size}/#{size_limit}], skipping the document.")
    return
  end

  header = serialize({ 'index' => { '_index' => @index_name, '_id' => id } })

  flush unless @operation_queue.will_fit?(header, body)
  @operation_queue.add(header, body)

  @queued[:indexed_document_count] += 1
  @queued[:indexed_document_volume] += body_size
end
#ingest_multiple(documents) ⇒ Object
Source lines 74-76 of lib/core/ingestion/es_sink.rb:
# File 'lib/core/ingestion/es_sink.rb', line 74
# Queues each document, in order, by delegating to #ingest.
#
# @param documents enumerable of documents
# @return the +documents+ receiver, as returned by +each+
def ingest_multiple(documents)
  documents.each(&method(:ingest))
end
#ingestion_stats ⇒ Object
Source lines 107-109 of lib/core/ingestion/es_sink.rb:
# File 'lib/core/ingestion/es_sink.rb', line 107
# Returns the totals for operations that #flush has already sent.
#
# A new hash is returned, so callers can modify it without affecting the
# sink's internal counters.
#
# @return [Hash{Symbol => Integer}] :indexed_document_count,
#   :deleted_document_count and :indexed_document_volume
def ingestion_stats
  {}.merge(@completed)
end