Class: Instana::Processor

Inherits:
Object
  • Object
show all
Defined in:
lib/instana/tracing/processor.rb

Instance Method Summary collapse

Constructor Details

#initializeProcessor

Returns a new instance of Processor.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/instana/tracing/processor.rb', line 6

def initialize
  # The main queue before being reported to the
  # host agent.  Traces in this queue are complete
  # and ready to be sent.
  @queue = Queue.new

  # The staging queue that holds traces that have completed
  # but still have outstanding async spans.
  # Traces that have been in this queue for more than
  # 5 minutes are discarded.
  @staging_queue = Set.new

  # No access to the @staging_queue until this lock
  # is taken.
  @staging_lock = Mutex.new

  # This is the maximum number of spans we send to the host
  # agent at once.
  @batch_size = 3000
end

Instance Method Details

#add(trace) ⇒ Object

Adds a trace to the queue to be processed and sent to the host agent

Parameters:

  • the (Trace)

    trace to be added to the queue



31
32
33
34
35
36
37
38
39
# File 'lib/instana/tracing/processor.rb', line 31

def add(trace)
  # Do a quick checkup on our background thread.
  if ::Instana.agent.collect_thread.nil? || !::Instana.agent.collect_thread.alive?
    ::Instana.agent.spawn_background_thread
  end

  # ::Instana.logger.debug("Queuing completed trace id: #{trace.id}")
  @queue.push(trace)
end

#clear!Object

Removes all traces from the @queue and @staging_queue. Used in the test suite to reset state.



201
202
203
204
205
206
207
# File 'lib/instana/tracing/processor.rb', line 201

def clear!
  until @queue.empty? do
    # Non-blocking pop; ignore exception
    @queue.pop(true) rescue nil
  end
  @staging_queue.clear
end

#process_stagedObject

This will run through the staged traces (if any) to find completed or timed out incompleted traces. Completed traces will be added to the main @queue. Timed out traces will be discarded



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/instana/tracing/processor.rb', line 53

def process_staged
  @staging_lock.synchronize {
    if @staging_queue.size > 0
      @staging_queue.delete_if do |t|
        if t.complete?
          ::Instana.logger.debug("Moving staged complete trace to main queue: #{t.id}")
          add(t)
          true
        elsif t.discard?
          ::Instana.logger.debug("Discarding trace with uncompleted async spans over 5 mins old. id: #{t.id}")
          true
        else
          false
        end
      end
    end
  }
end

#queue_countInteger

Get the number traces currently in the queue

Returns:

  • (Integer)

    the queue size



186
187
188
# File 'lib/instana/tracing/processor.rb', line 186

def queue_count
  @queue.size
end

#queued_spansArray

Retrieves all of the traces in @queue and returns the sum of their raw spans. This is used by Processor::send and in the test suite. Note that traces retrieved with this method are removed entirely from the queue.

Returns:

  • (Array)

    An array of [Span] or empty



114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/instana/tracing/processor.rb', line 114

def queued_spans
  return [] if @queue.empty?

  spans = []
  until @queue.empty? do
    # Non-blocking pop; ignore exception
    trace = @queue.pop(true) rescue nil
    trace.spans.each do |s|
      spans << s.raw
    end
  end
  spans
end

#queued_tracesArray

Retrieves all of the traces that are in @queue. Note that traces retrieved with this method are removed entirely from the queue.

Returns:

  • (Array)

    An array of [Trace] or empty



134
135
136
137
138
139
140
141
142
143
# File 'lib/instana/tracing/processor.rb', line 134

def queued_traces
  return [] if @queue.empty?

  traces = []
  until @queue.empty? do
    # Non-blocking pop; ignore exception
    traces << @queue.pop(true) rescue nil
  end
  traces
end

#sendObject

send

Sends all traces in @queue to the host agent

FIXME: Add limits checking here in regards to:

- Max HTTP Post size
- Out of control/growing queue
- Prevent another run of the timer while this is running


83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/instana/tracing/processor.rb', line 83

def send
  return if @queue.empty? || ENV['INSTANA_GEM_TEST']

  size = @queue.size
  if size > 100
    Instana.logger.debug "Trace queue is #{size}"
  end

  # Scan for any staged but incomplete traces that have now
  # completed.
  process_staged

  # Retrieve all spans for queued traces
  spans = queued_spans

  # Report spans in batches
  batch = spans.shift(@batch_size)
  while !batch.empty? do
    ::Instana.agent.report_spans(batch)
    batch = spans.shift(@batch_size)
  end
end

#stage(trace) ⇒ Object

Adds a trace to the staging queue.

Parameters:

  • the (Trace)

    trace to be added to the queue



44
45
46
47
# File 'lib/instana/tracing/processor.rb', line 44

def stage(trace)
  ::Instana.logger.debug("Staging incomplete trace id: #{trace.id}")
  @staging_queue.add(trace)
end

#staged_countInteger

Get the number traces currently in the staging queue

Returns:

  • (Integer)

    the queue size



194
195
196
# File 'lib/instana/tracing/processor.rb', line 194

def staged_count
  @staging_queue.size
end

#staged_trace(trace_id) ⇒ Object

Retrieves a single staged trace from the staging queue. Staged traces are traces that have completed but may have outstanding asynchronous spans.

Parameters:

  • trace_id (Integer)

    the Trace ID to be searched for



166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/instana/tracing/processor.rb', line 166

def staged_trace(trace_id)
  candidate = nil
  @staging_lock.synchronize {
    @staging_queue.each do |trace|
      if trace.id == trace_id
        candidate = trace
        break
      end
    end
  }
  unless candidate
    ::Instana.logger.debug("Couldn't find staged trace with trace_id: #{trace_id}")
  end
  candidate
end

#staged_tracesArray

Retrieves a all staged traces from the staging queue. Staged traces are traces that have completed but may have outstanding asynchronous spans.

Returns:

  • (Array)


151
152
153
154
155
156
157
158
# File 'lib/instana/tracing/processor.rb', line 151

def staged_traces
  traces = nil
  @staging_lock.synchronize {
    traces = @staging_queue.to_a
    @staging_queue.clear
  }
  traces
end