Class: Instana::Processor

Inherits:
Object
  • Object
show all
Defined in:
lib/instana/tracing/processor.rb

Instance Method Summary collapse

Constructor Details

#initializeProcessor

Returns a new instance of Processor.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/instana/tracing/processor.rb', line 6

def initialize
  # The main queue before being reported to the
  # host agent.  Traces in this queue are complete
  # and ready to be sent.
  @queue = Queue.new

  # The staging queue that holds traces that have completed
  # but still have outstanding async spans.
  # Traces that have been in this queue for more than
  # 5 minutes are discarded.
  @staging_queue = Set.new

  # No access to the @staging_queue until this lock
  # is taken.
  @staging_lock = Mutex.new
end

Instance Method Details

#add(trace) ⇒ Object

Adds a trace to the queue to be processed and sent to the host agent

Parameters:

  • the (Trace)

    trace to be added to the queue



27
28
29
30
# File 'lib/instana/tracing/processor.rb', line 27

def add(trace)
  ::Instana.logger.trace("Queuing completed trace id: #{trace.id}")
  @queue.push(trace)
end

#clear!Object

Removes all traces from the @queue and @staging_queue. Used in the test suite to reset state.



188
189
190
191
192
193
194
# File 'lib/instana/tracing/processor.rb', line 188

def clear!
  until @queue.empty? do
    # Non-blocking pop; ignore exception
    @queue.pop(true) rescue nil
  end
  @staging_queue.clear
end

#process_stagedObject

This will run through the staged traces (if any) to find completed or timed out incompleted traces. Completed traces will be added to the main @queue. Timed out traces will be discarded



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/instana/tracing/processor.rb', line 44

def process_staged
  @staging_lock.synchronize {
    if @staging_queue.size > 0
      @staging_queue.delete_if do |t|
        if t.complete?
          ::Instana.logger.trace("Moving staged complete trace to main queue: #{t.id}")
          add(t)
          true
        elsif t.discard?
          ::Instana.logger.debug("Discarding trace with uncompleted async spans over 5 mins old. id: #{t.id}")
          true
        else
          false
        end
      end
    end
  }
end

#queue_countInteger

Get the number traces currently in the queue

Returns:

  • (Integer)

    the queue size



173
174
175
# File 'lib/instana/tracing/processor.rb', line 173

def queue_count
  @queue.size
end

#queued_spansArray

Retrieves all of the traces in @queue and returns the sum of their raw spans. This is used by Processor::send and in the test suite. Note that traces retrieved with this method are removed entirely from the queue.

Returns:

  • (Array)

    An array of [Span] or empty



100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/instana/tracing/processor.rb', line 100

def queued_spans
  return [] if @queue.empty?

  spans = []
  until @queue.empty? do
    # Non-blocking pop; ignore exception
    trace = @queue.pop(true) rescue nil
    trace.spans.each do |s|
      spans << s.raw
    end
  end
  spans
end

#queued_tracesArray

Retrieves all of the traces that are in @queue. Note that traces retrieved with this method are removed entirely from the queue.

Returns:

  • (Array)

    An array of [Trace] or empty



120
121
122
123
124
125
126
127
128
129
# File 'lib/instana/tracing/processor.rb', line 120

def queued_traces
  return [] if @queue.empty?

  traces = []
  until @queue.empty? do
    # Non-blocking pop; ignore exception
    traces << @queue.pop(true) rescue nil
  end
  traces
end

#sendObject

send

Sends all traces in @queue to the host agent

FIXME: Add limits checking here in regards to:

- Max HTTP Post size
- Out of control/growing queue
- Prevent another run of the timer while this is running


74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/instana/tracing/processor.rb', line 74

def send
  return if @queue.empty? || ENV['INSTANA_GEM_TEST']

  size = @queue.size
  if size > 100
    Instana.logger.debug "Trace queue is #{size}"
  end

  # Scan for any staged but incomplete traces that have now
  # completed.
  process_staged

  # Retrieve all spans for queued traces
  spans = queued_spans

  ::Instana.agent.report_spans(spans)
end

#stage(trace) ⇒ Object

Adds a trace to the staging queue.

Parameters:

  • the (Trace)

    trace to be added to the queue



35
36
37
38
# File 'lib/instana/tracing/processor.rb', line 35

def stage(trace)
  ::Instana.logger.trace("Staging incomplete trace id: #{trace.id}")
  @staging_queue.add(trace)
end

#staged_countInteger

Get the number traces currently in the staging queue

Returns:

  • (Integer)

    the queue size



181
182
183
# File 'lib/instana/tracing/processor.rb', line 181

def staged_count
  @staging_queue.size
end

#staged_trace(ids) ⇒ Object

Retrieves a single staged trace from the staging queue. Staged traces are traces that have completed but may have outstanding asynchronous spans.

Parameters:

  • ids (Hash)

    the Trace ID and Span ID in the form of :trace_id => 12345 :span_id => 12345



154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/instana/tracing/processor.rb', line 154

def staged_trace(ids)
  candidate = nil
  @staging_lock.synchronize {
    @staging_queue.each do |trace|
      if trace.id == ids[:trace_id]
        candidate = trace
      end
    end
  }
  unless candidate
    ::Instana.logger.trace("Couldn't find staged trace with trace_id: #{ids[:trace_id]}")
  end
  candidate
end

#staged_tracesArray

Retrieves a all staged traces from the staging queue. Staged traces are traces that have completed but may have outstanding asynchronous spans.

Returns:

  • (Array)


137
138
139
140
141
142
143
144
# File 'lib/instana/tracing/processor.rb', line 137

def staged_traces
  traces = nil
  @staging_lock.synchronize {
    traces = @staging_queue.to_a
    @staging_queue.clear
  }
  traces
end