Class: SoarAuditingProvider::AuditingWorker

Inherits:
SoarThreadWorker::ThreadWorker
  • Object
show all
Includes:
Singleton
Defined in:
lib/soar_auditing_provider/auditing_worker.rb

Instance Method Summary collapse

Constructor Details

#initializeAuditingWorker

Returns a new instance of AuditingWorker.



8
9
10
11
12
13
# File 'lib/soar_auditing_provider/auditing_worker.rb', line 8

def initialize
  @queue = Queue.new
  @queue_mutex = Mutex.new
  initialize_metrics
  super
end

Instance Method Details

#configure(queue_worker_configuration:, auditor_audit_method:) ⇒ Object



15
16
17
18
19
20
21
22
# File 'lib/soar_auditing_provider/auditing_worker.rb', line 15

def configure(queue_worker_configuration: ,auditor_audit_method: )
  validate_configuration(queue_worker_configuration)
  @maximum_queue_size = queue_worker_configuration['queue_size'].to_i
  @initial_back_off_in_seconds = queue_worker_configuration['initial_back_off_in_seconds'].to_i
  @back_off_multiplier = queue_worker_configuration['back_off_multiplier'].to_i
  @maximum_back_off_attempts = queue_worker_configuration['back_off_attempts'].to_i
  @auditor_audit_method = auditor_audit_method
end

#enqueue(level, data) ⇒ Object



24
25
26
27
28
29
30
# File 'lib/soar_auditing_provider/auditing_worker.rb', line 24

def enqueue(level, data)
  @queue_mutex.synchronize {
    raise AuditingOverflowError if @queue.size >= @maximum_queue_size
    @queue.push({:level => level, :data => data})
    start
  }
end

#executeObject



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/soar_auditing_provider/auditing_worker.rb', line 32

def execute
  audit_event = nil
  @queue_mutex.synchronize {
    @thread.exit if @queue.empty?
    audit_event = @queue.pop
  }
  @dequeued_audits += 1
  failed_before = false
  begin
    if @stopping
      @queue_mutex.synchronize {
        @queue.push(audit_event) if audit_event #push the event back into the queue so that fallback flush mechanism can deal with this audit event
        @thread.exit
      }
    end
    exponential_back_off(start_at_last_attempt: failed_before) {
      time_before_audit = Time.now
      @auditor_audit_method.call(audit_event[:level],audit_event[:data])
      @latest_successful_audit_timespan = (Time.now - time_before_audit).round(3)
      @latest_successful_audit_timestamp = Time.now.utc.iso8601(3)
      @successful_audits += 1
    }
  rescue Exception => e
    print_exception_with_message_to_stderr(nil,e)
    failed_before = true
    retry
  end

  @queue_mutex.synchronize {
    return false if not @queue.empty? #indicates to thread worker that we are not done executing since the queue is not empty
    @thread.exit
  }
end

#flush(timeout:) ⇒ Object



66
67
68
69
70
71
72
73
# File 'lib/soar_auditing_provider/auditing_worker.rb', line 66

def flush(timeout:)
  start #start the worker thread just in case there are items enqueued
  sleep(0.1)
  stop(immediate: false)
  wait_for_worker_to_clear_queue(timeout)
  stop(immediate: true)
  fallback_flush_to_stderr (timeout) if not @queue.empty?
end

#status_detailObject



75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/soar_auditing_provider/auditing_worker.rb', line 75

def status_detail
  {
    'queue_size'                           => @queue.size,
    'dequeued_audits'                      => @dequeued_audits,
    'successful_audits'                    => @successful_audits,
    'failed_audit_attempts'                => @failed_audit_attempts,
    'latest_successful_audit_timespan'     => @latest_successful_audit_timespan,
    'latest_successful_audit_timestamp'    => @latest_successful_audit_timestamp,
    'latest_failed_audit_timestamp'        => @latest_failed_audit_timestamp,
    'latest_failed_audit_error_message'    => @latest_failed_audit_error_message
  }
end