Class: LogStash::Outputs::Kusto::Ingestor
- Inherits:
-
Object
- Object
- LogStash::Outputs::Kusto::Ingestor
- Defined in:
- lib/logstash/outputs/kusto/ingestor.rb
Overview
This handles the overall logic and communication with Kusto.
Constant Summary collapse
- RETRY_DELAY_SECONDS =
3
- DEFAULT_THREADPOOL =
Concurrent::ThreadPoolExecutor.new( min_threads: 1, max_threads: 8, max_queue: 1, fallback_policy: :caller_runs )
- LOW_QUEUE_LENGTH =
3
- FIELD_REF =
/%\{[^}]+\}/
Instance Method Summary collapse
-
#initialize(ingest_url, app_id, app_key, app_tenant, database, table, json_mapping, delete_local, proxy_host, proxy_port, proxy_protocol, logger, threadpool = DEFAULT_THREADPOOL) ⇒ Ingestor
constructor
A new instance of Ingestor.
- #stop ⇒ Object
- #upload(path, delete_on_success) ⇒ Object
- #upload_async(path, delete_on_success) ⇒ Object
- #validate_config(database, table, json_mapping, proxy_protocol) ⇒ Object
Constructor Details
#initialize(ingest_url, app_id, app_key, app_tenant, database, table, json_mapping, delete_local, proxy_host, proxy_port, proxy_protocol, logger, threadpool = DEFAULT_THREADPOOL) ⇒ Ingestor
Returns a new instance of Ingestor.
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/logstash/outputs/kusto/ingestor.rb', line 23
# Validates the static configuration, then builds the Kusto ingest client and
# the ingestion properties that later uploads reuse.
#
# @param ingest_url [String] passed to the Kusto connection-string builder
# @param app_id [String] AAD application id
# @param app_key [#value] object whose +#value+ yields the application secret
# @param app_tenant [String] AAD tenant
# @param database [String] target database (must not be a field reference)
# @param table [String] target table (must not be a field reference)
# @param json_mapping [String] ingestion mapping name (must not be a field reference)
# @param delete_local [Object] stored in +@delete_local+
# @param proxy_host [String, nil] when nil or empty the client is created without a proxy
# @param proxy_port [Object] proxy port handed to the HTTP host constructor
# @param proxy_protocol [String] "http" or "https"
# @param logger [Object] logger used by this instance
# @param threadpool [Object] executor used for asynchronous uploads
# @raise [LogStash::ConfigurationError] when validate_config rejects the settings
def initialize(ingest_url, app_id, app_key, app_tenant, database, table, json_mapping, delete_local, proxy_host, proxy_port, proxy_protocol, logger, threadpool = DEFAULT_THREADPOOL)
  @workers_pool = threadpool
  @logger = logger
  validate_config(database, table, json_mapping, proxy_protocol)
  @logger.info('Preparing Kusto resources.')

  kusto_java = Java::com.microsoft.azure.kusto
  apache_http = Java::org.apache.http
  connection_builder = kusto_java.data.auth.ConnectionStringBuilder
  kusto_connection_string = connection_builder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)

  # Unfortunately there's no way to avoid using the gem/plugin name directly...
  plugin_version = Gem.loaded_specs['logstash-output-kusto']&.version || "unknown"
  @logger.debug("Client name for tracing: #{"logstash-output-kusto:#{plugin_version}"}")

  empty_pairs = Java::org.apache.commons.lang3.tuple.Pair.emptyArray()
  kusto_connection_string.setConnectorDetails("Logstash", plugin_version.to_s, "", "", false, "", empty_pairs)

  client_factory = kusto_java.ingest.IngestClientFactory
  proxy_configured = !proxy_host.nil? && !proxy_host.empty?
  @kusto_client =
    if proxy_configured
      proxy_endpoint = apache_http.HttpHost.new(proxy_host, proxy_port, proxy_protocol)
      http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(proxy_endpoint).build()
      client_factory.createClient(kusto_connection_string, http_client_properties)
    else
      client_factory.createClient(kusto_connection_string)
    end

  properties = kusto_java.ingest.IngestionProperties.new(database, table)
  @ingestion_properties = properties
  properties.setIngestionMapping(json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON)
  properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON)

  @delete_local = delete_local
  @logger.debug('Kusto resources are ready.')
end
Instance Method Details
#stop ⇒ Object
137 138 139 140 |
# File 'lib/logstash/outputs/kusto/ingestor.rb', line 137
# Shuts the worker pool down and waits for it to terminate.
#
# @return [Object] whatever the pool's +wait_for_termination+ returns
def stop
  pool = @workers_pool
  pool.shutdown
  # A nil timeout means: block until it's done.
  pool.wait_for_termination(nil)
end
#upload(path, delete_on_success) ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/logstash/outputs/kusto/ingestor.rb', line 97
# Sends one local file to Kusto using the client and ingestion properties
# prepared by the constructor.
#
# Empty files are not ingested (a warning is logged instead). The file is
# removed afterwards when +delete_on_success+ is truthy, including the
# empty-file case.
#
# A missing file is logged as unrecoverable and the method returns. Any other
# StandardError is logged, followed by a pause of RETRY_DELAY_SECONDS and a
# retry of the whole method body, with no upper bound on attempts.
#
# @param path [String] path of the file to ingest
# @param delete_on_success [Boolean] delete the file once it has been handled
def upload(path, delete_on_success)
  file_size = File.size(path)
  @logger.debug("Sending file to kusto: #{path}. size: #{file_size}")

  # TODO: dynamic routing
  # file_metadata = path.partition('.kusto.').last
  # file_metadata_parts = file_metadata.split('.')

  # if file_metadata_parts.length == 3
  #   # this is the number we expect - database, table, json_mapping
  #   database = file_metadata_parts[0]
  #   table = file_metadata_parts[1]
  #   json_mapping = file_metadata_parts[2]

  #   local_ingestion_properties = Java::KustoIngestionProperties.new(database, table)
  #   local_ingestion_properties.addJsonMappingName(json_mapping)
  # end

  if file_size > 0
    # 0 - let the sdk figure out the size of the file
    file_source_info = Java::com.microsoft.azure.kusto.ingest.source.FileSourceInfo.new(path, 0)
    @kusto_client.ingestFromFile(file_source_info, @ingestion_properties)
  else
    @logger.warn("File #{path} is an empty file and is not ingested.")
  end
  File.delete(path) if delete_on_success
  @logger.debug("File #{path} sent to kusto.")
rescue Errno::ENOENT => e
  @logger.error("File doesn't exist! Unrecoverable error.", exception: e.class, message: e.message, path: path, backtrace: e.backtrace)
rescue Java::JavaNioFile::NoSuchFileException => e
  @logger.error("File doesn't exist! Unrecoverable error.", exception: e.class, message: e.message, path: path, backtrace: e.backtrace)
rescue => e
  # When the retry limit is reached or another error happens we wait and retry.
  #
  # The thread might be stuck here, but that is considered better than losing
  # anything: it is either a transient error or something bad really happened.
  @logger.error('Uploading failed, retrying.', exception: e.class, message: e.message, path: path, backtrace: e.backtrace)
  sleep RETRY_DELAY_SECONDS
  retry
end
#upload_async(path, delete_on_success) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/logstash/outputs/kusto/ingestor.rb', line 83
# Queues an upload of +path+ on the worker pool.
#
# Logs a warning first when the pool's remaining capacity is at or below
# LOW_QUEUE_LENGTH. Any exception raised while checking capacity or posting
# the job is logged and then re-raised to the caller.
#
# @param path [String] path of the file to ingest
# @param delete_on_success [Boolean] forwarded to #upload
def upload_async(path, delete_on_success)
  if @workers_pool.remaining_capacity <= LOW_QUEUE_LENGTH
    @logger.warn("Ingestor queue capacity is running low with #{@workers_pool.remaining_capacity} free slots.")
  end

  @workers_pool.post do
    LogStash::Util.set_thread_name("Kusto to ingest file: #{path}")
    upload(path, delete_on_success)
  end
rescue Exception => e
  # Deliberately broad: the error is only logged here and always re-raised.
  @logger.error('StandardError.', exception: e.class, message: e.message, path: path, backtrace: e.backtrace)
  raise
end
#validate_config(database, table, json_mapping, proxy_protocol) ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/logstash/outputs/kusto/ingestor.rb', line 60
# Rejects configuration values this class cannot work with.
#
# +database+, +table+ and +json_mapping+ are checked in that order and must
# not match FIELD_REF (a "%{...}" field reference). +proxy_protocol+ must be
# "http" or "https". The first offending setting is logged and raised.
#
# @param database [String]
# @param table [String]
# @param json_mapping [String]
# @param proxy_protocol [String]
# @raise [LogStash::ConfigurationError] on the first invalid setting
# @return [nil] when every setting is acceptable
def validate_config(database, table, json_mapping, proxy_protocol)
  static_settings = { database: database, table: table, json_mapping: json_mapping }
  static_settings.each do |name, value|
    next unless value =~ FIELD_REF

    message = "#{name} config value should not be dynamic."
    # Log the offending value as keyed data, like the other logger calls in this class.
    @logger.error(message, name => value)
    raise LogStash::ConfigurationError.new(message)
  end

  unless %w[https http].include?(proxy_protocol)
    @logger.error('proxy_protocol has to be http or https.', proxy_protocol: proxy_protocol)
    raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.')
  end
end