Class: NewRelic::Agent::InfiniteTracing::Client

Inherits:
Object
  • Object
show all
Includes:
Constants
Defined in:
lib/infinite_tracing/client.rb

Constant Summary

Constants included from Constants

NewRelic::Agent::InfiniteTracing::Constants::GRPC_ERROR_NAME_METRIC, NewRelic::Agent::InfiniteTracing::Constants::GRPC_OTHER_ERROR_METRIC, NewRelic::Agent::InfiniteTracing::Constants::QUEUE_DUMPED_METRIC, NewRelic::Agent::InfiniteTracing::Constants::RESPONSE_ERROR_METRIC, NewRelic::Agent::InfiniteTracing::Constants::SPANS_SEEN_METRIC, NewRelic::Agent::InfiniteTracing::Constants::SPANS_SENT_METRIC, NewRelic::Agent::InfiniteTracing::Constants::SUPPORTABILITY_PREFIX

Instance Method Summary collapse

Constructor Details

#initializeClient

Returns a new instance of Client.



18
19
20
21
22
# File 'lib/infinite_tracing/client.rb', line 18

def initialize
  @suspended = false
  @response_handler = nil
  @lock = Mutex.new
end

Instance Method Details

#<<(segment) ⇒ Object



24
25
26
# File 'lib/infinite_tracing/client.rb', line 24

def <<(segment)
  buffer << segment
end

#batching_enabled?Boolean

Returns:

  • (Boolean)


28
29
30
# File 'lib/infinite_tracing/client.rb', line 28

def batching_enabled?
  NewRelic::Agent.config[:'infinite_tracing.batching']
end

#bufferObject



48
49
50
# File 'lib/infinite_tracing/client.rb', line 48

def buffer
  @buffer ||= new_streaming_buffer
end

#flushObject



52
53
54
# File 'lib/infinite_tracing/client.rb', line 52

def flush
  buffer.flush_queue
end

#formatted_class_name(class_name) ⇒ Object

Turns camelcase base class name into upper snake case version of the name.



57
58
59
60
# File 'lib/infinite_tracing/client.rb', line 57

def formatted_class_name(class_name)
  class_name = class_name.split(':')[-1]
  (class_name.gsub!(/(.)([A-Z])/, '\1_\2') || class_name).upcase
end

#grpc_error_metric_name(error) ⇒ Object

Literal codes are all mapped to unique class names, so we can deduce the name of the error to report in the metric from the error’s class name.



64
65
66
# File 'lib/infinite_tracing/client.rb', line 64

def grpc_error_metric_name(error)
  GRPC_ERROR_NAME_METRIC % formatted_class_name(error.class.name)
end

#handle_closeObject

This method is called when the server closes the record status stream without raising an error. The Channel/Connection is not closed or reset in this case. We simply start streaming again, which will reuse the channel/connection to the server and re-establish the gRPC bi-directional stream. Useful for the server to initiate a load-balancing scheme.



101
102
103
104
105
# File 'lib/infinite_tracing/client.rb', line 101

def handle_close
  NewRelic::Agent.logger.debug('The gRPC Trace Observer closed the stream with OK response. ' \
    'Restarting the stream.')
  start_streaming
end

#handle_error(error) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/infinite_tracing/client.rb', line 79

def handle_error(error)
  record_error_metrics_and_log(error)

  case error
  when GRPC::Unavailable then restart
  when GRPC::FailedPrecondition then restart
  when GRPC::Unimplemented then suspend
  else
    # Set exponential backoff to false so we'll reconnect at periodic (15 second) intervals instead
    start_streaming(false)
  end
end

#new_streaming_bufferObject

provides the correct streaming buffer instance based on whether the client is currently suspended.



43
44
45
46
# File 'lib/infinite_tracing/client.rb', line 43

def new_streaming_buffer
  buffer_class = suspended? ? SuspendedStreamingBuffer : StreamingBuffer
  buffer_class.new(Config.span_events_queue_size)
end

#record_error_metrics_and_log(error) ⇒ Object

Reports AND logs general response metric along with a more specific error metric



69
70
71
72
73
74
75
76
77
# File 'lib/infinite_tracing/client.rb', line 69

def record_error_metrics_and_log(error)
  NewRelic::Agent.record_metric(RESPONSE_ERROR_METRIC, 0.0)
  if error.is_a?(GRPC::BadStatus)
    NewRelic::Agent.record_metric(grpc_error_metric_name(error), 0.0)
  else
    NewRelic::Agent.record_metric(GRPC_OTHER_ERROR_METRIC, 0.0)
  end
  NewRelic::Agent.logger.warn('gRPC response error received.', error)
end

#record_span_batches(exponential_backoff) ⇒ Object



165
166
167
# File 'lib/infinite_tracing/client.rb', line 165

def record_span_batches(exponential_backoff)
  RecordStatusHandler.new(self, Connection.record_span_batches(self, buffer.batch_enumerator, exponential_backoff))
end

#record_spans(exponential_backoff) ⇒ Object



161
162
163
# File 'lib/infinite_tracing/client.rb', line 161

def record_spans(exponential_backoff)
  RecordStatusHandler.new(self, Connection.record_spans(self, buffer.enumerator, exponential_backoff))
end

#reset_connectionObject



121
122
123
124
125
# File 'lib/infinite_tracing/client.rb', line 121

def reset_connection
  @lock.synchronize do
    Connection.reset
  end
end

#response_handler(backoff) ⇒ Object



169
170
171
# File 'lib/infinite_tracing/client.rb', line 169

def response_handler(backoff)
  @response_handler = batching_enabled? ? record_span_batches(backoff) : record_spans(backoff)
end

#restartObject



135
136
137
138
139
140
141
# File 'lib/infinite_tracing/client.rb', line 135

def restart
  NewRelic::Agent.disable_all_tracing do
    reset_connection
    transfer_buffer
    start_streaming
  end
end

#start_streaming(exponential_backoff = true) ⇒ Object



152
153
154
155
156
157
158
159
# File 'lib/infinite_tracing/client.rb', line 152

def start_streaming(exponential_backoff = true)
  return if suspended?

  NewRelic::Agent.disable_all_tracing do
    Connection.instance.wait_for_agent_connect
    @lock.synchronize { response_handler(exponential_backoff) }
  end
end

#stopObject



143
144
145
146
147
148
149
150
# File 'lib/infinite_tracing/client.rb', line 143

def stop
  return unless @response_handler

  @lock.synchronize do
    @response_handler.stop
    @response_handler = nil
  end
end

#suspendObject

Places the client into suspended state whereby client will no longer attempt to reconnect to the gRPC server nor will it attempt to send span events henceforth. The Suspended Streaming Buffer will be installed in this state.



110
111
112
113
114
115
116
117
118
119
# File 'lib/infinite_tracing/client.rb', line 110

def suspend
  return if suspended?

  @lock.synchronize do
    @suspended = true
    @buffer = new_streaming_buffer
    NewRelic::Agent.logger.warn('The Trace Observer host signaled to suspend streaming span events. ' \
      'No more span events will be sent during this session.')
  end
end

#suspended?Boolean

Returns:

  • (Boolean)


92
93
94
# File 'lib/infinite_tracing/client.rb', line 92

def suspended?
  @suspended
end

#transfer(previous_client) ⇒ Object

Transfers spans in streaming buffer from previous client (if any) and returns self (so we chain the call)



34
35
36
37
38
39
# File 'lib/infinite_tracing/client.rb', line 34

def transfer(previous_client)
  return self unless previous_client

  previous_client.buffer.transfer(buffer)
  self
end

#transfer_bufferObject



127
128
129
130
131
132
133
# File 'lib/infinite_tracing/client.rb', line 127

def transfer_buffer
  @lock.synchronize do
    old_buffer = @buffer
    @buffer = new_streaming_buffer
    old_buffer.transfer(@buffer)
  end
end