Class: Instana::Processor

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/instana/trace/processor.rb

Instance Method Summary collapse

Constructor Details

#initialize(logger: ::Instana.logger) ⇒ Processor

Returns a new instance of Processor.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/instana/trace/processor.rb', line 11

# Builds a processor with an empty span queue and zeroed span counters.
#
# @param logger the logger used for processor messages; defaults to
#   ::Instana.logger
def initialize(logger: ::Instana.logger)
  @logger = logger

  # Remember the pid at construction time so a later pid change can be noticed.
  @pid = Process.pid

  # Upper bound on the number of spans handed to the host agent in one batch.
  @batch_size = 3_000

  # Holding area for spans that are complete and ready to be reported
  # to the host agent.
  @queue = Queue.new

  # Running counts of spans started and finished.
  @spans_opened = Concurrent::AtomicFixnum.new(0)
  @spans_closed = Concurrent::AtomicFixnum.new(0)
end

Instance Method Details

#clear! ⇒ Object

Removes all traces from the @queue. Used in the test suite to reset state.



109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/instana/trace/processor.rb', line 109

# Resets the processor: zeroes both span counters and discards everything
# waiting in @queue. Used in the test suite to reset state.
#
# @return [nil]
def clear!
  @spans_opened.value = 0
  @spans_closed.value = 0

  # Queue#clear empties the queue in one call, so there is no need to pop
  # items one at a time and swallow the empty-queue error.
  @queue.clear
  nil
end

#on_finish(span) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/instana/trace/processor.rb', line 33

# Records a finished span: bumps the closed-span counter and enqueues the
# span so it can be reported later.
#
# If the current process id differs from the one remembered in @pid, the
# configured post-fork hook (::Instana.config[:post_fork_proc]) is run first
# and @pid is updated to the current pid.
#
# @param span the finished span to enqueue
# @return [Object] whatever @queue.push returns
def on_finish(span)
  # :nocov:
  if @pid != Process.pid
    @logger.info("Process `#{@pid}` has forked into #{Process.pid}. Running post fork hook.")
    ::Instana.config[:post_fork_proc].call
    @pid = Process.pid
  end
  # :nocov:

  @spans_closed.increment
  @queue.push(span)
end

#on_start(_) ⇒ Object

Note that we’ve started a new span. Used to generate monitoring metrics.



29
30
31
# File 'lib/instana/trace/processor.rb', line 29

# Notes that a new span has been started by bumping the opened-span counter.
# The span argument itself is not inspected.
#
# @param _span the span that was started (unused)
# @return [Object] whatever @spans_opened.increment returns
def on_start(_span)
  @spans_opened.increment
end

#queued_spans ⇒ Array

Retrieves everything currently in @queue and returns the raw representations of the queued spans as a single array; only Span instances whose context level is 1 are included. This is used by Processor#send and in the test suite. Note that items retrieved with this method are removed entirely from the queue.

Returns:

  • (Array)

    An array of [Span] or empty



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/instana/trace/processor.rb', line 89

# Drains @queue and collects the raw form of every queued span.
#
# Only items that are Span instances with a context level of 1 contribute
# to the result; anything else popped from the queue is discarded.
#
# @return [Array] the raw spans, or an empty array when nothing qualified
def queued_spans
  collected = []

  until @queue.empty?
    # Non-blocking pop; a failure here is treated as "nothing popped".
    item =
      begin
        @queue.pop(true)
      rescue
        nil
      end

    next unless item.is_a?(Span)
    next unless item.context.level == 1

    collected << item.raw
  end

  collected
end

#send(&block) ⇒ Object

send

Sends all traces in @queue to the host agent

FIXME: Add limits checking here in regards to:

- Max HTTP Post size
- Out of control/growing queue
- Prevent another run of the timer while this is running


71
72
73
74
75
76
77
78
79
# File 'lib/instana/trace/processor.rb', line 71

# Drains the queue and yields the collected spans to the given block in
# batches of at most @batch_size.
#
# Does nothing (returns nil) when the queue is empty or when the
# INSTANA_TEST environment variable is set.
#
# @yield [Array] one batch of raw spans at a time
def send(&block)
  skip = @queue.empty? || ENV.key?('INSTANA_TEST')
  return if skip

  # Pull every queued span, then hand them over batch by batch.
  queued_spans.each_slice(@batch_size, &block)
end

#span_metrics ⇒ Object

Clears and retrieves metrics associated with span creation and submission



47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/instana/trace/processor.rb', line 47

# Reports the span counters and resets them to zero.
#
# @return [Hash] the opened and closed counts read before the reset, plus
#   :filtered and :dropped, which are always 0 here
def span_metrics
  # Snapshot both counters before touching either of them.
  opened_count = @spans_opened.value
  closed_count = @spans_closed.value

  [@spans_opened, @spans_closed].each { |counter| counter.value = 0 }

  { opened: opened_count, closed: closed_count, filtered: 0, dropped: 0 }
end