Class: OneApm::Collector::TransactionEventAggregator
- Inherits:
-
Object
- Object
- OneApm::Collector::TransactionEventAggregator
show all
- Includes:
- MonitorMixin, OneApm::Coerce
- Defined in:
- lib/one_apm/collector/containers/transaction_event_aggregator.rb
Constant Summary
collapse
- OA_SAMPLE_TYPE =
The type field of the sample
'Transaction'.freeze
- OA_TYPE_KEY =
Strings for static keys of the sample structure
'type'.freeze
- OA_TIMESTAMP_KEY =
'timestamp'.freeze
- OA_NAME_KEY =
'name'.freeze
- OA_DURATION_KEY =
'duration'.freeze
- OA_HTTP_RESPONSE_CODE_KEY =
'httpResponseCode'.freeze
- OA_GUID_KEY =
'bw.guid'.freeze
- OA_REFERRING_TRANSACTION_GUID_KEY =
'bw.referringTransactionGuid'.freeze
- OA_CAT_TRIP_ID_KEY =
'bw.tripId'.freeze
- OA_CAT_PATH_HASH_KEY =
'bw.pathHash'.freeze
- OA_CAT_REFERRING_PATH_HASH_KEY =
'bw.referringPathHash'.freeze
- OA_CAT_ALTERNATE_PATH_HASHES_KEY =
'bw.alternatePathHashes'.freeze
- OA_APDEX_PERF_ZONE_KEY =
'bw.apdexPerfZone'.freeze
- OA_SYNTHETICS_RESOURCE_ID_KEY =
"bw.syntheticsResourceId".freeze
- OA_SYNTHETICS_JOB_ID_KEY =
"bw.syntheticsJobId".freeze
- OA_SYNTHETICS_MONITOR_ID_KEY =
"bw.syntheticsMonitorId".freeze
Instance Method Summary
collapse
-
#append_cat_alternate_path_hashes(sample, payload) ⇒ Object
-
#append_event(event) ⇒ Object
-
#append_http_response_code(sample, payload) ⇒ Object
-
#create_custom_parameters(payload) ⇒ Object
-
#create_main_event(payload) ⇒ Object
-
#create_sub_events(main_event, payload) ⇒ Object
-
#error_times(txn_metrics) ⇒ Object
-
#harvest! ⇒ Object
Clear any existing samples, reset the last sample time, and return the previous set of samples.
-
#initialize(event_listener) ⇒ TransactionEventAggregator
constructor
A new instance of TransactionEventAggregator.
-
#merge!(old_samples) ⇒ Object
Merge samples back into the buffer, for example after a failed transmission to the collector.
-
#notify_full ⇒ Object
-
#on_cross_app_transaction_finished(payload) ⇒ Object
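Handler that records only the main event for a finished transaction, without sub-events (unlike #on_transaction_finished, the constructor does not subscribe it to :transaction_finished).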
-
#on_transaction_finished(payload) ⇒ Object
Event handler for the :transaction_finished event.
-
#optionally_append(sample_key, payload_key, sample, payload) ⇒ Object
-
#record_dropped_synthetics(synthetics_dropped) ⇒ Object
-
#record_sampling_rate(request_count, sample_count) ⇒ Object
-
#register_config_callbacks ⇒ Object
-
#reset! ⇒ Object
-
#samples ⇒ Object
Fetch a copy of the sampler’s gathered samples.
#event_params, #float, #int, #int_or_nil, #log_failure, #string
Constructor Details
Returns a new instance of TransactionEventAggregator.
30
31
32
33
34
35
36
37
38
39
40
41
|
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 30
# Builds an aggregator that starts out disabled, allocates its two event
# buffers from the current configuration, and hooks itself up to the
# event listener and to configuration changes.
#
# @param event_listener [#subscribe] object on which this aggregator
#   subscribes {#on_transaction_finished} to the :transaction_finished event
def initialize(event_listener)
  # Explicit empty parentheses so that no arguments are forwarded upward.
  # NOTE(review): presumably this is what sets up MonitorMixin's lock - confirm.
  super()

  @enabled       = false
  @notified_full = false

  analytics_limit = OneApm::Manager.config[:'analytics_events.max_samples_stored']
  @samples = ::OneApm::Agent::SampledBuffer.new(analytics_limit)

  synthetics_limit = OneApm::Manager.config[:'synthetics.events_limit']
  @synthetics_samples = ::OneApm::Agent::SyntheticsEventBuffer.new(synthetics_limit)

  event_listener.subscribe(:transaction_finished, &method(:on_transaction_finished))
  register_config_callbacks
end
|
Instance Method Details
#append_cat_alternate_path_hashes(sample, payload) ⇒ Object
206
207
208
209
210
|
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 206
# Stores the payload's alternate path hashes on the sample as a single
# sorted, comma-separated string. Does nothing when the payload has no
# :cat_alternate_path_hashes entry.
#
# @param sample [Hash] sample being built; written under
#   OA_CAT_ALTERNATE_PATH_HASHES_KEY
# @param payload [Hash] transaction payload
# @return [String, nil] the joined string, or nil when the key is absent
def append_cat_alternate_path_hashes(sample, payload)
  return unless payload.include?(:cat_alternate_path_hashes)

  alternate_hashes = payload[:cat_alternate_path_hashes]
  sample[OA_CAT_ALTERNATE_PATH_HASHES_KEY] = alternate_hashes.sort.join(',')
end
|
#append_event(event) ⇒ Object
164
165
166
167
168
|
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 164
# Adds an event to the sample buffer, folding it into an already-buffered
# sample when one with the same spec_name exists.
#
# @param event [#spec_name, #event_analytic_data] event to record
# @return [Object] whatever the buffer's append returns for a new spec, or
#   the existing sample's (now extended) analytic data otherwise
def append_event(event)
  existing = @samples.select { |candidate| candidate.spec_name == event.spec_name }.first

  if existing.nil?
    @samples.append(event)
  else
    # Same spec already buffered: extend its data instead of adding a sample.
    existing.event_analytic_data.concat(event.event_analytic_data)
  end
end
|
#append_http_response_code(sample, payload) ⇒ Object
200
201
202
203
204
|
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 200
# Copies the HTTP response code from the payload to the sample, unless the
# :disable_rack_middleware setting is truthy.
#
# @param sample [Hash] sample being built
# @param payload [Hash] transaction payload
# @return [Object, nil] result of {#optionally_append}, or nil when the
#   setting suppresses the copy
def append_http_response_code(sample, payload)
  return if OneApm::Manager.config[:disable_rack_middleware]

  optionally_append(OA_HTTP_RESPONSE_CODE_KEY, :http_response_code, sample, payload)
end
|
#create_custom_parameters(payload) ⇒ Object
218
219
220
221
222
223
224
|
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 218
# Produces the custom attributes for an event from payload[:custom_params].
# Returns an empty hash when the 'analytics_events.capture_attributes'
# setting is falsy.
#
# @param payload [Hash] transaction payload; :custom_params may be missing
# @return [Hash] a new hash holding the output of event_params
def create_custom_parameters(payload)
  return {} unless OneApm::Manager.config[:'analytics_events.capture_attributes']

  raw_params = payload[:custom_params] || {}
  {}.merge!(event_params(raw_params))
end
|
#create_main_event(payload) ⇒ Object
171
172
173
|
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 171
# Wraps the transaction payload in an analytic sample.
#
# @param payload [Hash] transaction payload, passed to the sample unchanged
# @return [OneApm::EventAnalyticSample] the newly built sample
def create_main_event(payload)
  event_class = OneApm::EventAnalyticSample
  event_class.new(payload)
end
|
#create_sub_events(main_event, payload) ⇒ Object
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
|
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 175
# Builds one analytic sample per scoped metric of the transaction, skipping
# metrics whose name starts with "Nested", "View" or "External".
#
# Every sub-event inherits the transaction payload, with :scope set to the
# transaction name, :referring_transaction_guid set to the main event's
# guid and :request_url blanked. The caller's payload is never modified.
#
# @param main_event [#guid] the event created for the transaction itself
# @param payload [Hash] transaction payload; :metrics, when present, must
#   respond to each_scoped and yield (metric_name, stats) pairs
# @return [Array<OneApm::EventAnalyticSample>] samples in iteration order;
#   empty when the payload carries no :metrics
def create_sub_events(main_event, payload)
  base = payload.dup
  base[:scope] = payload[:name]
  base[:referring_transaction_guid] = main_event.guid
  base[:request_url] = ''

  sub_event_samples = []
  return sub_event_samples unless base[:metrics]

  base[:metrics].each_scoped do |metric_name, stats|
    # The alternation is grouped so the anchor applies to all three
    # prefixes; ungrouped, only "Nested" was anchored and any name merely
    # containing "View" or "External" was dropped.
    next if metric_name =~ /\A(?:Nested|View|External)/

    # A fresh hash per sample: a sample that keeps a reference to its
    # payload must not see the values of later metrics.
    sub_event_samples << OneApm::EventAnalyticSample.new(
      base.merge(
        :name       => metric_name,
        :guid       => OneApm::Helper.generate_guid,
        :call_count => stats.call_count,
        :duration   => stats.total_call_time
      )
    )
  end

  sub_event_samples
end
|
#error_times(txn_metrics) ⇒ Object
195
196
197
198
|
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 195
# Reads the call count recorded under OA_ERROR_ALL_KEY in the given metrics.
#
# Deliberately best-effort: any StandardError raised while looking the
# value up is swallowed and reported as zero.
# NOTE(review): confirm OA_ERROR_ALL_KEY is defined in this class's scope;
# if it is not, the resulting NameError is swallowed too and this method
# always returns 0.
#
# @param txn_metrics [#has_key?, #[]] metrics of the transaction
# @return [Integer] the call count, or 0 when the key is absent or the
#   lookup fails
def error_times(txn_metrics)
  if txn_metrics.has_key?(OA_ERROR_ALL_KEY)
    txn_metrics[OA_ERROR_ALL_KEY].call_count
  else
    0
  end
rescue StandardError
  0
end
|
#harvest! ⇒ Object
Clear any existing samples, reset the last sample time, and return the previous set of samples. (Synchronized)
70
71
72
73
74
75
76
|
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 70
# Empties the aggregator via {#reset!}, reports harvest statistics, and
# hands back the samples that were buffered.
#
# The sampling rate is only reported while the aggregator is enabled;
# dropped synthetics events are always passed to
# {#record_dropped_synthetics}.
#
# @return [Array] the samples held before the reset
def harvest!
  harvested, kept, seen, dropped_synthetics = reset!

  record_sampling_rate(seen, kept) if @enabled
  record_dropped_synthetics(dropped_synthetics)

  harvested
end
|
#merge!(old_samples) ⇒ Object
Merge samples back into the buffer, for example after a failed transmission to the collector. (Synchronized)
80
81
82
83
84
85
86
87
88
89
90
|
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 80
# Puts previously harvested samples back into the buffer while holding the
# monitor lock.
#
# Samples that respond to spec_name go through {#append_event}, so they are
# folded into an existing sample with the same spec; anything else is handed
# to the buffer's own append_event.
#
# @param old_samples [#each] samples to restore
# @return [Object] the value of old_samples.each
def merge!(old_samples)
  synchronize do
    old_samples.each do |sample|
      next append_event(sample) if sample.respond_to?(:spec_name)

      @samples.append_event(sample)
    end
  end
end
|
#notify_full ⇒ Object
134
135
136
137
|
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 134
# Logs, at debug level, that the sample buffer has hit its capacity, and
# sets @notified_full.
#
# @return [true]
def notify_full
  logger = OneApm::Manager.logger
  logger.debug("Transaction event capacity of #{@samples.capacity} reached, beginning sampling")
  @notified_full = true
end
|
#on_cross_app_transaction_finished(payload) ⇒ Object
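Handler that records only the main event for a finished transaction, without sub-events (unlike #on_transaction_finished, the constructor does not subscribe it to :transaction_finished).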
155
156
157
158
159
160
161
162
|
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 155
# Records a single main event for the given payload; no sub-events are
# created. Does nothing while the aggregator is disabled.
#
# After the event is appended under the monitor lock, {#notify_full} is
# called the first time the buffer is found to be full.
#
# @param payload [Hash] transaction payload
# @return [Object, nil] result of {#notify_full} when it fires, else nil
def on_cross_app_transaction_finished(payload)
  return unless @enabled

  event = create_main_event(payload)
  synchronize { append_event(event) }

  return if @notified_full

  notify_full if @samples.full?
end
|
#on_transaction_finished(payload) ⇒ Object
Event handler for the :transaction_finished event.
140
141
142
143
144
145
146
147
148
149
150
151
|
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 140
# Event handler for the :transaction_finished event.
#
# Builds the main event plus one sub-event per scoped metric and appends all
# of them to the sample buffer under the monitor lock. Does nothing while
# the aggregator is disabled. {#notify_full} is called the first time the
# buffer is found to be full.
#
# NOTE(review): custom parameters used to be computed here and then thrown
# away; that dead work was removed. If custom attributes are meant to be
# attached to the events, that still has to be wired up - confirm intent.
#
# @param payload [Hash] transaction payload
# @return [Object, nil] result of {#notify_full} when it fires, else a
#   falsy value
def on_transaction_finished(payload)
  return unless @enabled

  main_event = create_main_event(payload)
  events = [main_event].concat(create_sub_events(main_event, payload))

  synchronize do
    events.each { |event| append_event(event) }
  end

  notify_full if !@notified_full && @samples.full?
end
|
#optionally_append(sample_key, payload_key, sample, payload) ⇒ Object
212
213
214
215
216
|
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 212
# Copies one payload entry into the sample, passing the value through
# string, but only when the payload actually has that key.
#
# @param sample_key [Object] key to write in the sample
# @param payload_key [Object] key to read from the payload
# @param sample [Hash] sample being built
# @param payload [Hash] transaction payload
# @return [Object, nil] the stored value, or nil when the key is absent
def optionally_append(sample_key, payload_key, sample, payload)
  return unless payload.include?(payload_key)

  sample[sample_key] = string(payload[payload_key])
end
|
#record_dropped_synthetics(synthetics_dropped) ⇒ Object
109
110
111
112
113
114
115
116
|
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 109
# Reports synthetics events that were dropped during the harvest period: a
# debug log line plus a supportability metric. Does nothing when the count
# is not positive.
#
# @param synthetics_dropped [Integer] number of synthetics events dropped
# @return [Object, nil] result of recording the metric, or nil when nothing
#   was dropped
def record_dropped_synthetics(synthetics_dropped)
  return unless synthetics_dropped > 0

  # The limit shown is the synthetics buffer's own capacity; the analytics
  # buffer (@samples) is sized from a different setting.
  OneApm::Manager.logger.debug("Synthetics transaction event limit (#{@synthetics_samples.capacity}) reached. Further synthetics events this harvest period dropped.")

  engine = OneApm::Manager.agent.stats_engine
  engine.tl_record_supportability_metric_count("TransactionEventAggregator/synthetics_events_dropped", synthetics_dropped)
end
|
#record_sampling_rate(request_count, sample_count) ⇒ Object
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 92
# Logs the sampling rate for the harvest cycle and for the buffer's
# lifetime, then records the cycle's request and sample counts as
# supportability metrics.
#
# @param request_count [Integer] transactions seen during this cycle
# @param sample_count [Integer] samples kept during this cycle
# @return [Object] result of recording the last supportability metric
def record_sampling_rate(request_count, sample_count)
  request_count_lifetime = @samples.seen_lifetime
  sample_count_lifetime  = @samples.captured_lifetime

  # An idle cycle (or a fresh process) has a zero denominator; report 0.0
  # rather than letting float division put "NaN" into the log line.
  percent = lambda do |part, whole|
    whole.zero? ? 0.0 : part.to_f / whole * 100.0
  end

  OneApm::Manager.logger.debug(
    format(
      "Sampled %d / %d (%.1f %%) requests this cycle, %d / %d (%.1f %%) since startup",
      sample_count,
      request_count,
      percent.call(sample_count, request_count),
      sample_count_lifetime,
      request_count_lifetime,
      percent.call(sample_count_lifetime, request_count_lifetime)
    )
  )

  engine = OneApm::Manager.agent.stats_engine
  engine.tl_record_supportability_metric_count("TransactionEventAggregator/requests", request_count)
  engine.tl_record_supportability_metric_count("TransactionEventAggregator/samples", sample_count)
end
|
#register_config_callbacks ⇒ Object
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 118
# Registers configuration callbacks so that later changes to the two
# buffer limits and to the enabled flag are applied to this aggregator.
# Capacity changes are made while holding the monitor lock.
#
# @return [Object] result of the last register_callback call
def register_config_callbacks
  config = OneApm::Manager.config

  config.register_callback(:'analytics_events.max_samples_stored') do |limit|
    OneApm::Manager.logger.debug("TransactionEventAggregator max_samples set to #{limit}")
    synchronize { @samples.capacity = limit }
  end

  config.register_callback(:'synthetics.events_limit') do |limit|
    OneApm::Manager.logger.debug("TransactionEventAggregator limit for synthetics events set to #{limit}")
    synchronize { @synthetics_samples.capacity = limit }
  end

  config.register_callback(:'analytics_events.enabled') { |flag| @enabled = flag }
end
|
#reset! ⇒ Object
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 48
# Atomically snapshots and clears both buffers.
#
# While holding the monitor lock: reads the counters, copies out the
# buffered samples (analytics first, then synthetics), resets both buffers
# and clears @notified_full.
#
# @return [Array] a four-element array:
#   [old_samples, sample_count, request_count, synthetics_dropped]
def reset!
  # The synchronized block's value is the method's return value.
  synchronize do
    sample_count       = @samples.size
    request_count      = @samples.num_seen
    synthetics_dropped = @synthetics_samples.num_dropped
    harvested          = @samples.to_a + @synthetics_samples.to_a

    @samples.reset!
    @synthetics_samples.reset!
    @notified_full = false

    [harvested, sample_count, request_count, synthetics_dropped]
  end
end
|
#samples ⇒ Object
Fetch a copy of the sampler’s gathered samples. (Synchronized)
44
45
46
|
# File 'lib/one_apm/collector/containers/transaction_event_aggregator.rb', line 44
# Fetch a copy of the gathered samples: the analytics samples followed by
# the synthetics samples, read while holding the monitor lock.
#
# The two lists are combined with + (as in {#reset!}) rather than concat, so
# the result is always a new array and whatever array a buffer's to_a
# returns is never modified.
#
# @return [Array] a new array of all currently buffered samples
def samples
  synchronize { @samples.to_a + @synthetics_samples.to_a }
end
|