Class: LogStash::Outputs::Kusto::Ingestor
- Inherits:
-
Object
- Object
- LogStash::Outputs::Kusto::Ingestor
- Defined in:
- lib/logstash/outputs/kusto/ingestor.rb
Overview
This class handles the overall ingestion logic and communication with Kusto.
Constant Summary collapse
- RETRY_DELAY_SECONDS =
3
- DEFAULT_THREADPOOL =
Concurrent::ThreadPoolExecutor.new( min_threads: 1, max_threads: 8, max_queue: 1, fallback_policy: :caller_runs )
- LOW_QUEUE_LENGTH =
3
- FIELD_REF =
/%\{[^}]+\}/
Instance Method Summary collapse
-
#initialize(ingest_url, app_id, app_key, app_tenant, database, table, mapping, delete_local, logger, threadpool = DEFAULT_THREADPOOL) ⇒ Ingestor
constructor
A new instance of Ingestor.
- #stop ⇒ Object
- #upload(path, delete_on_success) ⇒ Object
- #upload_async(path, delete_on_success) ⇒ Object
- #validate_config(database, table, mapping) ⇒ Object
Constructor Details
#initialize(ingest_url, app_id, app_key, app_tenant, database, table, mapping, delete_local, logger, threadpool = DEFAULT_THREADPOOL) ⇒ Ingestor
Returns a new instance of Ingestor.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/logstash/outputs/kusto/ingestor.rb', line 24

# Validates the static configuration and builds the Kusto ingest client and
# the ingestion properties that later uploads reuse.
#
# @param ingest_url [String] passed to the Kusto connection string builder
# @param app_id [String] AAD application id
# @param app_key [#value] AAD application key; only `app_key.value` is read
#   (presumably a wrapped secret - confirm against the caller)
# @param app_tenant [String] AAD tenant
# @param database [String] target database; must not contain a field reference
# @param table [String] target table; must not contain a field reference
# @param mapping [String] JSON mapping name; must not contain a field reference
# @param delete_local [Object] stored as-is in @delete_local
# @param logger [#debug, #error] logger used by this instance
# @param threadpool [Object] executor used for asynchronous uploads
# @raise [LogStash::ConfigurationError] when validate_config rejects a value
def initialize(ingest_url, app_id, app_key, app_tenant, database, table, mapping, delete_local, logger, threadpool = DEFAULT_THREADPOOL)
  @workers_pool = threadpool
  @logger = logger

  # Fail fast on dynamic values before any Kusto object is created.
  validate_config(database, table, mapping)

  @logger.debug('Preparing Kusto resources.')

  connection_string = Java::com.microsoft.azure.kusto.data.ConnectionStringBuilder.createWithAadApplicationCredentials(
    ingest_url,
    app_id,
    app_key.value,
    app_tenant
  )
  @kusto_client = Java::com.microsoft.azure.kusto.ingest.IngestClientFactory.createClient(connection_string)

  @ingestion_properties = Java::com.microsoft.azure.kusto.ingest.IngestionProperties.new(database, table)
  @ingestion_properties.setJsonMappingName(mapping)

  @delete_local = delete_local

  @logger.debug('Kusto resources are ready.')
end
Instance Method Details
#stop ⇒ Object
111 112 113 114 |
# File 'lib/logstash/outputs/kusto/ingestor.rb', line 111

# Shuts the worker pool down and waits for it to terminate.
#
# @return [Object] whatever the pool's wait_for_termination returns
def stop
  pool = @workers_pool
  pool.shutdown
  # nil is passed as the timeout: block until the pool is done.
  pool.wait_for_termination(nil)
end
#upload(path, delete_on_success) ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/logstash/outputs/kusto/ingestor.rb', line 75

# Sends one local file to Kusto and optionally deletes it afterwards.
#
# A missing file is treated as unrecoverable: it is logged and the method
# returns. Any other StandardError is logged and the whole upload is retried
# after RETRY_DELAY_SECONDS, without an upper bound on the number of attempts.
#
# @param path [String] path of the local file to ingest
# @param delete_on_success [Boolean] delete the file once ingestFromFile returned
# @return [Object] result of the last logger call
def upload(path, delete_on_success)
  file_size = File.size(path)
  @logger.debug("Sending file to kusto: #{path}. size: #{file_size}")

  # TODO: dynamic routing
  # file_metadata = path.partition('.kusto.').last
  # file_metadata_parts = file_metadata.split('.')

  # if file_metadata_parts.length == 3
  #   # this is the number we expect - database, table, mapping
  #   database = file_metadata_parts[0]
  #   table = file_metadata_parts[1]
  #   mapping = file_metadata_parts[2]

  #   local_ingestion_properties = Java::KustoIngestionProperties.new(database, table)
  #   local_ingestion_properties.addJsonMappingName(mapping)
  # end

  # 0 - let the sdk figure out the size of the file
  file_source_info = Java::com.microsoft.azure.kusto.ingest.source.FileSourceInfo.new(path, 0)
  @kusto_client.ingestFromFile(file_source_info, @ingestion_properties)

  File.delete(path) if delete_on_success

  @logger.debug("File #{path} sent to kusto.")
rescue Errno::ENOENT => e
  # Ruby's File.size / File.delete report a missing file with Errno::ENOENT,
  # not with the Java exception below. Without this clause the generic rescue
  # would retry forever on a file that is never going to exist.
  @logger.error("File doesn't exist! Unrecoverable error.", exception: e.class, message: e.message, path: path, backtrace: e.backtrace)
rescue Java::JavaNioFile::NoSuchFileException => e
  @logger.error("File doesn't exist! Unrecoverable error.", exception: e.class, message: e.message, path: path, backtrace: e.backtrace)
rescue => e
  # When the retry limit is reached or another error happens we wait and retry.
  #
  # The thread might be stuck here, but that is considered better than losing
  # anything: it is either a transient error or something bad really happened.
  @logger.error('Uploading failed, retrying.', exception: e.class, message: e.message, path: path, backtrace: e.backtrace)
  sleep RETRY_DELAY_SECONDS
  retry
end
#upload_async(path, delete_on_success) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/logstash/outputs/kusto/ingestor.rb', line 61

# Queues an upload of +path+ on the worker pool.
#
# A warning is logged when the pool's remaining queue capacity is at or below
# LOW_QUEUE_LENGTH. Anything raised while enqueuing is logged and re-raised.
#
# @param path [String] path of the local file to ingest
# @param delete_on_success [Boolean] forwarded to #upload
# @return [Object] whatever the pool's post returns
def upload_async(path, delete_on_success)
  # Read the capacity once so the check and the logged value agree.
  free_slots = @workers_pool.remaining_capacity
  if free_slots <= LOW_QUEUE_LENGTH
    @logger.warn("Ingestor queue capacity is running low with #{free_slots} free slots.")
  end

  @workers_pool.post do
    LogStash::Util.set_thread_name("Kusto to ingest file: #{path}")
    upload(path, delete_on_success)
  end
rescue Exception => e
  # Exception (not only StandardError) is rescued on purpose here: the error
  # is only logged and then re-raised unchanged, never swallowed.
  @logger.error('StandardError.', exception: e.class, message: e.message, path: path, backtrace: e.backtrace)
  raise e
end
#validate_config(database, table, mapping) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/logstash/outputs/kusto/ingestor.rb', line 44

# Rejects configuration values that match FIELD_REF (a `%{...}` reference).
# Values are checked in the order database, table, mapping; the first match
# is logged and raised.
#
# @param database [String] configured database name
# @param table [String] configured table name
# @param mapping [String] configured mapping name
# @return [nil] when none of the values matches FIELD_REF
# @raise [LogStash::ConfigurationError] for the first value matching FIELD_REF
def validate_config(database, table, mapping)
  settings = [['database', database], ['table', table], ['mapping', mapping]]

  settings.each do |setting, value|
    next unless value =~ FIELD_REF

    problem = "#{setting} config value should not be dynamic."
    @logger.error(problem, value)
    raise LogStash::ConfigurationError, problem
  end

  nil
end