Class: OneApm::Collector::CustomEventAggregator

Inherits:
Object
  • Object
show all
Includes:
OneApm::Coerce
Defined in:
lib/one_apm/collector/containers/custom_event_aggregator.rb

Constant Summary collapse

OA_TYPE =
'type'.freeze
OA_TIMESTAMP =
'timestamp'.freeze
OA_EVENT_PARAMS_CTX =
'recording custom event'.freeze
OA_EVENT_TYPE_REGEX =
/^[a-zA-Z0-9:_ ]+$/.freeze
OA_DEFAULT_CAPACITY_KEY =
:'custom_insights_events.max_samples_stored'

Instance Method Summary collapse

Methods included from OneApm::Coerce

#event_params, #float, #int, #int_or_nil, #log_failure, #string

Constructor Details

#initializeCustomEventAggregator

Returns a new instance of CustomEventAggregator.



17
18
19
20
21
22
# File 'lib/one_apm/collector/containers/custom_event_aggregator.rb', line 17

def initialize
  @lock         = Mutex.new
  @buffer       = OneApm::Agent::SampledBuffer.new(OneApm::Manager.config[OA_DEFAULT_CAPACITY_KEY])
  @type_strings = Hash.new { |hash, key| hash[key] = key.to_s.freeze }
  register_config_callbacks
end

Instance Method Details

#harvest!Object



51
52
53
54
55
56
57
58
59
60
61
# File 'lib/one_apm/collector/containers/custom_event_aggregator.rb', line 51

def harvest!
  results = []
  drop_count = 0
  @lock.synchronize do
    results.concat(@buffer.to_a)
    drop_count += @buffer.num_dropped
    @buffer.reset!
  end
  note_dropped_events(results.size, drop_count)
  results
end

#merge!(events) ⇒ Object



74
75
76
77
78
79
80
# File 'lib/one_apm/collector/containers/custom_event_aggregator.rb', line 74

def merge!(events)
  @lock.synchronize do
    events.each do |event|
      @buffer.append(event)
    end
  end
end

#note_dropped_event(type) ⇒ Object



86
87
88
89
90
# File 'lib/one_apm/collector/containers/custom_event_aggregator.rb', line 86

def note_dropped_event(type)
  OneApm::Manager.logger.log_once(:warn, "dropping_event_of_type:#{type}",
    "Invalid event type name '#{type}', not recording.")
  @buffer.note_dropped
end

#note_dropped_events(captured_count, dropped_count) ⇒ Object



63
64
65
66
67
68
69
70
71
72
# File 'lib/one_apm/collector/containers/custom_event_aggregator.rb', line 63

def note_dropped_events(captured_count, dropped_count)
  total_count = captured_count + dropped_count
  if dropped_count > 0
    OneApm::Manager.logger.warn("Dropped #{dropped_count} custom events out of #{total_count}.")
  end
  engine = OneApm::Manager.agent.stats_engine
  engine.tl_record_supportability_metric_count("Events/Customer/Seen"   ,    total_count)
  engine.tl_record_supportability_metric_count("Events/Customer/Sent"   , captured_count)
  engine.tl_record_supportability_metric_count("Events/Customer/Dropped",  dropped_count)
end

#record(type, attributes) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/one_apm/collector/containers/custom_event_aggregator.rb', line 33

def record(type, attributes)
  type = @type_strings[type]
  unless type =~ OA_EVENT_TYPE_REGEX
    note_dropped_event(type)
    return false
  end

  event = [
    { OA_TYPE => type, OA_TIMESTAMP => Time.now.to_i },
    event_params(attributes, OA_EVENT_PARAMS_CTX)
  ]

  stored = @lock.synchronize do
    @buffer.append(event)
  end
  stored
end

#register_config_callbacksObject



24
25
26
27
28
29
30
31
# File 'lib/one_apm/collector/containers/custom_event_aggregator.rb', line 24

def register_config_callbacks
  OneApm::Manager.config.register_callback(OA_DEFAULT_CAPACITY_KEY) do |max_samples|
    OneApm::Manager.logger.debug "CustomEventAggregator max_samples set to #{max_samples}"
    @lock.synchronize do
      @buffer.capacity = max_samples
    end
  end
end

#reset!Object



82
83
84
# File 'lib/one_apm/collector/containers/custom_event_aggregator.rb', line 82

def reset!
  @lock.synchronize { @buffer.reset! }
end