Class: Instana::Processor
- Inherits:
-
Object
- Object
- Instana::Processor
- Defined in:
- lib/instana/tracing/processor.rb
Instance Method Summary collapse
-
#add(trace) ⇒ Object
Adds a trace to the queue to be processed and sent to the host agent.
-
#clear! ⇒ Object
Removes all traces from the @queue and @staging_queue.
-
#initialize ⇒ Processor
constructor
A new instance of Processor.
-
#process_staged ⇒ Object
This will run through the staged traces (if any) to find completed or timed out incompleted traces.
-
#queue_count ⇒ Integer
Get the number traces currently in the queue.
-
#queued_spans ⇒ Array
Retrieves all of the traces in @queue and returns the sum of their raw spans.
-
#queued_traces ⇒ Array
Retrieves all of the traces that are in @queue.
-
#send ⇒ Object
send.
-
#stage(trace) ⇒ Object
Adds a trace to the staging queue.
-
#staged_count ⇒ Integer
Get the number traces currently in the staging queue.
-
#staged_trace(trace_id) ⇒ Object
Retrieves a single staged trace from the staging queue.
-
#staged_traces ⇒ Array
Retrieves a all staged traces from the staging queue.
Constructor Details
#initialize ⇒ Processor
Returns a new instance of Processor.
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/instana/tracing/processor.rb', line 6 def initialize # The main queue before being reported to the # host agent. Traces in this queue are complete # and ready to be sent. @queue = Queue.new # The staging queue that holds traces that have completed # but still have outstanding async spans. # Traces that have been in this queue for more than # 5 minutes are discarded. @staging_queue = Set.new # No access to the @staging_queue until this lock # is taken. @staging_lock = Mutex.new # This is the maximum number of spans we send to the host # agent at once. @batch_size = 3000 end |
Instance Method Details
#add(trace) ⇒ Object
Adds a trace to the queue to be processed and sent to the host agent
31 32 33 34 35 36 37 38 39 |
# File 'lib/instana/tracing/processor.rb', line 31 def add(trace) # Do a quick checkup on our background thread. if ::Instana.agent.collect_thread.nil? || !::Instana.agent.collect_thread.alive? ::Instana.agent.spawn_background_thread end # ::Instana.logger.debug("Queuing completed trace id: #{trace.id}") @queue.push(trace) end |
#clear! ⇒ Object
Removes all traces from the @queue and @staging_queue. Used in the test suite to reset state.
201 202 203 204 205 206 207 |
# File 'lib/instana/tracing/processor.rb', line 201 def clear! until @queue.empty? do # Non-blocking pop; ignore exception @queue.pop(true) rescue nil end @staging_queue.clear end |
#process_staged ⇒ Object
This will run through the staged traces (if any) to find completed or timed out incompleted traces. Completed traces will be added to the main @queue. Timed out traces will be discarded
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/instana/tracing/processor.rb', line 53 def process_staged @staging_lock.synchronize { if @staging_queue.size > 0 @staging_queue.delete_if do |t| if t.complete? ::Instana.logger.debug("Moving staged complete trace to main queue: #{t.id}") add(t) true elsif t.discard? ::Instana.logger.debug("Discarding trace with uncompleted async spans over 5 mins old. id: #{t.id}") true else false end end end } end |
#queue_count ⇒ Integer
Get the number traces currently in the queue
186 187 188 |
# File 'lib/instana/tracing/processor.rb', line 186 def queue_count @queue.size end |
#queued_spans ⇒ Array
Retrieves all of the traces in @queue and returns the sum of their raw spans. This is used by Processor::send and in the test suite. Note that traces retrieved with this method are removed entirely from the queue.
114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/instana/tracing/processor.rb', line 114 def queued_spans return [] if @queue.empty? spans = [] until @queue.empty? do # Non-blocking pop; ignore exception trace = @queue.pop(true) rescue nil trace.spans.each do |s| spans << s.raw end end spans end |
#queued_traces ⇒ Array
Retrieves all of the traces that are in @queue. Note that traces retrieved with this method are removed entirely from the queue.
134 135 136 137 138 139 140 141 142 143 |
# File 'lib/instana/tracing/processor.rb', line 134 def queued_traces return [] if @queue.empty? traces = [] until @queue.empty? do # Non-blocking pop; ignore exception traces << @queue.pop(true) rescue nil end traces end |
#send ⇒ Object
send
Sends all traces in @queue to the host agent
FIXME: Add limits checking here in regards to:
- Max HTTP Post size
- Out of control/growing queue
- Prevent another run of the timer while this is running
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/instana/tracing/processor.rb', line 83 def send return if @queue.empty? || ENV['INSTANA_GEM_TEST'] size = @queue.size if size > 100 Instana.logger.debug "Trace queue is #{size}" end # Scan for any staged but incomplete traces that have now # completed. process_staged # Retrieve all spans for queued traces spans = queued_spans # Report spans in batches batch = spans.shift(@batch_size) while !batch.empty? do ::Instana.agent.report_spans(batch) batch = spans.shift(@batch_size) end end |
#stage(trace) ⇒ Object
Adds a trace to the staging queue.
44 45 46 47 |
# File 'lib/instana/tracing/processor.rb', line 44 def stage(trace) ::Instana.logger.debug("Staging incomplete trace id: #{trace.id}") @staging_queue.add(trace) end |
#staged_count ⇒ Integer
Get the number traces currently in the staging queue
194 195 196 |
# File 'lib/instana/tracing/processor.rb', line 194 def staged_count @staging_queue.size end |
#staged_trace(trace_id) ⇒ Object
Retrieves a single staged trace from the staging queue. Staged traces are traces that have completed but may have outstanding asynchronous spans.
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/instana/tracing/processor.rb', line 166 def staged_trace(trace_id) candidate = nil @staging_lock.synchronize { @staging_queue.each do |trace| if trace.id == trace_id candidate = trace break end end } unless candidate ::Instana.logger.debug("Couldn't find staged trace with trace_id: #{trace_id}") end candidate end |
#staged_traces ⇒ Array
Retrieves a all staged traces from the staging queue. Staged traces are traces that have completed but may have outstanding asynchronous spans.
151 152 153 154 155 156 157 158 |
# File 'lib/instana/tracing/processor.rb', line 151 def staged_traces traces = nil @staging_lock.synchronize { traces = @staging_queue.to_a @staging_queue.clear } traces end |