Class: Instana::Processor
- Inherits:
-
Object
- Object
- Instana::Processor
- Defined in:
- lib/instana/tracing/processor.rb
Instance Method Summary collapse
-
#add(trace) ⇒ Object
Adds a trace to the queue to be processed and sent to the host agent.
-
#clear! ⇒ Object
Removes all traces from the @queue and @staging_queue.
-
#initialize ⇒ Processor
constructor
A new instance of Processor.
-
#process_staged ⇒ Object
This will run through the staged traces (if any) to find completed or timed out incompleted traces.
-
#queue_count ⇒ Integer
Get the number traces currently in the queue.
-
#queued_spans ⇒ Array
Retrieves all of the traces in @queue and returns the sum of their raw spans.
-
#queued_traces ⇒ Array
Retrieves all of the traces that are in @queue.
-
#send ⇒ Object
send.
-
#stage(trace) ⇒ Object
Adds a trace to the staging queue.
-
#staged_count ⇒ Integer
Get the number traces currently in the staging queue.
-
#staged_trace(ids) ⇒ Object
Retrieves a single staged trace from the staging queue.
-
#staged_traces ⇒ Array
Retrieves a all staged traces from the staging queue.
Constructor Details
#initialize ⇒ Processor
Returns a new instance of Processor.
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/instana/tracing/processor.rb', line 6 def initialize # The main queue before being reported to the # host agent. Traces in this queue are complete # and ready to be sent. @queue = Queue.new # The staging queue that holds traces that have completed # but still have outstanding async spans. # Traces that have been in this queue for more than # 5 minutes are discarded. @staging_queue = Set.new # No access to the @staging_queue until this lock # is taken. @staging_lock = Mutex.new end |
Instance Method Details
#add(trace) ⇒ Object
Adds a trace to the queue to be processed and sent to the host agent
27 28 29 30 |
# File 'lib/instana/tracing/processor.rb', line 27 def add(trace) ::Instana.logger.trace("Queuing completed trace id: #{trace.id}") @queue.push(trace) end |
#clear! ⇒ Object
Removes all traces from the @queue and @staging_queue. Used in the test suite to reset state.
188 189 190 191 192 193 194 |
# File 'lib/instana/tracing/processor.rb', line 188 def clear! until @queue.empty? do # Non-blocking pop; ignore exception @queue.pop(true) rescue nil end @staging_queue.clear end |
#process_staged ⇒ Object
This will run through the staged traces (if any) to find completed or timed out incompleted traces. Completed traces will be added to the main @queue. Timed out traces will be discarded
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/instana/tracing/processor.rb', line 44 def process_staged @staging_lock.synchronize { if @staging_queue.size > 0 @staging_queue.delete_if do |t| if t.complete? ::Instana.logger.trace("Moving staged complete trace to main queue: #{t.id}") add(t) true elsif t.discard? ::Instana.logger.debug("Discarding trace with uncompleted async spans over 5 mins old. id: #{t.id}") true else false end end end } end |
#queue_count ⇒ Integer
Get the number traces currently in the queue
173 174 175 |
# File 'lib/instana/tracing/processor.rb', line 173 def queue_count @queue.size end |
#queued_spans ⇒ Array
Retrieves all of the traces in @queue and returns the sum of their raw spans. This is used by Processor::send and in the test suite. Note that traces retrieved with this method are removed entirely from the queue.
100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/instana/tracing/processor.rb', line 100 def queued_spans return [] if @queue.empty? spans = [] until @queue.empty? do # Non-blocking pop; ignore exception trace = @queue.pop(true) rescue nil trace.spans.each do |s| spans << s.raw end end spans end |
#queued_traces ⇒ Array
Retrieves all of the traces that are in @queue. Note that traces retrieved with this method are removed entirely from the queue.
120 121 122 123 124 125 126 127 128 129 |
# File 'lib/instana/tracing/processor.rb', line 120 def queued_traces return [] if @queue.empty? traces = [] until @queue.empty? do # Non-blocking pop; ignore exception traces << @queue.pop(true) rescue nil end traces end |
#send ⇒ Object
send
Sends all traces in @queue to the host agent
FIXME: Add limits checking here in regards to:
- Max HTTP Post size
- Out of control/growing queue
- Prevent another run of the timer while this is running
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/instana/tracing/processor.rb', line 74 def send return if @queue.empty? || ENV['INSTANA_GEM_TEST'] size = @queue.size if size > 100 Instana.logger.debug "Trace queue is #{size}" end # Scan for any staged but incomplete traces that have now # completed. process_staged # Retrieve all spans for queued traces spans = queued_spans ::Instana.agent.report_spans(spans) end |
#stage(trace) ⇒ Object
Adds a trace to the staging queue.
35 36 37 38 |
# File 'lib/instana/tracing/processor.rb', line 35 def stage(trace) ::Instana.logger.trace("Staging incomplete trace id: #{trace.id}") @staging_queue.add(trace) end |
#staged_count ⇒ Integer
Get the number traces currently in the staging queue
181 182 183 |
# File 'lib/instana/tracing/processor.rb', line 181 def staged_count @staging_queue.size end |
#staged_trace(ids) ⇒ Object
Retrieves a single staged trace from the staging queue. Staged traces are traces that have completed but may have outstanding asynchronous spans.
154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/instana/tracing/processor.rb', line 154 def staged_trace(ids) candidate = nil @staging_lock.synchronize { @staging_queue.each do |trace| if trace.id == ids[:trace_id] candidate = trace end end } unless candidate ::Instana.logger.trace("Couldn't find staged trace with trace_id: #{ids[:trace_id]}") end candidate end |
#staged_traces ⇒ Array
Retrieves a all staged traces from the staging queue. Staged traces are traces that have completed but may have outstanding asynchronous spans.
137 138 139 140 141 142 143 144 |
# File 'lib/instana/tracing/processor.rb', line 137 def staged_traces traces = nil @staging_lock.synchronize { traces = @staging_queue.to_a @staging_queue.clear } traces end |