Class: Instana::Processor
- Inherits: Object (hierarchy: Object, Instana::Processor)
- Extended by: Forwardable
- Defined in: lib/instana/trace/processor.rb
Instance Method Summary
- #clear! ⇒ Object
  Removes all traces from the @queue and resets the span counters.
- #initialize(logger: ::Instana.logger) ⇒ Processor (constructor)
  A new instance of Processor.
- #on_finish(span) ⇒ Object
- #on_start(_) ⇒ Object
  Records that a new span has started.
- #queued_spans ⇒ Array
  Drains @queue and returns the raw form of the queued spans.
- #send(&block) ⇒ Object
  Sends all traces in @queue to the host agent.
- #span_metrics ⇒ Object
  Retrieves, then resets, the metrics associated with span creation and submission.
Constructor Details
#initialize(logger: ::Instana.logger) ⇒ Processor
Returns a new instance of Processor.
    # File 'lib/instana/trace/processor.rb', line 11

    def initialize(logger: ::Instana.logger)
      # The main queue before being reported to the
      # host agent. Spans in this queue are complete
      # and ready to be sent.
      @queue = Queue.new

      # This is the maximum number of spans we send to the host
      # agent at once.
      @batch_size = 3000
      @logger = logger
      @pid = Process.pid

      @spans_opened = Concurrent::AtomicFixnum.new(0)
      @spans_closed = Concurrent::AtomicFixnum.new(0)
    end
Instance Method Details
#clear! ⇒ Object
Removes all traces from the @queue and resets the opened and closed span counters to zero. Used in the test suite to reset state.
    # File 'lib/instana/trace/processor.rb', line 109

    def clear!
      @spans_opened.value = 0
      @spans_closed.value = 0

      until @queue.empty?
        # Non-blocking pop; ignore exception
        begin
          @queue.pop(true)
        rescue
          nil
        end
      end
    end
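The loop in #clear! relies on the fact that Ruby's Queue#pop raises ThreadError when called in non-blocking mode on an empty queue, which can happen if another consumer empties the queue between the empty? check and the pop. The following is a minimal stand-alone sketch of that idiom; `drain!` is a hypothetical helper written for illustration and is not part of Instana::Processor.

```ruby
# Hypothetical helper illustrating the non-blocking drain idiom.
# Returns the number of items that were removed from the queue.
def drain!(queue)
  removed = 0
  until queue.empty?
    begin
      queue.pop(true) # true = non-blocking; raises ThreadError if empty
      removed += 1
    rescue ThreadError
      # Another consumer emptied the queue between empty? and pop.
    end
  end
  removed
end

queue = Queue.new
3.times { |i| queue.push(i) }
drain!(queue)
```

Unlike the listing above, this sketch rescues ThreadError explicitly rather than using a bare rescue, so unrelated errors are not silently swallowed.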
#on_finish(span) ⇒ Object
    # File 'lib/instana/trace/processor.rb', line 33

    def on_finish(span)
      # :nocov:
      if @pid != Process.pid
        @logger.info("Proces `#{@pid}` has forked into #{Process.pid}. Running post fork hook.")
        ::Instana.config[:post_fork_proc].call
        @pid = Process.pid
      end
      # :nocov:

      @spans_closed.increment
      @queue.push(span)
    end
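The listing above detects a fork by comparing the PID captured at construction time with the current Process.pid, and runs a post-fork hook once when they differ. The sketch below isolates that pattern under some assumptions: `ForkAwareBuffer`, `pid_source`, and `post_fork` are hypothetical names introduced here, and the PID source is injectable only so that the fork path can be exercised without actually calling fork.

```ruby
# Hypothetical stand-in for the PID check performed in on_finish.
class ForkAwareBuffer
  def initialize(pid_source: -> { Process.pid }, post_fork: -> {})
    @pid_source = pid_source
    @post_fork = post_fork
    @pid = pid_source.call # PID remembered at construction time
    @queue = Queue.new
  end

  def push(item)
    current = @pid_source.call
    if @pid != current
      # The PID changed since the last check, so we are in a forked child.
      @post_fork.call
      @pid = current # remember the new PID so the hook runs only once
    end
    @queue.push(item)
  end

  def size
    @queue.size
  end
end

buffer = ForkAwareBuffer.new(post_fork: -> { warn "post-fork hook ran" })
buffer.push(:span) # same process, so the hook is not run
```

Checking lazily on each push keeps the happy path to a single integer comparison and avoids having to register a fork callback with the runtime.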
#on_start(_) ⇒ Object
Records that a new span has started by incrementing the opened-span counter. Used to generate monitoring metrics.
    # File 'lib/instana/trace/processor.rb', line 29

    def on_start(_)
      @spans_opened.increment
    end
#queued_spans ⇒ Array
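Drains @queue and returns an Array containing the raw form of each queued span. Only items that are Span instances with a context level of 1 are included. This is used by Processor#send and in the test suite. Note that everything retrieved with this method is removed entirely from the queue, including items that do not pass the filter.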
    # File 'lib/instana/trace/processor.rb', line 89

    def queued_spans
      return [] if @queue.empty?

      spans = []
      until @queue.empty?
        # Non-blocking pop; ignore exception
        span = begin
          @queue.pop(true)
        rescue
          nil
        end
        spans << span.raw if span.is_a?(Span) && span.context.level == 1
      end

      spans
    end
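To make the drain-and-filter behaviour concrete, here is a minimal sketch that runs without the Instana gem. `FakeSpan`, `FakeContext`, and `drain_raw_spans` are hypothetical stand-ins for illustration only; the real Span and context classes are not shown on this page, so only the `raw`, `context`, and `level` accessors used in the listing are modelled.

```ruby
# Hypothetical stand-ins for Instana's Span and span context.
FakeContext = Struct.new(:level)
FakeSpan = Struct.new(:raw, :context)

# Drains the queue and keeps only the raw form of level-1 spans.
def drain_raw_spans(queue)
  spans = []
  until queue.empty?
    item = begin
      queue.pop(true) # non-blocking pop
    rescue ThreadError
      nil             # queue was emptied concurrently
    end
    # Items that fail this check are dropped: they are already off the queue.
    spans << item.raw if item.is_a?(FakeSpan) && item.context.level == 1
  end
  spans
end

queue = Queue.new
queue.push(FakeSpan.new({ n: "kept" }, FakeContext.new(1)))
queue.push(FakeSpan.new({ n: "skipped" }, FakeContext.new(0)))
drain_raw_spans(queue)
```

In this sketch the second span is removed from the queue but not returned, which mirrors the filter condition in the listing above.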
#send(&block) ⇒ Object
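Sends all traces in @queue to the host agent. The queued spans are drained via #queued_spans and yielded to the given block in batches of at most @batch_size spans (3000, as set in the constructor). Returns immediately if the queue is empty or if the INSTANA_TEST environment variable is set.
FIXME: Add limits checking here with regard to:
- Max HTTP POST size
- Out of control/growing queue
- Prevent another run of the timer while this is running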
    # File 'lib/instana/trace/processor.rb', line 71

    def send(&block)
      return if @queue.empty? || ENV.key?('INSTANA_TEST')

      # Retrieve all spans for queued traces
      spans = queued_spans

      # Report spans in batches
      spans.each_slice(@batch_size, &block)
    end
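The batching step in the listing above is plain Enumerable#each_slice, which yields consecutive arrays of at most the given size. A small sketch of how a caller's block would see the batches follows; `each_batch` is a hypothetical helper, and the batch size of 3 is chosen only to keep the example short.

```ruby
# Hypothetical helper: yields items to the block in batches of at most
# batch_size, and does nothing when there are no items.
def each_batch(items, batch_size, &block)
  return if items.empty?

  items.each_slice(batch_size, &block)
end

sizes = []
each_batch((1..7).to_a, 3) { |batch| sizes << batch.size }
# sizes is now [3, 3, 1]: two full batches and one partial batch
```

With the constructor's batch size of 3000, a drain of 7000 spans would therefore reach the block as three calls.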
#span_metrics ⇒ Object
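Retrieves the metrics associated with span creation and submission, then resets the opened and closed counters to zero. The filtered and dropped values are always reported as 0.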
    # File 'lib/instana/trace/processor.rb', line 47

    def span_metrics
      response = {
        opened: @spans_opened.value,
        closed: @spans_closed.value,
        filtered: 0,
        dropped: 0
      }

      @spans_opened.value = 0
      @spans_closed.value = 0

      response
    end
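The listing above reads each counter and then resets it in separate steps, so an increment that lands between the read and the reset may not be reflected in any report. The sketch below shows the same read-and-reset pattern with the two steps performed under one lock. It is an illustration under assumptions, not the library's implementation: `SpanCounters` is a hypothetical class, and a stdlib Mutex stands in for Concurrent::AtomicFixnum so that the example runs without the concurrent-ruby gem.

```ruby
# Hypothetical counter pair with an atomic read-and-reset.
class SpanCounters
  def initialize
    @lock = Mutex.new
    @opened = 0
    @closed = 0
  end

  def opened!
    @lock.synchronize { @opened += 1 }
  end

  def closed!
    @lock.synchronize { @closed += 1 }
  end

  # Returns the current counts and resets them in one critical section.
  def snapshot!
    @lock.synchronize do
      snapshot = { opened: @opened, closed: @closed, filtered: 0, dropped: 0 }
      @opened = 0
      @closed = 0
      snapshot
    end
  end
end

counters = SpanCounters.new
2.times { counters.opened! }
counters.closed!
counters.snapshot! # counts since the previous snapshot
```

Each snapshot reports only the activity since the previous one, which is the same contract as #span_metrics.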