Class: NewRelic::Agent::InfiniteTracing::Client
- Inherits:
-
Object
- Object
- NewRelic::Agent::InfiniteTracing::Client
show all
- Includes:
- Constants
- Defined in:
- lib/infinite_tracing/client.rb
Constant Summary
Constants included
from Constants
NewRelic::Agent::InfiniteTracing::Constants::GRPC_ERROR_NAME_METRIC, NewRelic::Agent::InfiniteTracing::Constants::GRPC_OTHER_ERROR_METRIC, NewRelic::Agent::InfiniteTracing::Constants::QUEUE_DUMPED_METRIC, NewRelic::Agent::InfiniteTracing::Constants::RESPONSE_ERROR_METRIC, NewRelic::Agent::InfiniteTracing::Constants::SPANS_SEEN_METRIC, NewRelic::Agent::InfiniteTracing::Constants::SPANS_SENT_METRIC, NewRelic::Agent::InfiniteTracing::Constants::SUPPORTABILITY_PREFIX
Instance Method Summary
collapse
Constructor Details
#initialize ⇒ Client
Returns a new instance of Client.
18
19
20
21
22
|
# File 'lib/infinite_tracing/client.rb', line 18
def initialize
@suspended = false
@response_handler = nil
@lock = Mutex.new
end
|
Instance Method Details
#<<(segment) ⇒ Object
24
25
26
|
# File 'lib/infinite_tracing/client.rb', line 24
def <<(segment)
buffer << segment
end
|
#batching_enabled? ⇒ Boolean
28
29
30
|
# File 'lib/infinite_tracing/client.rb', line 28
def batching_enabled?
NewRelic::Agent.config[:'infinite_tracing.batching']
end
|
#buffer ⇒ Object
48
49
50
|
# File 'lib/infinite_tracing/client.rb', line 48
def buffer
@buffer ||= new_streaming_buffer
end
|
#flush ⇒ Object
52
53
54
|
# File 'lib/infinite_tracing/client.rb', line 52
def flush
buffer.flush_queue
end
|
Turns camelcase base class name into upper snake case version of the name.
57
58
59
60
|
# File 'lib/infinite_tracing/client.rb', line 57
def formatted_class_name(class_name)
class_name = class_name.split(':')[-1]
(class_name.gsub!(/(.)([A-Z])/, '\1_\2') || class_name).upcase
end
|
#grpc_error_metric_name(error) ⇒ Object
Literal codes are all mapped to unique class names, so we can deduce the name of the error to report in the metric from the error’s class name.
64
65
66
|
# File 'lib/infinite_tracing/client.rb', line 64
def grpc_error_metric_name(error)
GRPC_ERROR_NAME_METRIC % formatted_class_name(error.class.name)
end
|
#handle_close ⇒ Object
This method is called when the server closes the record status stream without raising an error. The Channel/Connection is not closed or reset in this case. We simply start streaming again, which will reuse the channel/connection to the server and re-establish the gRPC bi-directional stream. Useful for the server to initiate a load-balancing scheme.
101
102
103
104
105
|
# File 'lib/infinite_tracing/client.rb', line 101
def handle_close
NewRelic::Agent.logger.debug('The gRPC Trace Observer closed the stream with OK response. ' \
'Restarting the stream.')
start_streaming
end
|
#handle_error(error) ⇒ Object
79
80
81
82
83
84
85
86
87
88
89
90
|
# File 'lib/infinite_tracing/client.rb', line 79
def handle_error(error)
record_error_metrics_and_log(error)
case error
when GRPC::Unavailable then restart
when GRPC::FailedPrecondition then restart
when GRPC::Unimplemented then suspend
else
start_streaming(false)
end
end
|
#new_streaming_buffer ⇒ Object
provides the correct streaming buffer instance based on whether the client is currently suspended.
43
44
45
46
|
# File 'lib/infinite_tracing/client.rb', line 43
def new_streaming_buffer
buffer_class = suspended? ? SuspendedStreamingBuffer : StreamingBuffer
buffer_class.new(Config.span_events_queue_size)
end
|
#record_error_metrics_and_log(error) ⇒ Object
Reports AND logs general response metric along with a more specific error metric
69
70
71
72
73
74
75
76
77
|
# File 'lib/infinite_tracing/client.rb', line 69
def record_error_metrics_and_log(error)
NewRelic::Agent.record_metric(RESPONSE_ERROR_METRIC, 0.0)
if error.is_a?(GRPC::BadStatus)
NewRelic::Agent.record_metric(grpc_error_metric_name(error), 0.0)
else
NewRelic::Agent.record_metric(GRPC_OTHER_ERROR_METRIC, 0.0)
end
NewRelic::Agent.logger.warn('gRPC response error received.', error)
end
|
#record_span_batches(exponential_backoff) ⇒ Object
165
166
167
|
# File 'lib/infinite_tracing/client.rb', line 165
def record_span_batches(exponential_backoff)
RecordStatusHandler.new(self, Connection.record_span_batches(self, buffer.batch_enumerator, exponential_backoff))
end
|
#record_spans(exponential_backoff) ⇒ Object
161
162
163
|
# File 'lib/infinite_tracing/client.rb', line 161
def record_spans(exponential_backoff)
RecordStatusHandler.new(self, Connection.record_spans(self, buffer.enumerator, exponential_backoff))
end
|
#reset_connection ⇒ Object
121
122
123
124
125
|
# File 'lib/infinite_tracing/client.rb', line 121
def reset_connection
@lock.synchronize do
Connection.reset
end
end
|
#response_handler(backoff) ⇒ Object
169
170
171
|
# File 'lib/infinite_tracing/client.rb', line 169
def response_handler(backoff)
@response_handler = batching_enabled? ? record_span_batches(backoff) : record_spans(backoff)
end
|
#restart ⇒ Object
135
136
137
138
139
140
141
|
# File 'lib/infinite_tracing/client.rb', line 135
def restart
NewRelic::Agent.disable_all_tracing do
reset_connection
transfer_buffer
start_streaming
end
end
|
#start_streaming(exponential_backoff = true) ⇒ Object
152
153
154
155
156
157
158
159
|
# File 'lib/infinite_tracing/client.rb', line 152
def start_streaming(exponential_backoff = true)
return if suspended?
NewRelic::Agent.disable_all_tracing do
Connection.instance.wait_for_agent_connect
@lock.synchronize { response_handler(exponential_backoff) }
end
end
|
#stop ⇒ Object
143
144
145
146
147
148
149
150
|
# File 'lib/infinite_tracing/client.rb', line 143
def stop
return unless @response_handler
@lock.synchronize do
@response_handler.stop
@response_handler = nil
end
end
|
#suspend ⇒ Object
Places the client into suspended state whereby client will no longer attempt to reconnect to the gRPC server nor will it attempt to send span events henceforth. The Suspended Streaming Buffer will be installed in this state.
110
111
112
113
114
115
116
117
118
119
|
# File 'lib/infinite_tracing/client.rb', line 110
def suspend
return if suspended?
@lock.synchronize do
@suspended = true
@buffer = new_streaming_buffer
NewRelic::Agent.logger.warn('The Trace Observer host signaled to suspend streaming span events. ' \
'No more span events will be sent during this session.')
end
end
|
#suspended? ⇒ Boolean
92
93
94
|
# File 'lib/infinite_tracing/client.rb', line 92
def suspended?
@suspended
end
|
#transfer(previous_client) ⇒ Object
Transfers spans in streaming buffer from previous client (if any) and returns self (so we chain the call)
34
35
36
37
38
39
|
# File 'lib/infinite_tracing/client.rb', line 34
def transfer(previous_client)
return self unless previous_client
previous_client.buffer.transfer(buffer)
self
end
|
#transfer_buffer ⇒ Object
127
128
129
130
131
132
133
|
# File 'lib/infinite_tracing/client.rb', line 127
def transfer_buffer
@lock.synchronize do
old_buffer = @buffer
@buffer = new_streaming_buffer
old_buffer.transfer(@buffer)
end
end
|