Class: OneApm::Collector::TransactionEventAggregator

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin, OneApm::Coerce
Defined in:
lib/one_apm/collector/containers/transaction_event_aggregator.rb

Constant Summary collapse

OA_SAMPLE_TYPE =

The type field of the sample

'Transaction'.freeze
OA_TYPE_KEY =

Strings for static keys of the sample structure

'type'.freeze
OA_TIMESTAMP_KEY =
'timestamp'.freeze
OA_NAME_KEY =
'name'.freeze
OA_DURATION_KEY =
'duration'.freeze
OA_HTTP_RESPONSE_CODE_KEY =
'httpResponseCode'.freeze
OA_GUID_KEY =
'bw.guid'.freeze
OA_REFERRING_TRANSACTION_GUID_KEY =
'bw.referringTransactionGuid'.freeze
OA_CAT_TRIP_ID_KEY =
'bw.tripId'.freeze
OA_CAT_PATH_HASH_KEY =
'bw.pathHash'.freeze
OA_CAT_REFERRING_PATH_HASH_KEY =
'bw.referringPathHash'.freeze
OA_CAT_ALTERNATE_PATH_HASHES_KEY =
'bw.alternatePathHashes'.freeze
OA_APDEX_PERF_ZONE_KEY =
'bw.apdexPerfZone'.freeze
OA_SYNTHETICS_RESOURCE_ID_KEY =
"bw.syntheticsResourceId".freeze
OA_SYNTHETICS_JOB_ID_KEY =
"bw.syntheticsJobId".freeze
OA_SYNTHETICS_MONITOR_ID_KEY =
"bw.syntheticsMonitorId".freeze

Instance Method Summary collapse

Methods included from OneApm::Coerce

#event_params, #float, #int, #int_or_nil, #log_failure, #string

Constructor Details

#initialize(event_listener) ⇒ TransactionEventAggregator

Returns a new instance of TransactionEventAggregator.



30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 30

def initialize(event_listener)
  super()

  @enabled       = false
  @notified_full = false

  @samples            = ::OneApm::Agent::SampledBuffer.new(OneApm::Manager.config[:'analytics_events.max_samples_stored'])
  @synthetics_samples = ::OneApm::Agent::SyntheticsEventBuffer.new(OneApm::Manager.config[:'synthetics.events_limit'])

  event_listener.subscribe(:transaction_finished, &method(:on_transaction_finished))
  self.register_config_callbacks
end

Instance Method Details

#append_cat_alternate_path_hashes(sample, payload) ⇒ Object



206
207
208
209
210
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 206

def append_cat_alternate_path_hashes(sample, payload)
  if payload.include?(:cat_alternate_path_hashes)
    sample[OA_CAT_ALTERNATE_PATH_HASHES_KEY] = payload[:cat_alternate_path_hashes].sort.join(',')
  end
end

#append_event(event) ⇒ Object



164
165
166
167
168
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 164

def append_event(event)
  same_spec_for_sample = @samples.select{|sample| sample.spec_name == event.spec_name}.first
  return @samples.append(event) if same_spec_for_sample.nil?
  same_spec_for_sample.event_analytic_data.concat event.event_analytic_data
end

#append_http_response_code(sample, payload) ⇒ Object



200
201
202
203
204
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 200

def append_http_response_code(sample, payload)
  unless OneApm::Manager.config[:disable_rack_middleware]
    optionally_append(OA_HTTP_RESPONSE_CODE_KEY, :http_response_code, sample, payload)
  end
end

#create_custom_parameters(payload) ⇒ Object



218
219
220
221
222
223
224
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 218

def create_custom_parameters(payload)
  custom_params = {}
  if OneApm::Manager.config[:'analytics_events.capture_attributes']
    custom_params.merge!(event_params(payload[:custom_params] || {}))
  end
  custom_params
end

#create_main_event(payload) ⇒ Object



171
172
173
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 171

def create_main_event(payload)
  OneApm::EventAnalyticSample.new(payload)
end

#create_sub_events(main_event, payload) ⇒ Object



175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 175

def create_sub_events main_event, payload
   payload = payload.dup
   payload[:scope] = payload[:name]
   payload[:referring_transaction_guid] = main_event.guid
   payload[:request_url] = ''

   sub_event_samples = []
   return sub_event_samples unless payload[:metrics]
   payload[:metrics].each_scoped do |metric_name, status|
    next if metric_name =~ /^Nested|View|External/
    payload[:name] = metric_name
    payload[:guid] = OneApm::Helper.generate_guid
    payload[:call_count] = status.call_count
    payload[:duration] = status.total_call_time
    sample = OneApm::EventAnalyticSample.new(payload)
    sub_event_samples << sample
  end
  sub_event_samples
end

#error_times(txn_metrics) ⇒ Object



195
196
197
198
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 195

def error_times(txn_metrics)
  return txn_metrics[OA_ERROR_ALL_KEY].call_count if txn_metrics.has_key?(OA_ERROR_ALL_KEY) rescue 0
  return 0
end

#harvest!Object

Clear any existing samples, reset the last sample time, and return the previous set of samples. (Synchronized)



70
71
72
73
74
75
76
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 70

def harvest!
  old_samples, sample_count, request_count, synthetics_dropped = reset!
  record_sampling_rate(request_count, sample_count) if @enabled
  record_dropped_synthetics(synthetics_dropped)
  # return old_samples.map(&:to_collector_array) if old_samples.respond_to?(:to_collector_array)
  old_samples
end

#merge!(old_samples) ⇒ Object

Merge samples back into the buffer, for example after a failed transmission to the collector. (Synchronized)



80
81
82
83
84
85
86
87
88
89
90
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 80

def merge!(old_samples)
  self.synchronize do
    old_samples.each do |s|
      if s.respond_to?(:spec_name)
        append_event(s)
      else
        @samples.append_event(s)
      end
    end
  end
end

#notify_fullObject



134
135
136
137
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 134

def notify_full
  OneApm::Manager.logger.debug "Transaction event capacity of #{@samples.capacity} reached, beginning sampling"
  @notified_full = true
end

#on_cross_app_transaction_finished(payload) ⇒ Object

Event handler for the :transaction_finished event.



155
156
157
158
159
160
161
162
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 155

def on_cross_app_transaction_finished(payload)
  return unless @enabled
  main_event = create_main_event(payload)
  self.synchronize do
    append_event(main_event)
  end
  notify_full if !@notified_full && @samples.full?
end

#on_transaction_finished(payload) ⇒ Object

Event handler for the :transaction_finished event.



140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 140

def on_transaction_finished(payload)
  return unless @enabled
  main_event = create_main_event(payload)
  custom_params = create_custom_parameters(payload)
  sub_events = create_sub_events(main_event, payload)
  self.synchronize do 
     [main_event].concat(sub_events).each do |event|
        append_event(event) 
      end
   end
  notify_full if !@notified_full && @samples.full?
end

#optionally_append(sample_key, payload_key, sample, payload) ⇒ Object



212
213
214
215
216
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 212

def optionally_append(sample_key, payload_key, sample, payload)
  if payload.include?(payload_key)
    sample[sample_key] = string(payload[payload_key])
  end
end

#record_dropped_synthetics(synthetics_dropped) ⇒ Object



109
110
111
112
113
114
115
116
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 109

def record_dropped_synthetics(synthetics_dropped)
  return unless synthetics_dropped > 0

  OneApm::Manager.logger.debug("Synthetics transaction event limit (#{@samples.capacity}) reached. Further synthetics events this harvest period dropped.")

  engine = OneApm::Manager.agent.stats_engine
  engine.tl_record_supportability_metric_count("TransactionEventAggregator/synthetics_events_dropped", synthetics_dropped)
end

#record_sampling_rate(request_count, sample_count) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 92

def record_sampling_rate(request_count, sample_count)
  request_count_lifetime = @samples.seen_lifetime
  sample_count_lifetime = @samples.captured_lifetime
  OneApm::Manager.logger.debug("Sampled %d / %d (%.1f %%) requests this cycle, %d / %d (%.1f %%) since startup" % [
    sample_count,
    request_count,
    (sample_count.to_f / request_count * 100.0),
    sample_count_lifetime,
    request_count_lifetime,
    (sample_count_lifetime.to_f / request_count_lifetime * 100.0)
  ])

  engine = OneApm::Manager.agent.stats_engine
  engine.tl_record_supportability_metric_count("TransactionEventAggregator/requests", request_count)
  engine.tl_record_supportability_metric_count("TransactionEventAggregator/samples", sample_count)
end

#register_config_callbacksObject



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 118

def register_config_callbacks
  OneApm::Manager.config.register_callback(:'analytics_events.max_samples_stored') do |max_samples|
    OneApm::Manager.logger.debug "TransactionEventAggregator max_samples set to #{max_samples}"
    self.synchronize { @samples.capacity = max_samples }
  end

  OneApm::Manager.config.register_callback(:'synthetics.events_limit') do |max_samples|
    OneApm::Manager.logger.debug "TransactionEventAggregator limit for synthetics events set to #{max_samples}"
    self.synchronize { @synthetics_samples.capacity = max_samples }
  end

  OneApm::Manager.config.register_callback(:'analytics_events.enabled') do |enabled|
    @enabled = enabled
  end
end

#reset!Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 48

def reset!
  sample_count, request_count, synthetics_dropped = 0
  old_samples = nil

  self.synchronize do
    sample_count = @samples.size
    request_count = @samples.num_seen

    synthetics_dropped = @synthetics_samples.num_dropped

    old_samples = @samples.to_a + @synthetics_samples.to_a
    @samples.reset!
    @synthetics_samples.reset!

    @notified_full = false
  end

  [old_samples, sample_count, request_count, synthetics_dropped]
end

#samplesObject

Fetch a copy of the sampler’s gathered samples. (Synchronized)



44
45
46
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 44

def samples
  self.synchronize { @samples.to_a.concat(@synthetics_samples.to_a) }
end