Class: LogStash::Outputs::Kusto::Ingestor

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/kusto/ingestor.rb

Overview

This class handles the overall ingestion logic and the communication with Kusto.

Constant Summary collapse

# Seconds #upload sleeps before retrying a failed attempt.
RETRY_DELAY_SECONDS =
3
# Worker pool used when no threadpool is passed to #initialize.
# NOTE(review): with max_queue: 1 the pool's remaining_capacity presumably never
# exceeds 1, which is already <= LOW_QUEUE_LENGTH, so the low-capacity warning in
# #upload_async would be logged on every call with this default pool - confirm
# against the concurrent-ruby documentation.
DEFAULT_THREADPOOL =
Concurrent::ThreadPoolExecutor.new(
  min_threads: 1,
  max_threads: 8,
  max_queue: 1,
  fallback_policy: :caller_runs
)
# #upload_async logs a warning when the pool's remaining_capacity is at or below this value.
LOW_QUEUE_LENGTH =
3
# Matches a %{...} field reference; #validate_config rejects settings that contain one.
FIELD_REF =
/%\{[^}]+\}/

Instance Method Summary collapse

Constructor Details

#initialize(ingest_url, app_id, app_key, app_tenant, database, table, json_mapping, delete_local, proxy_host, proxy_port, proxy_protocol, logger, threadpool = DEFAULT_THREADPOOL) ⇒ Ingestor

Returns a new instance of Ingestor.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/logstash/outputs/kusto/ingestor.rb', line 23

# Validates the configuration, then builds the Kusto ingest client and the
# ingestion properties that every upload reuses.
#
# @param ingest_url [String] passed to the Kusto connection string builder
# @param app_id [String] AAD application id
# @param app_key [#value] AAD application key; only its #value is read
# @param app_tenant [String] AAD tenant
# @param database [String] target database (must not contain a field reference)
# @param table [String] target table (must not contain a field reference)
# @param json_mapping [String] ingestion mapping name (must not contain a field reference)
# @param delete_local [Object] stored as-is in @delete_local
# @param proxy_host [String, nil] when nil or empty the client is created without a proxy
# @param proxy_port [Object] forwarded to the proxy HttpHost
# @param proxy_protocol [String] must be "http" or "https"
# @param logger [Object] logger used by this instance
# @param threadpool [Object] worker pool used by #upload_async and #stop
# @raise [LogStash::ConfigurationError] when #validate_config rejects a value
def initialize(ingest_url, app_id, app_key, app_tenant, database, table, json_mapping, delete_local, proxy_host, proxy_port, proxy_protocol, logger, threadpool = DEFAULT_THREADPOOL)
  @workers_pool = threadpool
  @logger = logger
  validate_config(database, table, json_mapping, proxy_protocol)
  @logger.info('Preparing Kusto resources.')

  kusto_pkg = Java::com.microsoft.azure.kusto
  http_pkg = Java::org.apache.http
  connection = kusto_pkg.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)

  @logger.debug(Gem.loaded_specs.to_s)
  # The gem/plugin name has to be spelled out here; there is no way to derive it.
  plugin_version = Gem.loaded_specs['logstash-output-kusto']&.version || "unknown"
  @logger.debug("Client name for tracing: #{"logstash-output-kusto:#{plugin_version}"}")

  pair_pkg = Java::org.apache.commons.lang3.tuple
  # Replaces the older setClientVersionForTracing call.
  connection.setConnectorDetails("Logstash", plugin_version.to_s, "", "", false, "", pair_pkg.Pair.emptyArray())

  client_factory = kusto_pkg.ingest.IngestClientFactory
  @kusto_client =
    if proxy_host.nil? || proxy_host.empty?
      client_factory.createClient(connection)
    else
      proxy = http_pkg.HttpHost.new(proxy_host, proxy_port, proxy_protocol)
      http_properties = kusto_pkg.data.HttpClientProperties.builder().proxy(proxy).build()
      client_factory.createClient(connection, http_properties)
    end

  ingest_pkg = kusto_pkg.ingest
  @ingestion_properties = ingest_pkg.IngestionProperties.new(database, table)
  @ingestion_properties.setIngestionMapping(json_mapping, ingest_pkg.IngestionMapping::IngestionMappingKind::JSON)
  @ingestion_properties.setDataFormat(ingest_pkg.IngestionProperties::DataFormat::JSON)
  @delete_local = delete_local

  @logger.debug('Kusto resources are ready.')
end

Instance Method Details

#stop ⇒ Object



137
138
139
140
# File 'lib/logstash/outputs/kusto/ingestor.rb', line 137

# Shuts the worker pool down and waits for it to terminate.
#
# @return [Object] whatever the pool's wait_for_termination returns
def stop
  pool = @workers_pool
  pool.shutdown
  # A nil timeout is passed so the wait is not time-limited.
  pool.wait_for_termination(nil)
end

#upload(path, delete_on_success) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/logstash/outputs/kusto/ingestor.rb', line 97

# Ingests the file at +path+ into Kusto and optionally deletes it afterwards.
#
# A missing file is logged and given up on. Any other StandardError is logged and
# the work is retried after RETRY_DELAY_SECONDS with no upper bound on attempts,
# so a persistent failure keeps the calling thread here. Once the file has been
# handed to the client, a later failure (e.g. while deleting it) only retries the
# remaining steps, so the same file is not ingested twice.
#
# @param path [String] path of the file to ingest
# @param delete_on_success [Boolean] delete the file once it has been handled
def upload(path, delete_on_success)
  # Declared outside the begin block so its value survives `retry`.
  ingested = false

  begin
    unless ingested
      file_size = File.size(path)
      @logger.debug("Sending file to kusto: #{path}. size: #{file_size}")

      # TODO: dynamic routing
      # file_metadata = path.partition('.kusto.').last
      # file_metadata_parts = file_metadata.split('.')

      # if file_metadata_parts.length == 3
      #   # this is the number we expect - database, table, json_mapping
      #   database = file_metadata_parts[0]
      #   table = file_metadata_parts[1]
      #   json_mapping = file_metadata_parts[2]

      #   local_ingestion_properties = Java::KustoIngestionProperties.new(database, table)
      #   local_ingestion_properties.addJsonMappingName(json_mapping)
      # end

      if file_size > 0
        # 0 - let the sdk figure out the size of the file
        file_source_info = Java::com.microsoft.azure.kusto.ingest.source.FileSourceInfo.new(path, 0)
        @kusto_client.ingestFromFile(file_source_info, @ingestion_properties)
      else
        @logger.warn("File #{path} is an empty file and is not ingested.")
      end
      ingested = true
    end

    File.delete(path) if delete_on_success
    @logger.debug("File #{path} sent to kusto.")
  rescue Errno::ENOENT => e
    @logger.error("File doesn't exist! Unrecoverable error.", exception: e.class, message: e.message, path: path, backtrace: e.backtrace)
  rescue Java::JavaNioFile::NoSuchFileException => e
    @logger.error("File doesn't exist! Unrecoverable error.", exception: e.class, message: e.message, path: path, backtrace: e.backtrace)
  rescue => e
    # When the retry limit is reached or another error happens we wait and retry.
    #
    # The thread might be stuck here, but that is preferred over losing data:
    # it is either a transient error or something really bad happened.
    @logger.error('Uploading failed, retrying.', exception: e.class, message: e.message, path: path, backtrace: e.backtrace)
    sleep RETRY_DELAY_SECONDS
    retry
  end
end

#upload_async(path, delete_on_success) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/logstash/outputs/kusto/ingestor.rb', line 83

# Hands the upload of +path+ to the worker pool.
#
# Logs a warning first when the pool's remaining capacity is at or below
# LOW_QUEUE_LENGTH. Anything raised while checking capacity or posting the job is
# logged and then re-raised to the caller.
#
# @param path [String] path of the file to ingest
# @param delete_on_success [Boolean] forwarded to #upload
# @return [Object] whatever the pool's #post returns
def upload_async(path, delete_on_success)
  begin
    low_on_capacity = @workers_pool.remaining_capacity <= LOW_QUEUE_LENGTH
    @logger.warn("Ingestor queue capacity is running low with #{@workers_pool.remaining_capacity} free slots.") if low_on_capacity

    ingest_job = proc do
      LogStash::Util.set_thread_name("Kusto to ingest file: #{path}")
      upload(path, delete_on_success)
    end
    @workers_pool.post(&ingest_job)
  rescue Exception => failure
    # Broad rescue is only for logging; the exception is always re-raised below.
    @logger.error('StandardError.', exception: failure.class, message: failure.message, path: path, backtrace: failure.backtrace)
    raise failure
  end
end

#validate_config(database, table, json_mapping, proxy_protocol) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/logstash/outputs/kusto/ingestor.rb', line 60

# Rejects configuration values this output cannot work with.
#
# database, table and json_mapping must not contain a %{...} field reference
# (FIELD_REF). proxy_protocol must be "http" or "https"; it is checked
# unconditionally because this method does not see whether a proxy host is set.
# Each rejection is logged with the offending value as structured data, in the
# same key/value form the other log calls in this class use.
#
# @param database [String]
# @param table [String]
# @param json_mapping [String]
# @param proxy_protocol [String]
# @return [nil]
# @raise [LogStash::ConfigurationError] on the first invalid value
def validate_config(database, table, json_mapping, proxy_protocol)
  { database: database, table: table, json_mapping: json_mapping }.each do |setting, value|
    next unless value =~ FIELD_REF

    message = "#{setting} config value should not be dynamic."
    @logger.error(message, setting => value)
    raise LogStash::ConfigurationError, message
  end

  unless %w[https http].include?(proxy_protocol)
    message = 'proxy_protocol has to be http or https.'
    @logger.error(message, proxy_protocol: proxy_protocol)
    raise LogStash::ConfigurationError, message
  end
end