Class: LogStash::Outputs::Kusto::Ingestor
- Inherits:
-
Object
- Object
- LogStash::Outputs::Kusto::Ingestor
- Defined in:
- lib/logstash/outputs/kusto/ingestor.rb
Overview
This handles the overall logic and communication with Kusto
Constant Summary collapse
- RETRY_DELAY_SECONDS =
3
- DEFAULT_THREADPOOL =
Concurrent::ThreadPoolExecutor.new( min_threads: 1, max_threads: 8, max_queue: 1, fallback_policy: :caller_runs )
- LOW_QUEUE_LENGTH =
3
- FIELD_REF =
/%\{[^}]+\}/
Instance Method Summary collapse
-
#initialize(ingest_url, app_id, app_key, app_tenant, database, table, mapping, delete_local, logger, threadpool = DEFAULT_THREADPOOL) ⇒ Ingestor
constructor
A new instance of Ingestor.
- #stop ⇒ Object
- #upload(path, delete_on_success) ⇒ Object
- #upload_async(path, delete_on_success) ⇒ Object
- #validate_config(database, table, mapping) ⇒ Object
Constructor Details
#initialize(ingest_url, app_id, app_key, app_tenant, database, table, mapping, delete_local, logger, threadpool = DEFAULT_THREADPOOL) ⇒ Ingestor
Returns a new instance of Ingestor.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/logstash/outputs/kusto/ingestor.rb', line 24
#
# Validates the target settings, then builds the Kusto ingest client and the
# ingestion properties that later uploads reuse.
#
# @param ingest_url [String] passed to the Kusto connection string builder
# @param app_id [String] AAD application id
# @param app_key [#value] object whose #value is handed to the builder as the application key
# @param app_tenant [String] AAD tenant
# @param database [String] target database; checked by validate_config
# @param table [String] target table; checked by validate_config
# @param mapping [String] JSON mapping name set on the ingestion properties
# @param delete_local [Object] stored as-is in @delete_local
# @param logger [#debug] logger used by this instance
# @param threadpool [Object] worker pool, defaults to DEFAULT_THREADPOOL
def initialize(ingest_url, app_id, app_key, app_tenant, database, table, mapping, delete_local, logger, threadpool = DEFAULT_THREADPOOL)
  @workers_pool = threadpool
  @logger = logger

  validate_config(database, table, mapping)

  @logger.debug('Preparing Kusto resources.')

  connection = Java::com.microsoft.azure.kusto.data.ConnectionStringBuilder.createWithAadApplicationCredentials(
    ingest_url, app_id, app_key.value, app_tenant
  )

  # The gem/plugin name has to be spelled out literally to look up its loaded version.
  plugin_version = Gem.loaded_specs['logstash-output-kusto'].version
  tracing_name = "logstash-output-kusto:#{plugin_version}"
  @logger.debug("Client name for tracing: #{tracing_name}")
  connection.setClientVersionForTracing(tracing_name)

  @kusto_client = Java::com.microsoft.azure.kusto.ingest.IngestClientFactory.createClient(connection)

  @ingestion_properties = Java::com.microsoft.azure.kusto.ingest.IngestionProperties.new(database, table)
  @ingestion_properties.setJsonMappingName(mapping)

  @delete_local = delete_local

  @logger.debug('Kusto resources are ready.')
end
Instance Method Details
#stop ⇒ Object
118 119 120 121 |
# File 'lib/logstash/outputs/kusto/ingestor.rb', line 118
#
# Shuts the worker pool down and waits for it to terminate.
#
# @return [Object] whatever the pool's wait_for_termination call returns
def stop
  pool = @workers_pool
  pool.shutdown
  # nil timeout: block until the pool is done
  pool.wait_for_termination(nil)
end
#upload(path, delete_on_success) ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/logstash/outputs/kusto/ingestor.rb', line 80
#
# Sends one local file to Kusto through the ingest client and optionally
# deletes it afterwards.
#
# A missing file is logged and given up on. Any other StandardError is logged
# and the whole upload is retried after RETRY_DELAY_SECONDS, with no upper
# bound on the number of attempts.
#
# @param path [String] path of the file to ingest
# @param delete_on_success [Boolean] when truthy, the file is deleted once ingestFromFile has returned
def upload(path, delete_on_success)
  file_size = File.size(path)
  @logger.debug("Sending file to kusto: #{path}. size: #{file_size}")

  # TODO: dynamic routing
  # file_metadata = path.partition('.kusto.').last
  # file_metadata_parts = file_metadata.split('.')

  # if file_metadata_parts.length == 3
  #   # this is the number we expect - database, table, mapping
  #   database = file_metadata_parts[0]
  #   table = file_metadata_parts[1]
  #   mapping = file_metadata_parts[2]

  #   local_ingestion_properties = Java::KustoIngestionProperties.new(database, table)
  #   local_ingestion_properties.addJsonMappingName(mapping)
  # end

  # 0 - let the sdk figure out the size of the file
  file_source_info = Java::com.microsoft.azure.kusto.ingest.source.FileSourceInfo.new(path, 0)
  @kusto_client.ingestFromFile(file_source_info, @ingestion_properties)

  File.delete(path) if delete_on_success

  @logger.debug("File #{path} sent to kusto.")
rescue Errno::ENOENT => e
  @logger.error("File doesn't exist! Unrecoverable error.", exception: e.class, message: e.message, path: path, backtrace: e.backtrace)
rescue Java::JavaNioFile::NoSuchFileException => e
  @logger.error("File doesn't exist! Unrecoverable error.", exception: e.class, message: e.message, path: path, backtrace: e.backtrace)
rescue => e
  # When the retry limit is reached or another error happen we will wait and retry.
  #
  # Thread might be stuck here, but I think its better than losing anything
  # its either a transient errors or something bad really happened.
  @logger.error('Uploading failed, retrying.', exception: e.class, message: e.message, path: path, backtrace: e.backtrace)
  sleep RETRY_DELAY_SECONDS
  retry
end
#upload_async(path, delete_on_success) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/logstash/outputs/kusto/ingestor.rb', line 66
#
# Queues an upload of the given file on the worker pool.
#
# Logs a warning first when the pool's remaining capacity is at or below
# LOW_QUEUE_LENGTH. Any exception raised while queueing is logged and then
# re-raised to the caller.
#
# @param path [String] path of the file to ingest
# @param delete_on_success [Boolean] forwarded to #upload
def upload_async(path, delete_on_success)
  # Read the capacity once so the logged figure is the one that was compared.
  free_slots = @workers_pool.remaining_capacity
  if free_slots <= LOW_QUEUE_LENGTH
    @logger.warn("Ingestor queue capacity is running low with #{free_slots} free slots.")
  end

  @workers_pool.post do
    LogStash::Util.set_thread_name("Kusto to ingest file: #{path}")
    upload(path, delete_on_success)
  end
rescue Exception => e
  # Exception is rescued only to log the failure; it is always re-raised below.
  @logger.error('StandardError.', exception: e.class, message: e.message, path: path, backtrace: e.backtrace)
  raise
end
#validate_config(database, table, mapping) ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/logstash/outputs/kusto/ingestor.rb', line 49
#
# Rejects database/table/mapping values that match FIELD_REF, i.e. that
# contain a `%{...}` reference. The settings are checked in that order and the
# first offending one is logged and raised.
#
# The offending value is logged as key/value data, matching the other
# @logger.error calls in this file.
#
# @param database [String, nil] database setting
# @param table [String, nil] table setting
# @param mapping [String, nil] mapping setting
# @return [nil]
# @raise [LogStash::ConfigurationError] when any of the values matches FIELD_REF
def validate_config(database, table, mapping)
  { database: database, table: table, mapping: mapping }.each do |name, value|
    next unless value =~ FIELD_REF

    message = "#{name} config value should not be dynamic."
    @logger.error(message, name => value)
    raise LogStash::ConfigurationError, message
  end

  nil
end