Class: LogStash::Inputs::AzureEventHubs

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/azure_event_hubs.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(params) ⇒ AzureEventHubs



296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
# File 'lib/logstash/inputs/azure_event_hubs.rb', line 296

# Explodes the plugin parameters into one configuration hash per Event Hub (kept in
# @event_hubs_exploded), hands the adjusted +params+ to the base class, and then validates every
# exploded configuration.
#
# * Advanced mode (`config_mode => 'advanced'`): each element of `event_hubs` must be a Hash of
#   `event_hub_name => per-hub config`; per-hub values win over the globally scoped ones.
# * Basic mode (any other `config_mode`): the Event Hub name is read from each
#   `event_hub_connections` entry via ConnectionStringBuilder#getEventHubName.
#
# Secrets are kept out of logs: advanced-mode `event_hub_connection` / `storage_connection` values are
# wrapped in ::LogStash::Util::Password, and basic-mode connections are replaced in +params+ by a copy
# whose SharedAccessKey value is redacted.
#
# @param params [Hash] raw plugin parameters keyed by option name (mutated in place)
# @raise [LogStash::ConfigurationError] if a basic-mode connection string yields no Event Hub name,
#   or if an exploded per-hub configuration fails validation
# @raise [RuntimeError] if an advanced-mode `event_hubs` entry is not a Hash, or if two Event Hubs
#   resolve to the same storage location and consumer group (overwriting each other's offsets)
def initialize(params)
  # explode all of the parameters so they are scoped per event hub
  @event_hubs_exploded = []
  # global_config is merged into each of the exploded configs; configuration already scoped to an
  # event hub is preferred over the globally scoped value
  non_global_keys = %w[id event_hubs threads event_hub_connections] # don't copy these to the per-event-hub configs
  global_config = {}
  params.each do |k, v|
    global_config[k] = v unless non_global_keys.include?(k)
  end

  if params['config_mode'] == 'advanced'
    params['event_hub_connections'] = ['dummy'] # trick the :required validation

    params['event_hubs'].each do |event_hub|
      raise "event_hubs must be a Hash" unless event_hub.is_a?(Hash)
      event_hub.each do |event_hub_name, config|
        # protect from leaking logs
        %w[event_hub_connection storage_connection].each do |secret_key|
          config[secret_key] = ::LogStash::Util::Password.new(config[secret_key]) if config.key?(secret_key)
        end
        if config['event_hub_connection'] # add the 's' to pass validation
          config['event_hub_connections'] = config.delete('event_hub_connection')
        end

        config['event_hubs'] = [event_hub_name]
        config.merge!(global_config) { |_key, scoped_value, _global_value| scoped_value }
        @event_hubs_exploded << config
      end
    end
  else # basic config
    params['event_hubs'] = ['dummy'] # trick the :required validation
    if params['event_hub_connections']
      # Array() returns an existing Array unchanged, so redacting connections[i] below still updates
      # params in place. A single bare String is wrapped into a list so that redaction replaces the
      # whole entry; indexing into the String itself would splice text into it and keep the secret.
      connections = params['event_hub_connections'] = Array(params['event_hub_connections'])
      connections.each_with_index do |raw_connection, i|
        redacted_connection = nil
        begin
          connection = replace_connection_placeholders(raw_connection)
          # Redact before parsing so a parse failure can still name the connection without its key.
          # [^;]* covers the full base64 alphabet (including '/'), which the key may contain.
          redacted_connection = connection.gsub(/(SharedAccessKey=)[^;]*/, '\\1<redacted>')
          connections[i] = redacted_connection # protect from leaking logs
          event_hub_name = ConnectionStringBuilder.new(connection).getEventHubName
          raise "invalid Event Hub name" unless event_hub_name
        rescue
          raise LogStash::ConfigurationError, "Error parsing event hub string name for connection: '#{redacted_connection}' please ensure that the connection string contains the EntityPath"
        end
        exploded = {'event_hubs' => [event_hub_name], 'event_hub_connections' => [::LogStash::Util::Password.new(connection)]}
        @event_hubs_exploded << exploded.merge(global_config) { |_key, scoped_value, _global_value| scoped_value }
      end
    end
  end

  super(params)

  container_consumer_groups = []
  # explicitly validate all the per event hub configs
  @event_hubs_exploded.each do |event_hub|
    unless self.class.validate(event_hub)
      raise LogStash::ConfigurationError, I18n.t("logstash.runner.configuration.invalid_plugin_settings")
    end
    next unless event_hub['storage_connection']
    # Offsets live in storage account + container (defaulting to the Event Hub name). Two hubs that
    # resolve to the same location AND consumer group would overwrite each other, so duplicate
    # pairs are rejected below.
    container = event_hub['storage_container'] || event_hub['event_hubs'][0]
    container_consumer_groups << {event_hub['storage_connection'].value.to_s + container => event_hub['consumer_group']}
  end
  raise "The configuration will result in overwriting offsets. Please ensure that the each Event Hub's consumer_group is using a unique storage container." if container_consumer_groups.size > container_consumer_groups.uniq.size
end

Instance Attribute Details

#count ⇒ Object (readonly)

Returns the value of attribute count.



294
295
296
# File 'lib/logstash/inputs/azure_event_hubs.rb', line 294

# Reader for the +@count+ instance variable; yields nil when it has never been assigned.
def count; @count; end

#event_hubs_exploded ⇒ Object (readonly)

Returns the value of attribute event_hubs_exploded.



361
362
363
# File 'lib/logstash/inputs/azure_event_hubs.rb', line 361

# Reader for the per-event-hub configuration list built by the constructor.
def event_hubs_exploded; @event_hubs_exploded; end

Instance Method Details

#create_in_memory_event_processor_host(event_hub, event_hub_name, scheduled_executor_service) ⇒ Object



490
491
492
493
494
495
496
497
498
499
500
501
502
503
# File 'lib/logstash/inputs/azure_event_hubs.rb', line 490

# Builds an EventProcessorHost whose checkpoints and leases are kept by in-memory managers
# instead of a storage account.
#
# The two managers are handed to the builder, and afterwards each is initialized with the
# HostContext of the resulting host (leases first, then checkpoints).
#
# @param event_hub [Hash] one exploded per-event-hub configuration; reads 'consumer_group' and
#   the first entry of 'event_hub_connections'
# @param event_hub_name [String] name of the Event Hub (not referenced by this method)
# @param scheduled_executor_service executor passed to the builder via setExecutor
# @return the built EventProcessorHost
def create_in_memory_event_processor_host(event_hub, event_hub_name, scheduled_executor_service)
  in_memory_checkpoints = InMemoryCheckpointManager.new
  in_memory_leases = InMemoryLeaseManager.new

  host_name = EventProcessorHost.createHostName('logstash')
  builder = EventProcessorHost::EventProcessorHostBuilder.newBuilder(host_name, event_hub['consumer_group'])
  builder_with_managers = builder.useUserCheckpointAndLeaseManagers(in_memory_checkpoints, in_memory_leases)
  # only one connection remains in this array by the time it gets here
  builder_with_connection = builder_with_managers.useEventHubConnectionString(event_hub['event_hub_connections'].first.value)
  host = builder_with_connection.setExecutor(scheduled_executor_service).build

  # java_send avoids the clash between the Java `initialize(HostContext)` method and Ruby's own `initialize`
  context = get_host_context(host)
  [in_memory_leases, in_memory_checkpoints].each do |manager|
    manager.java_send :initialize, [HostContext], context
  end
  host
end

#register ⇒ Object



382
383
384
385
386
387
388
389
390
391
392
# File 'lib/logstash/inputs/azure_event_hubs.rb', line 382

# Augments every exploded per-event-hub configuration with the values from @config (the
# plugin-level settings and defaults) for keys the hub does not set itself, then logs the result at
# debug level. 'id' and 'event_hubs' are never copied because they identify the individual hub.
#
# A key counts as "not set" only when its value is nil, so an explicit per-hub `false`
# (e.g. `decorate_events => false` in advanced mode) survives a plugin-level `true`.
def register
  per_hub_only = %w[id event_hubs]
  @event_hubs_exploded.each do |event_hub|
    @config.each do |key, value|
      next if per_hub_only.include?(key)
      # nil? rather than truthiness: `unless event_hub[key]` would overwrite a deliberate `false`
      event_hub[key] = value if event_hub[key].nil?
    end
  end
  @logger.debug("Exploded Event Hub configuration.",  :event_hubs_exploded => @event_hubs_exploded.to_s)
end

#replace_connection_placeholders(value) ⇒ Object



363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
# File 'lib/logstash/inputs/azure_event_hubs.rb', line 363

# Resolves `${...}` placeholders in a connection string using whichever substitution API the
# running Logstash exposes on the plugin class, and returns the (possibly unchanged) value.
#
# * `replace_placeholders` (Logstash 6.x+): called with one argument when its arity is 1,
#   otherwise with an extra `false`. Logstash 8.15.1/8.15.2 added a boolean `refine` parameter,
#   fixed in 8.15.3 (see https://github.com/elastic/logstash/pull/16485).
# * `replace_env_placeholders` (Logstash 5.x): applied afterwards when present.
#
# @param value [String] connection string that may contain placeholders
# @return [String] the connection string after substitution
def replace_connection_placeholders(value)
  plugin_class = self.class
  resolved = value

  if plugin_class.respond_to?(:replace_placeholders)
    single_arg = plugin_class.method(:replace_placeholders).arity == 1
    resolved = single_arg ? plugin_class.replace_placeholders(resolved) : plugin_class.replace_placeholders(resolved, false)
  end

  return resolved unless plugin_class.respond_to?(:replace_env_placeholders)

  plugin_class.replace_env_placeholders(resolved)
end

#run(queue) ⇒ Object



394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
# File 'lib/logstash/inputs/azure_event_hubs.rb', line 394

# Consumes every configured Event Hub until the input is stopped.
#
# One Ruby thread is started per entry of @event_hubs_exploded. Each thread builds an
# EventProcessorHost, backed by Azure Storage checkpoints/leases when `storage_connection` is set
# and by create_in_memory_event_processor_host otherwise. It then registers a ProcessorFactory that
# receives +queue+, blocks until stop? is true, and finally unregisters the processor. All hosts
# share one scheduled thread pool of @threads workers.
#
# Per-event-hub values in the exploded config take precedence over the plugin-level settings.
# This covers max_batch_size, prefetch_count, receive_timeout, initial_position and
# initial_position_look_back, just as codec, checkpoint_interval, decorate_events and
# consumer_group are already read per hub.
#
# @param queue the queue handed to LogStash::Inputs::Azure::ProcessorFactory
def run(queue)
  event_hub_threads = []
  named_thread_factory = LogStash::Inputs::Azure::NamedThreadFactory.new("azure_event_hubs-worker", @id)
  scheduled_executor_service = Executors.newScheduledThreadPool(@threads, named_thread_factory)
  @event_hubs_exploded.each do |event_hub|
    event_hub_threads << Thread.new do
      event_hub_name = event_hub['event_hubs'].first # there will always only be 1 from @event_hubs_exploded
      @logger.info("Event Hub #{event_hub_name} is initializing... ")
      begin
        if event_hub['storage_connection']
          event_processor_host = EventProcessorHost::EventProcessorHostBuilder.newBuilder(EventProcessorHost.createHostName('logstash'), event_hub['consumer_group'])
                                     .useAzureStorageCheckpointLeaseManager(
                                       event_hub['storage_connection'].value,
                                       # same truthiness fallback as the offset-overwrite check in initialize
                                       event_hub['storage_container'] || event_hub_name,
                                       nil
                                     )
                                     # there will only be one in this array by the time it gets here
                                     .useEventHubConnectionString(event_hub['event_hub_connections'].first.value)
                                     .setExecutor(scheduled_executor_service)
                                     .build
        else
          @logger.warn("You have NOT specified a `storage_connection` for #{event_hub_name}. This configuration is only supported for a single Logstash instance.")
          event_processor_host = create_in_memory_event_processor_host(event_hub, event_hub_name, scheduled_executor_service)
        end

        options = EventProcessorOptions.new
        # prefer the per-event-hub value (advanced config) and fall back to the plugin-level setting
        options.setMaxBatchSize(event_hub['max_batch_size'] || max_batch_size)
        options.setPrefetchCount(event_hub['prefetch_count'] || prefetch_count)
        options.setReceiveTimeOut(Duration.ofSeconds(event_hub['receive_timeout'] || receive_timeout))
        options.setExceptionNotification(LogStash::Inputs::Azure::ErrorNotificationHandler.new)

        initial_position = event_hub['initial_position'] || @initial_position
        look_back = event_hub['initial_position_look_back'] || @initial_position_look_back
        msg, position_provider =
          case initial_position
          when 'beginning'
            ["Configuring Event Hub #{event_hub_name} to read all events.",
             EventProcessorOptions::StartOfStreamInitialPositionProvider.new(options)]
          when 'end'
            ["Configuring Event Hub #{event_hub_name} to read only new events.",
             EventProcessorOptions::EndOfStreamInitialPositionProvider.new(options)]
          when 'look_back'
            ["Configuring Event Hub #{event_hub_name} to read events starting at 'now - #{look_back}' seconds.",
             LogStash::Inputs::Azure::LookBackPositionProvider.new(look_back)]
          end
        if position_provider
          # with storage-backed checkpoints the position presumably only applies to the initial read,
          # so it is logged at debug level in that case
          if event_hub['storage_connection']
            @logger.debug("If this is the initial read... " + msg)
          else
            @logger.info(msg)
          end
          options.setInitialPositionProvider(position_provider)
        end

        event_processor_host.registerEventProcessorFactory(LogStash::Inputs::Azure::ProcessorFactory.new(queue, event_hub['codec'], event_hub['checkpoint_interval'], self.method(:decorate), event_hub['decorate_events']), options)
            .when_complete(lambda {|x, e|
              @logger.info("Event Hub registration complete. ", :event_hub_name => event_hub_name )
              @logger.error("Event Hub failure while registering.", :event_hub_name => event_hub_name, :exception => e, :backtrace => e.backtrace) if e
            })
            .then_accept(lambda {|x|
              @logger.info("Event Hub is processing events... ", :event_hub_name => event_hub_name )
              # this blocks the completable future chain from progressing, actual work is done via the executor service
              until stop?
                Stud.stoppable_sleep(1) { stop? }
              end
            })
            .then_compose(lambda {|x|
              @logger.info("Unregistering Event Hub this can take a while... ", :event_hub_name => event_hub_name )
              event_processor_host.unregisterEventProcessor
            })
            .exceptionally(lambda {|e|
              @logger.error("Event Hub encountered an error.", :event_hub_name => event_hub_name , :exception => e, :backtrace => e.backtrace) if e
              nil
            })
            .get # this blocks till all of the futures are complete.
        @logger.info("Event Hub #{event_hub_name} is closed.")
      rescue => e
        @logger.error("Event Hub failed during initialization.", :event_hub_name => event_hub_name, :exception => e, :backtrace => e.backtrace)
        do_stop
      end
    end
  end

  # keep the input from exiting while the per-Event-Hub threads do the work
  until stop?
    Stud.stoppable_sleep(1) { stop? }
  end

  # This blocks the input till all the threads have run to completion.
  event_hub_threads.each(&:join)

  # Ensure proper shutdown of executor service. Note - this causes a harmless warning in the logs that scheduled tasks are being rejected.
  scheduled_executor_service.shutdown
  begin
    scheduled_executor_service.awaitTermination(10, TimeUnit::MINUTES)
  rescue => e
    @logger.debug("interrupted while waiting to close executor service, this can generally be ignored", :exception => e, :backtrace => e.backtrace)
  end
end