Class: LogStash::Outputs::Kusto::Ingestor

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/kusto/ingestor.rb

Overview

This handles the overall logic and communication with Kusto

Constant Summary collapse

# Number of seconds #upload sleeps before retrying after a failed attempt.
RETRY_DELAY_SECONDS =
3
# Worker pool used when the constructor is not given an explicit threadpool.
# NOTE(review): with fallback_policy :caller_runs, a task that cannot be queued
# presumably runs on the posting thread — confirm against concurrent-ruby docs.
DEFAULT_THREADPOOL =
Concurrent::ThreadPoolExecutor.new(
  min_threads: 1,
  max_threads: 8,
  max_queue: 1,
  fallback_policy: :caller_runs
)
# Threshold compared against the pool's remaining_capacity in #upload_async;
# at or below this value a "running low" warning is logged.
# NOTE(review): DEFAULT_THREADPOOL uses max_queue: 1, which looks like it is
# always at or below this threshold — confirm whether the warning is meant to
# fire on every call with the default pool.
LOW_QUEUE_LENGTH =
3
# Matches a %{...} placeholder. #validate_config rejects database, table and
# json_mapping values that contain one.
FIELD_REF =
/%\{[^}]+\}/

Instance Method Summary collapse

Constructor Details

#initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli_auth, database, table, json_mapping, delete_local, proxy_host, proxy_port, proxy_protocol, logger, threadpool = DEFAULT_THREADPOOL) ⇒ Ingestor

Returns a new instance of Ingestor.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/logstash/outputs/kusto/ingestor.rb', line 23

# Validates the configuration, then builds the Kusto ingest client and the
# ingestion properties that #upload uses for every file.
#
# Authentication is chosen from the arguments as follows:
# * app_id and app_key both nil and cli_auth falsy: managed identity
#   (system-assigned when managed_identity_id equals "system", ignoring case;
#   otherwise user-assigned with the given id);
# * otherwise, cli_auth truthy: Azure CLI credentials;
# * otherwise: AAD application credentials (app_id, app_key.value, app_tenant).
#
# @param ingest_url [String] Kusto ingestion endpoint
# @param app_id [String, nil] AAD application id
# @param app_key [#value, nil] AAD application key; only +value+ is read
# @param app_tenant [String, nil] AAD tenant used with app_id/app_key
# @param managed_identity_id [String, nil] "system" or a user-assigned identity id
# @param cli_auth [Boolean] use Azure CLI authentication
# @param database [String] target database (must not contain %{...})
# @param table [String] target table (must not contain %{...})
# @param json_mapping [String, nil] ingestion mapping reference; optional
# @param delete_local [Boolean] stored in @delete_local
# @param proxy_host [String, nil] proxy host; nil or empty means direct connection
# @param proxy_port [Integer] proxy port, used only when proxy_host is set
# @param proxy_protocol [String] only validated here; the proxy type is always HTTP
# @param logger [Object] logger responding to info/warn/debug/error
# @param threadpool [Object] executor used by #upload_async
# @raise [LogStash::ConfigurationError] when validate_config rejects the settings
def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli_auth, database, table,
               json_mapping, delete_local, proxy_host, proxy_port, proxy_protocol, logger,
               threadpool = DEFAULT_THREADPOOL)
  @workers_pool = threadpool
  @logger = logger
  validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id, cli_auth)
  @logger.info('Preparing Kusto resources.')

  kusto_pkg = Java::com.microsoft.azure.kusto
  # Resolved here as in the original listing; not referenced again in this method.
  _apache_http = Java::org.apache.http

  # No app credentials and no CLI auth means a managed identity is expected.
  managed_identity = app_id.nil? && app_key.nil? && !cli_auth
  system_identity = managed_identity && "system".casecmp(managed_identity_id) == 0
  direct_connection = proxy_host.nil? || proxy_host.empty?

  connection_string =
    if system_identity
      @logger.info('Using system managed identity.')
      kusto_pkg.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url)
    elsif managed_identity
      @logger.info('Using user managed identity.')
      kusto_pkg.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url, managed_identity_id)
    elsif cli_auth
      @logger.warn('*Use of CLI Auth is only for dev-test scenarios. This is ***NOT RECOMMENDED*** for production*')
      kusto_pkg.data.auth.ConnectionStringBuilder.createWithAzureCli(ingest_url)
    else
      @logger.info('Using app id and app key.')
      kusto_pkg.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
    end

  @logger.debug(Gem.loaded_specs.to_s)
  # Unfortunately there's no way to avoid using the gem/plugin name directly...
  plugin_version = Gem.loaded_specs['logstash-output-kusto']&.version || "unknown"
  @logger.debug("Client name for tracing: logstash-output-kusto:#{plugin_version}")

  no_extra_details = Java::java.util.Collections.emptyMap()
  connection_string.setConnectorDetails("Logstash", plugin_version.to_s, "", "", false, "", no_extra_details)

  @kusto_client =
    if direct_connection
      kusto_pkg.ingest.IngestClientFactory.createClient(connection_string)
    else
      proxy_address = Java::java.net.InetSocketAddress.new(proxy_host, proxy_port)
      proxy_options_class = Java::com.azure.core.http.ProxyOptions
      proxy_options = proxy_options_class.new(proxy_options_class::Type::HTTP, proxy_address)
      client_properties = Java::com.microsoft.azure.kusto.data.http.HttpClientProperties.builder().proxy(proxy_options).build()
      kusto_pkg.ingest.IngestClientFactory.createClient(connection_string, client_properties)
    end

  @ingestion_properties = kusto_pkg.ingest.IngestionProperties.new(database, table)
  if json_mapping.nil? || json_mapping.empty?
    @logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output')
  else
    @logger.debug('Using mapping reference.', json_mapping)
    @ingestion_properties.setIngestionMapping(json_mapping, kusto_pkg.ingest.IngestionMapping::IngestionMappingKind::JSON)
  end
  # The data format is JSON whether or not a mapping reference was supplied.
  @ingestion_properties.setDataFormat(kusto_pkg.ingest.IngestionProperties::DataFormat::JSON)

  @delete_local = delete_local
  @logger.debug('Kusto resources are ready.')
end

Instance Method Details

#stop ⇒ Object



180
181
182
183
# File 'lib/logstash/outputs/kusto/ingestor.rb', line 180

# Shuts the worker pool down and waits for it to terminate.
#
# The nil timeout passed to wait_for_termination means no time limit is
# given, so the call blocks until the pool reports termination.
#
# @return [Object] whatever the pool's wait_for_termination returns
def stop
  @workers_pool.tap(&:shutdown).wait_for_termination(nil)
end

#upload(path, delete_on_success) ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/logstash/outputs/kusto/ingestor.rb', line 140

# Ingests the file at +path+ into Kusto and optionally deletes it afterwards.
#
# Empty files are not ingested (a warning is logged) but are still deleted
# when +delete_on_success+ is set. A missing file is logged as unrecoverable
# and the method returns. Any other StandardError is logged, followed by a
# sleep of RETRY_DELAY_SECONDS and a retry, with no upper bound on attempts.
#
# Once ingestFromFile has succeeded it is not called again on later retries,
# so a failure in a subsequent step (for example deleting the local file)
# cannot cause the same file to be ingested more than once.
#
# @param path [String] path of the local file to ingest
# @param delete_on_success [Boolean] delete the local file after processing
# @return [Object] result of the last logger call
def upload(path, delete_on_success)
  # Lives outside the retried block on purpose: `retry` re-runs everything
  # inside `begin`, and this flag must survive that.
  ingested = false

  begin
    file_size = File.size(path)
    @logger.debug("Sending file to kusto: #{path}. size: #{file_size}")

    # TODO: dynamic routing
    # file_metadata = path.partition('.kusto.').last
    # file_metadata_parts = file_metadata.split('.')

    # if file_metadata_parts.length == 3
    #   # this is the number we expect - database, table, json_mapping
    #   database = file_metadata_parts[0]
    #   table = file_metadata_parts[1]
    #   json_mapping = file_metadata_parts[2]

    #   local_ingestion_properties = Java::KustoIngestionProperties.new(database, table)
    #   local_ingestion_properties.addJsonMappingName(json_mapping)
    # end

    if file_size > 0
      unless ingested
        # Only the path is handed to the SDK; no explicit size is supplied.
        file_source_info = Java::com.microsoft.azure.kusto.ingest.source.FileSourceInfo.new(path)
        @kusto_client.ingestFromFile(file_source_info, @ingestion_properties)
        ingested = true
      end
    else
      @logger.warn("File #{path} is an empty file and is not ingested.")
    end
    File.delete(path) if delete_on_success
    @logger.debug("File #{path} sent to kusto.")
  rescue Errno::ENOENT => e
    @logger.error("File doesn't exist! Unrecoverable error.", exception: e.class, message: e.message, path: path, backtrace: e.backtrace)
  rescue Java::JavaNioFile::NoSuchFileException => e
    @logger.error("File doesn't exist! Unrecoverable error.", exception: e.class, message: e.message, path: path, backtrace: e.backtrace)
  rescue => e
    # When the retry limit is reached or another error happens we will wait and retry.
    #
    # The thread might be stuck here, but that is considered better than losing
    # anything: it is either a transient error or something bad really happened.
    @logger.error('Uploading failed, retrying.', exception: e.class, message: e.message, path: path, backtrace: e.backtrace)
    sleep RETRY_DELAY_SECONDS
    retry
  end
end

#upload_async(path, delete_on_success) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/logstash/outputs/kusto/ingestor.rb', line 126

# Schedules #upload for +path+ on the worker pool.
#
# Logs a warning first when the pool's remaining capacity is at or below
# LOW_QUEUE_LENGTH. The capacity is read once so the logged number is the
# value that triggered the warning.
#
# A StandardError raised while posting is logged and re-raised unchanged.
# Other exceptions (for example signals or exit requests) propagate without
# being intercepted here.
#
# @param path [String] path of the local file to ingest
# @param delete_on_success [Boolean] forwarded to #upload
# @return [Object] whatever the pool's +post+ returns
# @raise [StandardError] any error raised while posting the task
def upload_async(path, delete_on_success)
  free_slots = @workers_pool.remaining_capacity
  if free_slots <= LOW_QUEUE_LENGTH
    @logger.warn("Ingestor queue capacity is running low with #{free_slots} free slots.")
  end

  @workers_pool.post do
    LogStash::Util.set_thread_name("Kusto to ingest file: #{path}")
    upload(path, delete_on_success)
  end
rescue StandardError => e
  @logger.error('StandardError.', exception: e.class, message: e.message, path: path, backtrace: e.backtrace)
  raise
end

#validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id, cli_auth) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/logstash/outputs/kusto/ingestor.rb', line 94

# Fails fast on configuration that cannot work.
#
# Checks, in order:
# 1. some form of authentication is configured: when app_id, app_key and
#    managed_identity_id are all nil, cli_auth must be truthy;
# 2. database, table and json_mapping contain no %{...} placeholder;
# 3. proxy_protocol is exactly "http" or "https".
#
# Every rejection is logged through @logger.error before raising.
#
# @param database [String] target database name
# @param table [String] target table name
# @param json_mapping [String, nil] ingestion mapping reference
# @param proxy_protocol [String] proxy protocol setting
# @param app_id [String, nil] AAD application id
# @param app_key [Object, nil] AAD application key
# @param managed_identity_id [String, nil] managed identity id
# @param cli_auth [Boolean] whether Azure CLI authentication is requested
# @return [nil]
# @raise [LogStash::ConfigurationError] when any check fails
def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id, cli_auth)
  # Add an additional validation and fail this upfront
  credentials_missing = [app_id, app_key, managed_identity_id].all?(&:nil?)
  if credentials_missing && cli_auth
    @logger.info('Using CLI Auth, this is only for dev-test scenarios. This is ***NOT RECOMMENDED*** for production')
  elsif credentials_missing
    @logger.error('managed_identity_id is not provided and app_id/app_key is empty.')
    raise LogStash::ConfigurationError, 'managed_identity_id is not provided and app_id/app_key is empty.'
  end

  # These settings are fixed for the lifetime of the output, so field
  # references are rejected. Hash order keeps the original check order.
  { 'database' => database, 'table' => table, 'json_mapping' => json_mapping }.each do |setting, value|
    next unless value =~ FIELD_REF

    complaint = "#{setting} config value should not be dynamic."
    @logger.error(complaint, value)
    raise LogStash::ConfigurationError, complaint
  end

  return if %w[https http].include?(proxy_protocol)

  @logger.error('proxy_protocol has to be http or https.', proxy_protocol)
  raise LogStash::ConfigurationError, 'proxy_protocol has to be http or https.'
end