Class: OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor
- Inherits: Object
- Defined in: lib/opentelemetry/sdk/trace/export/batch_span_processor.rb
Overview
Implementation of the SpanProcessor duck type that batches spans reported by the SDK and then pushes them to the exporter pipeline.
All spans reported by the SDK implementation are first added to a synchronized queue that holds at most max_queue_size spans. Once the queue is full, the oldest queued spans are dropped to make room for new ones. Queued spans are exported to the exporter pipeline every schedule_delay_millis, in batches of max_export_batch_size.
If the queue becomes more than half full, a preemptive notification wakes the worker thread that exports the spans so that it starts a new export cycle.
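The export cycle described above can be sketched with a mutex, a condition variable, and a timed wait. The worker method itself is not shown on this page, so everything below is an assumption for illustration: `TinyBatchWorker`, `push`, `stop`, and `exported` are hypothetical names, an array stands in for the exporter, and the wake-up fires when a full batch is ready rather than at the half-full mark.

```ruby
# Hypothetical stand-in for the processor's worker; not the SDK implementation.
class TinyBatchWorker
  attr_reader :exported

  def initialize(delay_seconds:, batch_size:)
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @keep_running = true
    @delay_seconds = delay_seconds
    @batch_size = batch_size
    @items = []
    @exported = [] # stands in for the exporter pipeline
    @thread = Thread.new { work }
  end

  # Queue an item and wake the worker early once a full batch is waiting.
  def push(item)
    @mutex.synchronize do
      @items << item
      @condition.signal if @items.size >= @batch_size
    end
  end

  # Ask the worker to finish its current cycle, then wait for it to exit.
  def stop
    @mutex.synchronize do
      @keep_running = false
      @condition.signal
    end
    @thread.join
  end

  private

  def work
    loop do
      keep_running, batch = @mutex.synchronize do
        # Sleep until the delay elapses or another thread signals.
        @condition.wait(@mutex, @delay_seconds) if @keep_running && @items.size < @batch_size
        [@keep_running, @items.shift(@batch_size)]
      end
      @exported << batch unless batch.empty?
      break unless keep_running
    end
  end
end

worker = TinyBatchWorker.new(delay_seconds: 0.05, batch_size: 2)
worker.push(1)
worker.push(2)
worker.stop
worker.exported.flatten # => [1, 2]
```

Because the wait condition is checked and the signal is sent under the same mutex, a signal cannot be lost between the check and the wait.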
Constant Summary
- EXPORTER_TIMEOUT_MILLIS = 30_000
Instance Method Summary
- #force_flush ⇒ Object
  Exports all ended spans that have not yet been exported to the configured Exporter.
- #initialize(exporter:, exporter_timeout_millis: EXPORTER_TIMEOUT_MILLIS, schedule_delay_millis: SCHEDULE_DELAY_MILLIS, max_queue_size: MAX_QUEUE_SIZE, max_export_batch_size: MAX_EXPORT_BATCH_SIZE) ⇒ BatchSpanProcessor (constructor)
  Returns a new instance of BatchSpanProcessor.
- #on_finish(span) ⇒ Object
  Adds a span to the batcher. Thread-safe; may block while acquiring the lock.
- #on_start(span) ⇒ Object
  Does nothing for this processor.
- #shutdown ⇒ Object
  Shuts the consumer thread down and flushes the accumulated buffer. Blocks until the thread has finished.
Constructor Details
#initialize(exporter:, exporter_timeout_millis: EXPORTER_TIMEOUT_MILLIS, schedule_delay_millis: SCHEDULE_DELAY_MILLIS, max_queue_size: MAX_QUEUE_SIZE, max_export_batch_size: MAX_EXPORT_BATCH_SIZE) ⇒ BatchSpanProcessor
Returns a new instance of BatchSpanProcessor.
    # File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 32
    def initialize(exporter:,
                   exporter_timeout_millis: EXPORTER_TIMEOUT_MILLIS,
                   schedule_delay_millis: SCHEDULE_DELAY_MILLIS,
                   max_queue_size: MAX_QUEUE_SIZE,
                   max_export_batch_size: MAX_EXPORT_BATCH_SIZE)
      raise ArgumentError if max_export_batch_size > max_queue_size

      @exporter = exporter
      @exporter_timeout_seconds = exporter_timeout_millis / 1000.0
      @mutex = Mutex.new
      @condition = ConditionVariable.new
      @keep_running = true
      @delay_seconds = schedule_delay_millis / 1000.0
      @max_queue_size = max_queue_size
      @batch_size = max_export_batch_size
      @spans = []
      @thread = Thread.new { work }
    end
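The constructor's argument check and unit conversions can be tried in isolation. The sketch below is a hypothetical helper (`batch_settings` is not part of the SDK) that mirrors only those two steps, and the numeric values are arbitrary examples rather than the library's defaults.

```ruby
# Hypothetical helper, not part of the SDK: mirrors the argument check and
# the millisecond-to-second conversions visible in #initialize.
def batch_settings(exporter_timeout_millis:, schedule_delay_millis:,
                   max_queue_size:, max_export_batch_size:)
  # Batches are taken from the queue, so a batch size above the queue
  # capacity is rejected up front.
  raise ArgumentError, 'max_export_batch_size exceeds max_queue_size' if max_export_batch_size > max_queue_size

  {
    exporter_timeout_seconds: exporter_timeout_millis / 1000.0,
    delay_seconds: schedule_delay_millis / 1000.0,
    max_queue_size: max_queue_size,
    batch_size: max_export_batch_size
  }
end

settings = batch_settings(exporter_timeout_millis: 30_000,
                          schedule_delay_millis: 250,
                          max_queue_size: 100,
                          max_export_batch_size: 25)
settings[:exporter_timeout_seconds] # => 30.0
settings[:delay_seconds]            # => 0.25
```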
Instance Method Details
#force_flush ⇒ Object
TODO: test this explicitly.
Exports all ended spans that have not yet been exported to the
configured Exporter.
This method should only be called in cases where it is absolutely
necessary, such as when using some FaaS providers that may suspend
the process after an invocation, but before the Processor exports
the completed spans.
    # File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 76
    def force_flush
      snapshot = lock { spans.shift(spans.size) }
      until snapshot.empty?
        batch = snapshot.shift(@batch_size).map!(&:to_span_data)
        result_code = @exporter.export(batch)
        report_result(result_code, batch)
      end
    end
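The snapshot-then-slice pattern used by force_flush relies on `Array#shift(n)`, which removes and returns up to `n` leading elements. A minimal sketch on plain arrays follows; `drain_in_batches` is a hypothetical name, and the locking and exporter calls are left out.

```ruby
# Hypothetical helper: empties `items` and returns its contents in slices
# of at most `batch_size` elements. Assumes batch_size is positive.
def drain_in_batches(items, batch_size)
  snapshot = items.shift(items.size) # take everything; `items` is now empty
  batches = []
  batches << snapshot.shift(batch_size) until snapshot.empty?
  batches
end

queue = [1, 2, 3, 4, 5]
drain_in_batches(queue, 2) # => [[1, 2], [3, 4], [5]]
queue                      # => []
```

Taking the snapshot in one step keeps the time spent holding the lock short, since the slower export work then runs on a private copy.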
#on_finish(span) ⇒ Object
Adds a span to the batcher. Thread-safe; may block while acquiring the lock. Spans that are not sampled are ignored.
    # File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 57
    def on_finish(span) # rubocop:disable Metrics/AbcSize
      return unless span.context.trace_flags.sampled?

      lock do
        n = spans.size + 1 - max_queue_size
        spans.shift(n) if n.positive?
        spans << span
        @condition.signal if spans.size > max_queue_size / 2
      end
    end
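The queue arithmetic in on_finish can be traced on a plain array. In this sketch `enqueue_dropping_oldest` is a hypothetical helper; it returns whether the worker would have been signalled instead of signalling a real condition variable.

```ruby
# Hypothetical stand-in for the queue arithmetic in #on_finish.
def enqueue_dropping_oldest(queue, item, max_queue_size)
  overflow = queue.size + 1 - max_queue_size
  queue.shift(overflow) if overflow.positive? # drop the oldest entries
  queue << item
  queue.size > max_queue_size / 2 # true when the worker would be signalled
end

queue = []
signals = (1..6).map { |i| enqueue_dropping_oldest(queue, i, 4) }
queue   # => [3, 4, 5, 6]
signals # => [false, false, true, true, true, true]
```

With a capacity of 4, the signal first fires on the third item because the comparison is strict, and items 1 and 2 are discarded once items 5 and 6 arrive.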
#on_start(span) ⇒ Object
Does nothing for this processor.
    # File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 52
    def on_start(span)
      # noop
    end
#shutdown ⇒ Object
Shuts the consumer thread down and flushes the accumulated buffer. Blocks until the thread has finished.
    # File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 87
    def shutdown
      lock do
        @keep_running = false
        @condition.signal
      end

      @thread.join
      force_flush
      @exporter.shutdown
    end
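The ordering in shutdown (stop the worker, join it, flush what is left, then shut the exporter down) can be observed with a small event log. This is a sketch under assumptions: `TinyProcessor` and its `events` array are hypothetical, and the worker here only waits for the stop flag instead of exporting.

```ruby
# Hypothetical model of the shutdown sequence; records events in order.
class TinyProcessor
  attr_reader :events

  def initialize
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @keep_running = true
    @items = []
    @events = []
    @thread = Thread.new { work }
  end

  def push(item)
    @mutex.synchronize { @items << item }
  end

  def shutdown
    # Clear the flag and wake the worker while holding the lock.
    @mutex.synchronize do
      @keep_running = false
      @condition.signal
    end
    @thread.join # the worker has exited before anything below runs
    remaining = @mutex.synchronize { @items.shift(@items.size) }
    @events << [:final_flush, remaining]
    @events << :exporter_shutdown
  end

  private

  def work
    @mutex.synchronize do
      @condition.wait(@mutex) while @keep_running
    end
    @events << :worker_exited
  end
end

processor = TinyProcessor.new
processor.push(:a)
processor.push(:b)
processor.shutdown
processor.events
# => [:worker_exited, [:final_flush, [:a, :b]], :exporter_shutdown]
```

Joining before the final flush means only one thread touches the remaining items, so nothing queued before shutdown is exported twice or missed.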