Class: OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor
- Inherits:
-
Object
- Object
- OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor
- Defined in:
- lib/opentelemetry/sdk/trace/export/batch_span_processor.rb
Overview
Implementation of the duck type SpanProcessor that batches spans exported by the SDK then pushes them to the exporter pipeline.
All spans reported by the SDK implementation are first added to a synchronized queue (with a #max_queue_size maximum size, after the size is reached spans are dropped) and exported every schedule_delay_millis to the exporter pipeline in batches of max_export_batch_size.
If the queue gets half full a preemptive notification is sent to the worker thread that exports the spans to wake up and start a new export cycle.
max_export_attempts attempts are made to export each batch, while export fails with FAILED_RETRYABLE, backing off linearly in 100ms increments.
Constant Summary collapse
- EXPORTER_TIMEOUT_MILLIS =
30_000
Instance Method Summary collapse
-
#initialize(exporter:, exporter_timeout_millis: EXPORTER_TIMEOUT_MILLIS, schedule_delay_millis: SCHEDULE_DELAY_MILLIS, max_queue_size: MAX_QUEUE_SIZE, max_export_batch_size: MAX_EXPORT_BATCH_SIZE, max_export_attempts: MAX_EXPORT_ATTEMPTS) ⇒ BatchSpanProcessor
constructor
A new instance of BatchSpanProcessor.
-
#on_finish(span) ⇒ Object
adds a span to the batcher, threadsafe may block on lock.
-
#on_start(span) ⇒ Object
does nothing for this processor.
-
#shutdown ⇒ Object
shuts the consumer thread down and flushes the current accumulated buffer will block until the thread is finished.
Constructor Details
#initialize(exporter:, exporter_timeout_millis: EXPORTER_TIMEOUT_MILLIS, schedule_delay_millis: SCHEDULE_DELAY_MILLIS, max_queue_size: MAX_QUEUE_SIZE, max_export_batch_size: MAX_EXPORT_BATCH_SIZE, max_export_attempts: MAX_EXPORT_ATTEMPTS) ⇒ BatchSpanProcessor
Returns a new instance of BatchSpanProcessor.
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 37 def initialize(exporter:, exporter_timeout_millis: EXPORTER_TIMEOUT_MILLIS, schedule_delay_millis: SCHEDULE_DELAY_MILLIS, max_queue_size: MAX_QUEUE_SIZE, max_export_batch_size: MAX_EXPORT_BATCH_SIZE, max_export_attempts: MAX_EXPORT_ATTEMPTS) raise ArgumentError if max_export_batch_size > max_queue_size @exporter = exporter @exporter_timeout_seconds = exporter_timeout_millis / 1000.0 @mutex = Mutex.new @condition = ConditionVariable.new @keep_running = true @delay_seconds = schedule_delay_millis / 1000.0 @max_queue_size = max_queue_size @batch_size = max_export_batch_size @export_attempts = max_export_attempts @spans = [] @thread = Thread.new { work } end |
Instance Method Details
#on_finish(span) ⇒ Object
adds a span to the batcher, threadsafe may block on lock
64 65 66 67 68 69 70 71 72 73 |
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 64 def on_finish(span) # rubocop:disable Metrics/AbcSize return unless span.context.trace_flags.sampled? lock do n = spans.size + 1 - max_queue_size spans.shift(n) if n.positive? spans << span @condition.signal if spans.size > max_queue_size / 2 end end |
#on_start(span) ⇒ Object
does nothing for this processor
59 60 61 |
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 59 def on_start(span) # noop end |
#shutdown ⇒ Object
shuts the consumer thread down and flushes the current accumulated buffer will block until the thread is finished
77 78 79 80 81 82 83 84 85 86 |
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 77 def shutdown lock do @keep_running = false @condition.signal end @thread.join flush @exporter.shutdown end |