Class: OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor
- Inherits: Object
  - Object
  - OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor
- Defined in: lib/opentelemetry/sdk/trace/export/batch_span_processor.rb
Overview
Implementation of the SpanProcessor duck type that batches spans exported by the SDK and then pushes them to the exporter pipeline.
All spans reported by the SDK implementation are first added to a synchronized queue bounded by max_queue_size. Once the queue is full, the oldest queued spans are dropped to make room for new ones. Queued spans are exported to the exporter pipeline every schedule_delay_millis, in batches of max_export_batch_size.
If the queue becomes more than half full, a preemptive notification wakes the worker thread that exports the spans so that it starts a new export cycle.
Each batch is exported up to max_export_attempts times: the export is retried while it fails with FAILED_RETRYABLE, backing off linearly in 100ms increments.
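The retry behaviour described above can be illustrated with a small standalone sketch. It is not the SDK's implementation: the method name `export_with_retries`, the `sleeper` parameter, and the `:failed_retryable` / `:success` symbols are assumptions introduced here as stand-ins for the exporter result codes.

```ruby
# Hypothetical helper, not part of the SDK: retries a block while it reports
# a retryable failure, sleeping 100ms, 200ms, 300ms, ... between attempts.
def export_with_retries(batch, max_export_attempts:, sleeper: ->(seconds) { sleep(seconds) })
  result = nil
  max_export_attempts.times do |attempt|
    result = yield(batch)
    # Stop on success or on a failure that is not worth retrying.
    break unless result == :failed_retryable

    # Linear backoff; no sleep after the final attempt.
    sleeper.call(0.1 * (attempt + 1)) if attempt + 1 < max_export_attempts
  end
  result
end

# Usage: an export that fails twice and then succeeds. The sleeper is
# replaced with a recorder so the example does not actually pause.
recorded_delays = []
attempts_made = 0
outcome = export_with_retries([:span], max_export_attempts: 5,
                                       sleeper: ->(seconds) { recorded_delays << seconds }) do |_batch|
  attempts_made += 1
  attempts_made < 3 ? :failed_retryable : :success
end
```

Injecting the sleeper is only a convenience for experimenting without real delays; how the SDK schedules its backoff internally is not shown on this page.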
Constant Summary
- EXPORTER_TIMEOUT_MILLIS = 30_000
Instance Method Summary
- #force_flush ⇒ Object
  TODO: test this explicitly.
- #initialize(exporter:, exporter_timeout_millis: EXPORTER_TIMEOUT_MILLIS, schedule_delay_millis: SCHEDULE_DELAY_MILLIS, max_queue_size: MAX_QUEUE_SIZE, max_export_batch_size: MAX_EXPORT_BATCH_SIZE, max_export_attempts: MAX_EXPORT_ATTEMPTS) ⇒ BatchSpanProcessor
  constructor
  A new instance of BatchSpanProcessor.
- #on_finish(span) ⇒ Object
  Adds a span to the batcher. Thread-safe; may block on the lock.
- #on_start(span) ⇒ Object
  Does nothing for this processor.
- #shutdown ⇒ Object
  Shuts the consumer thread down and flushes the currently accumulated buffer. Blocks until the thread has finished.
Constructor Details
#initialize(exporter:, exporter_timeout_millis: EXPORTER_TIMEOUT_MILLIS, schedule_delay_millis: SCHEDULE_DELAY_MILLIS, max_queue_size: MAX_QUEUE_SIZE, max_export_batch_size: MAX_EXPORT_BATCH_SIZE, max_export_attempts: MAX_EXPORT_ATTEMPTS) ⇒ BatchSpanProcessor
Returns a new instance of BatchSpanProcessor.
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 37

def initialize(exporter:,
               exporter_timeout_millis: EXPORTER_TIMEOUT_MILLIS,
               schedule_delay_millis: SCHEDULE_DELAY_MILLIS,
               max_queue_size: MAX_QUEUE_SIZE,
               max_export_batch_size: MAX_EXPORT_BATCH_SIZE,
               max_export_attempts: MAX_EXPORT_ATTEMPTS)
  raise ArgumentError if max_export_batch_size > max_queue_size

  @exporter = exporter
  @exporter_timeout_seconds = exporter_timeout_millis / 1000.0
  @mutex = Mutex.new
  @condition = ConditionVariable.new
  @keep_running = true
  @delay_seconds = schedule_delay_millis / 1000.0
  @max_queue_size = max_queue_size
  @batch_size = max_export_batch_size
  @export_attempts = max_export_attempts
  @spans = []
  @thread = Thread.new { work }
end
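The constructor only stores the exporter; the methods on this page later call `export(batch)` and `shutdown` on it. As a rough illustration of that duck type, here is a minimal in-memory exporter. The class name `InMemoryExporterSketch` and the `:success` / `:failure` return values are assumptions made for this example; they are not SDK definitions, and a real exporter should return the SDK's own result codes.

```ruby
# Hypothetical exporter used only for illustration; not part of the SDK.
class InMemoryExporterSketch
  attr_reader :exported

  def initialize
    @exported = []
    @stopped = false
  end

  # Receives an array of span data and reports an outcome.
  # The symbols stand in for the SDK's result codes.
  def export(span_data)
    return :failure if @stopped

    @exported.concat(span_data)
    :success
  end

  # After shutdown, further exports are refused.
  def shutdown
    @stopped = true
  end
end

exporter = InMemoryExporterSketch.new

# With the SDK loaded, wiring would look roughly like this. The keyword names
# come from the signature above; the values are arbitrary, chosen so that
# max_export_batch_size does not exceed max_queue_size (otherwise the
# constructor raises ArgumentError):
#
#   processor = OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor.new(
#     exporter: exporter,
#     schedule_delay_millis: 1_000,
#     max_queue_size: 512,
#     max_export_batch_size: 128
#   )
```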
Instance Method Details
#force_flush ⇒ Object
TODO: test this explicitly.
Exports all ended spans that have not yet been exported to the
configured Exporter.
This method should only be called in cases where it is absolutely
necessary, such as when using some FaaS providers that may suspend
the process after an invocation, but before the Processor exports
the completed spans.
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 83

def force_flush
  snapshot = lock { spans.shift(spans.size) }
  until snapshot.empty?
    batch = snapshot.shift(@batch_size).map!(&:to_span_data)
    result_code = @exporter.export(batch)
    report_result(result_code, batch)
  end
end
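The flush above takes a snapshot of the whole queue while holding the lock and then slices it into batches outside the lock, so producers are not blocked while exporting. The following standalone sketch shows only that snapshot-and-slice pattern with plain arrays; `flush_in_batches` is a hypothetical name, and the export step is replaced by collecting the batches.

```ruby
# Hypothetical helper illustrating the snapshot-then-batch pattern.
def flush_in_batches(pending, mutex, batch_size)
  # Array#shift(n) removes and returns up to n leading elements, so this
  # empties the shared queue in one step while the lock is held.
  snapshot = mutex.synchronize { pending.shift(pending.size) }

  # The slow part (exporting, here just collecting) happens outside the lock.
  batches = []
  batches << snapshot.shift(batch_size) until snapshot.empty?
  batches
end

pending_spans = (1..5).to_a
flushed = flush_in_batches(pending_spans, Mutex.new, 2)
# flushed is [[1, 2], [3, 4], [5]] and pending_spans is now empty.
```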
#on_finish(span) ⇒ Object
Adds a span to the batcher. Thread-safe; may block on the lock.
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 64

def on_finish(span) # rubocop:disable Metrics/AbcSize
  return unless span.context.trace_flags.sampled?

  lock do
    n = spans.size + 1 - max_queue_size
    spans.shift(n) if n.positive?
    spans << span
    @condition.signal if spans.size > max_queue_size / 2
  end
end
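To see how the queue arithmetic in this method behaves, the sketch below replays the same steps on a plain array: evict from the front when adding one more item would exceed the limit, append, and note when the queue is more than half full. `BoundedQueueSketch` and its `wakeups` counter are hypothetical; the counter stands in for signalling the condition variable, and locking is omitted for brevity.

```ruby
# Hypothetical stand-in for the processor's internal queue; not SDK code.
class BoundedQueueSketch
  attr_reader :items, :wakeups

  def initialize(max_queue_size)
    @max_queue_size = max_queue_size
    @items = []
    @wakeups = 0 # how many times the worker would have been signalled
  end

  def push(item)
    overflow = @items.size + 1 - @max_queue_size
    @items.shift(overflow) if overflow.positive? # drop the oldest entries
    @items << item
    @wakeups += 1 if @items.size > @max_queue_size / 2
  end
end

queue = BoundedQueueSketch.new(4)
(1..6).each { |n| queue.push(n) }
# queue.items is [3, 4, 5, 6]: items 1 and 2 were evicted as the oldest.
# queue.wakeups is 4: every push from the third onward left the queue
# more than half full.
```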
#on_start(span) ⇒ Object
Does nothing for this processor.
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 59

def on_start(span)
  # noop
end
#shutdown ⇒ Object
Shuts the consumer thread down and flushes the currently accumulated buffer. Blocks until the thread has finished.
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 94

def shutdown
  lock do
    @keep_running = false
    @condition.signal
  end

  @thread.join
  force_flush
  @exporter.shutdown
end
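The ordering in this method matters: the flag is cleared and the worker signalled under the lock, the thread is joined, and only then is the remainder flushed. The sketch below reproduces that sequence with a toy worker. Everything in it is an assumption made for illustration (the method name `run_shutdown_sketch`, the worker loop, the one-second wait); the SDK's private `work` method is not shown on this page and may differ.

```ruby
# Hypothetical end-to-end sketch of the shutdown ordering; not SDK code.
def run_shutdown_sketch(items, batch_size: 2)
  mutex = Mutex.new
  condition = ConditionVariable.new
  queue = []
  exported = []
  keep_running = true

  worker = Thread.new do
    loop do
      batch = mutex.synchronize do
        # Sleep until signalled or until the delay elapses.
        condition.wait(mutex, 1) if queue.size < batch_size && keep_running
        # Once asked to stop, leave remaining items for the final flush.
        keep_running ? queue.shift(batch_size) : nil
      end
      break if batch.nil?

      exported.concat(batch)
    end
  end

  mutex.synchronize { queue.concat(items) }

  # Step 1: clear the flag and wake the worker while holding the lock,
  # so the worker cannot miss the notification.
  mutex.synchronize do
    keep_running = false
    condition.signal
  end

  # Step 2: wait for the worker to exit before touching shared state.
  worker.join

  # Step 3: flush whatever the worker left behind.
  exported.concat(mutex.synchronize { queue.shift(queue.size) })
  exported
end

all_exported = run_shutdown_sketch([1, 2, 3])
# Every item is exported exactly once, whether by the worker or the flush.
```

Signalling under the same lock that the worker uses to check the flag is what prevents a lost wake-up; joining before the final flush ensures the flush and the worker never export concurrently.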