Class: LogStash::Inputs::AzureEventHubs

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/azure_event_hubs.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(params) ⇒ AzureEventHubs

Returns a new instance of AzureEventHubs.



296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
# File 'lib/logstash/inputs/azure_event_hubs.rb', line 296

# Builds one self-contained configuration Hash per Event Hub (collected in
# @event_hubs_exploded), passes the params on to the base class constructor,
# then validates every per-hub configuration.
#
# In 'advanced' config_mode each entry of params['event_hubs'] is a Hash of
# { event_hub_name => config }. Otherwise (basic mode) the hub name is parsed
# out of each connection string in params['event_hub_connections'].
#
# @param params [Hash] raw plugin settings; mutated in place (dummy values are
#   injected to satisfy the :required validation and, in basic mode, the
#   connection strings are replaced with redacted copies)
# @raise [LogStash::ConfigurationError] when a connection string cannot be
#   parsed or a per-hub configuration fails validation
# @raise [RuntimeError] when an advanced-mode entry is not a Hash, or when two
#   hubs map the same storage connection + container to the same consumer group
def initialize(params)

  # explode all of the parameters so that they are scoped per event_hub
  @event_hubs_exploded = []
  # global_config will be merged into each of the exploded configs; configuration already scoped to a hub wins over the globally scoped config
  global_config = {}
  params.each do |k, v|
    if !k.eql?('id') && !k.eql?('event_hubs') && !k.eql?('threads') && !k.eql?('event_hub_connections')  # don't copy these to the per-event-hub configs
      global_config[k] = v
    end
  end

  if params['config_mode'] && params['config_mode'].eql?('advanced')
    params['event_hub_connections'] = ['dummy'] # trick the :required validation

    params['event_hubs'].each do |event_hub|
      raise "event_hubs must be a Hash" unless event_hub.is_a?(Hash)
      event_hub.each do |event_hub_name, config|
        config.each do |k, v|
          if 'event_hub_connection'.eql?(k) || 'storage_connection'.eql?(k) # protect from leaking logs
            config[k] = ::LogStash::Util::Password.new(v)
          end
        end
        if config['event_hub_connection'] #add the 's' to pass validation
          config['event_hub_connections'] = config['event_hub_connection']
          config.delete('event_hub_connection')
        end

        config.merge!({'event_hubs' => [event_hub_name]})
        config.merge!(global_config) {|k, v1, v2| v1} # keep the hub-scoped value on conflict
        @event_hubs_exploded << config
      end
    end
  else # basic config
    params['event_hubs'] = ['dummy'] # trick the :required validation
    if params['event_hub_connections']
      connections = *params['event_hub_connections'] # ensure array
      # A single connection may be configured as a bare String. Indexed assignment on a String
      # (String#[]=) would splice the redacted text in front of the original, unredacted
      # connection string, so normalise to an Array before writing redacted values back.
      params['event_hub_connections'] = connections unless params['event_hub_connections'].is_a?(Array)
      connections.each.with_index do |_connection, i|
        begin
          connection = self.class.replace_placeholders(_connection) if self.class.respond_to? 'replace_placeholders' # 6.x
          connection = self.class.replace_env_placeholders(_connection) if self.class.respond_to? 'replace_env_placeholders' # 5.x
          # Redact before parsing so the error below can still name the offending connection.
          # Shared access keys are base64 and may contain '/', so redact everything up to the next ';'.
          redacted_connection = connection.gsub(/(SharedAccessKey=)[^;]*/, '\\1<redacted>')
          event_hub_name = ConnectionStringBuilder.new(connection).getEventHubName
          params['event_hub_connections'][i] = redacted_connection # protect from leaking logs
          raise "invalid Event Hub name" unless event_hub_name
        rescue
          raise LogStash::ConfigurationError, "Error parsing event hub string name for connection: '#{redacted_connection}' please ensure that the connection string contains the EntityPath"
        end
        @event_hubs_exploded << {'event_hubs' => [event_hub_name]}.merge({'event_hub_connections' => [::LogStash::Util::Password.new(connection)]}).merge(global_config) {|k, v1, v2| v1}
      end
    end
  end

  super(params)

  container_consumer_groups = []
  # explicitly validate all the per event hub configs
  @event_hubs_exploded.each do |event_hub|
    if !self.class.validate(event_hub)
      raise LogStash::ConfigurationError, I18n.t("logstash.runner.configuration.invalid_plugin_settings")
    end
    if event_hub['storage_connection']
      # the storage container defaults to the Event Hub name when none is configured
      container = event_hub['storage_container'] ? event_hub['storage_container'] : event_hub['event_hubs'][0]
      container_consumer_groups << {event_hub['storage_connection'].value.to_s + container => event_hub['consumer_group']}
    end
  end
  # identical entries mean two hubs would checkpoint into the same container with the same consumer group
  raise "The configuration will result in overwriting offsets. Please ensure that the each Event Hub's consumer_group is using a unique storage container." if container_consumer_groups.size > container_consumer_groups.uniq.size
end

Instance Attribute Details

#count ⇒ Object (readonly)

Returns the value of attribute count.



294
295
296
# File 'lib/logstash/inputs/azure_event_hubs.rb', line 294

# Read-only accessor for the count attribute.
#
# @return [Object] the current value of @count (nil if it was never assigned;
#   nothing in the code shown here assigns it)
def count
  @count
end

#event_hubs_exploded ⇒ Object (readonly)

Returns the value of attribute event_hubs_exploded.



362
363
364
# File 'lib/logstash/inputs/azure_event_hubs.rb', line 362

# Read-only accessor for the per-Event-Hub configurations.
#
# @return [Array<Hash>] the value of @event_hubs_exploded: one configuration
#   Hash per Event Hub, built in #initialize and filled in with plugin-level
#   settings by #register
def event_hubs_exploded
  @event_hubs_exploded
end

Instance Method Details

#register ⇒ Object



364
365
366
367
368
369
370
371
372
373
374
# File 'lib/logstash/inputs/azure_event_hubs.rb', line 364

# Fills in each exploded per-Event-Hub configuration with the plugin-level
# settings held in @config, for every key the hub has not set itself, then
# logs the resulting configuration at debug level.
#
# The 'id' and 'event_hubs' keys are never copied. A value is treated as
# "not set" only when it is nil, so an explicit per-hub `false` is kept
# rather than being replaced by the plugin-level value.
#
# @return [Object] whatever @logger.debug returns
def register
  # augment the exploded config with the defaults
  @event_hubs_exploded.each do |event_hub|
    @config.each do |key, value|
      next if key.eql?('id') || key.eql?('event_hubs')

      # only fill in missing settings; a truthiness check here would clobber an explicit `false`
      event_hub[key] = value if event_hub[key].nil?
    end
  end
  @logger.debug("Exploded Event Hub configuration.",  :event_hubs_exploded => @event_hubs_exploded.to_s)
end

#run(queue) ⇒ Object



376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
# File 'lib/logstash/inputs/azure_event_hubs.rb', line 376

# Starts one Ruby thread per exploded Event Hub configuration, each of which
# builds an EventProcessorHost, registers a processor factory on it and then
# blocks until the plugin is asked to stop. The calling thread itself blocks
# until stop? is true, joins the per-hub threads and finally shuts down the
# shared scheduled executor service.
#
# @param queue [Object] handed unchanged to ProcessorFactory.new
#   (presumably the Logstash pipeline queue events are pushed to — confirm
#   against ProcessorFactory)
def run(queue)
  event_hub_threads = []
  named_thread_factory = LogStash::Inputs::Azure::NamedThreadFactory.new("azure_event_hubs-worker", @id)
  # a single scheduled pool of @threads threads is shared by every EventProcessorHost created below
  scheduled_executor_service = Executors.newScheduledThreadPool(@threads, named_thread_factory)
  @event_hubs_exploded.each do |event_hub|
    event_hub_threads << Thread.new do
      event_hub_name = event_hub['event_hubs'].first # there will always only be 1 from @event_hubs_exploded
      @logger.info("Event Hub #{event_hub_name} is initializing... ")
      begin
        if event_hub['storage_connection']
          # storage-backed host: the storage container falls back to the Event Hub name when not configured
          event_processor_host = EventProcessorHost.new(
              EventProcessorHost.createHostName('logstash'),
              event_hub_name,
              event_hub['consumer_group'],
              event_hub['event_hub_connections'].first.value, #there will only be one in this array by the time it gets here
              event_hub['storage_connection'].value,
              event_hub.fetch('storage_container', event_hub_name),
              scheduled_executor_service)
        else
          # no storage connection: use in-memory checkpoint and lease managers instead
          # NOTE(review): the message names `storage_connection_string`, but the key read here is 'storage_connection'
          @logger.warn("You have NOT specified a `storage_connection_string` for #{event_hub_name}. This configuration is only supported for a single Logstash instance.")
          checkpoint_manager = InMemoryCheckpointManager.new
          lease_manager = InMemoryLeaseManager.new
          event_processor_host = EventProcessorHost.new(
              EventProcessorHost.createHostName('logstash'),
              event_hub_name,
              event_hub['consumer_group'],
              event_hub['event_hub_connections'].first.value, #there will only be one in this array by the time it gets here
              checkpoint_manager,
              lease_manager,
              scheduled_executor_service,
              nil)
          #using java_send to avoid naming conflicts with 'initialize' method
          lease_manager.java_send :initialize, [HostContext], event_processor_host.getHostContext
          checkpoint_manager.java_send :initialize, [HostContext], event_processor_host.getHostContext
        end
        # processor options come from the plugin-level settings (max_batch_size, prefetch_count, receive_timeout)
        options = EventProcessorOptions.new
        options.setMaxBatchSize(max_batch_size)
        options.setPrefetchCount(prefetch_count)
        options.setReceiveTimeOut(Duration.ofSeconds(receive_timeout))
        
        options.setExceptionNotification(LogStash::Inputs::Azure::ErrorNotificationHandler.new)
        # choose where to start reading; with a storage connection the message is only logged at debug level
        # and prefixed with "If this is the initial read... ", otherwise it is logged at info level
        case @initial_position
        when 'beginning'
          msg = "Configuring Event Hub #{event_hub_name} to read events all events."
          @logger.debug("If this is the initial read... " + msg) if event_hub['storage_connection']
          @logger.info(msg) unless event_hub['storage_connection']
          options.setInitialPositionProvider(EventProcessorOptions::StartOfStreamInitialPositionProvider.new(options))
        when 'end'
          msg = "Configuring Event Hub #{event_hub_name} to read only new events."
          @logger.debug("If this is the initial read... " + msg) if event_hub['storage_connection']
          @logger.info(msg) unless event_hub['storage_connection']
          options.setInitialPositionProvider(EventProcessorOptions::EndOfStreamInitialPositionProvider.new(options))
        when 'look_back'
          msg = "Configuring Event Hub #{event_hub_name} to read events starting at 'now - #{@initial_position_look_back}' seconds."
          @logger.debug("If this is the initial read... " + msg) if event_hub['storage_connection']
          @logger.info(msg) unless event_hub['storage_connection']
          options.setInitialPositionProvider(LogStash::Inputs::Azure::LookBackPositionProvider.new(@initial_position_look_back))
        end
        # register the processor factory, then chain: log the outcome -> block until stop? -> unregister -> log any error
        event_processor_host.registerEventProcessorFactory(LogStash::Inputs::Azure::ProcessorFactory.new(queue, event_hub['codec'], event_hub['checkpoint_interval'], self.method(:decorate), event_hub['decorate_events']), options)
            .when_complete(lambda {|x, e|
              # NOTE(review): the "registration complete" info line is logged even when e is set
              @logger.info("Event Hub registration complete. ", :event_hub_name => event_hub_name )
              @logger.error("Event Hub failure while registering.", :event_hub_name => event_hub_name, :exception => e, :backtrace => e.backtrace) if e
            })
            .then_accept(lambda {|x|
              @logger.info("Event Hub is processing events... ", :event_hub_name => event_hub_name )
              # this blocks the completable future chain from progressing, actual work is done via the executor service
              while !stop?
                Stud.stoppable_sleep(1) {stop?}
              end
            })
            .then_compose(lambda {|x|
              @logger.info("Unregistering Event Hub this can take a while... ", :event_hub_name => event_hub_name )
              event_processor_host.unregisterEventProcessor
            })
            .exceptionally(lambda {|e|
              @logger.error("Event Hub encountered an error.", :event_hub_name => event_hub_name , :exception => e, :backtrace => e.backtrace) if e
              nil
            })
            .get # this blocks till all of the futures are complete.
        @logger.info("Event Hub #{event_hub_name} is closed.")
      rescue => e
        # a failure in any one hub is logged and then stops the plugin via do_stop
        @logger.error("Event Hub failed during initialization.", :event_hub_name => event_hub_name, :exception => e, :backtrace => e.backtrace) if e
        do_stop
      end
    end
  end

  # this blocks the input from exiting. (all work is being done in threads)
  while !stop?
    Stud.stoppable_sleep(1) {stop?}
  end

  # This blocks the input till all the threads have run to completion.
  event_hub_threads.each do |thread|
    thread.join
  end

  # Ensure proper shutdown of executor service. # Note - this causes a harmless warning in the logs that scheduled tasks are being rejected.
  scheduled_executor_service.shutdown
  begin
    # wait up to 10 minutes for the executor service to terminate
    scheduled_executor_service.awaitTermination(10, TimeUnit::MINUTES);
  rescue => e
    @logger.debug("interrupted while waiting to close executor service, this can generally be ignored", :exception => e, :backtrace => e.backtrace) if e
  end
end