Class: OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/opentelemetry/sdk/trace/export/batch_span_processor.rb

Overview

Implementation of the duck type SpanProcessor that batches spans exported by the SDK then pushes them to the exporter pipeline.

All spans reported by the SDK implementation are first added to a synchronized queue (with a #max_queue_size maximum size, after the size is reached spans are dropped) and exported every schedule_delay_millis to the exporter pipeline in batches of max_export_batch_size.

If the queue gets half full a preemptive notification is sent to the worker thread that exports the spans to wake up and start a new export cycle.

max_export_attempts attempts are made to export each batch, while export fails with FAILED_RETRYABLE, backing off linearly in 100ms increments.

Constant Summary collapse

EXPORTER_TIMEOUT_MILLIS =
30_000

Instance Method Summary collapse

Constructor Details

#initialize(exporter:, exporter_timeout_millis: EXPORTER_TIMEOUT_MILLIS, schedule_delay_millis: SCHEDULE_DELAY_MILLIS, max_queue_size: MAX_QUEUE_SIZE, max_export_batch_size: MAX_EXPORT_BATCH_SIZE, max_export_attempts: MAX_EXPORT_ATTEMPTS) ⇒ BatchSpanProcessor

Returns a new instance of BatchSpanProcessor.

Raises:

  • (ArgumentError)


37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 37

def initialize(exporter:,
               exporter_timeout_millis: EXPORTER_TIMEOUT_MILLIS,
               schedule_delay_millis: SCHEDULE_DELAY_MILLIS,
               max_queue_size: MAX_QUEUE_SIZE,
               max_export_batch_size: MAX_EXPORT_BATCH_SIZE,
               max_export_attempts: MAX_EXPORT_ATTEMPTS)
  raise ArgumentError if max_export_batch_size > max_queue_size

  @exporter = exporter
  @exporter_timeout_seconds = exporter_timeout_millis / 1000.0
  @mutex = Mutex.new
  @condition = ConditionVariable.new
  @keep_running = true
  @delay_seconds = schedule_delay_millis / 1000.0
  @max_queue_size = max_queue_size
  @batch_size = max_export_batch_size
  @export_attempts = max_export_attempts
  @spans = []
  @thread = Thread.new { work }
end

Instance Method Details

#on_finish(span) ⇒ Object

adds a span to the batcher, threadsafe may block on lock



64
65
66
67
68
69
70
71
72
73
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 64

def on_finish(span) # rubocop:disable Metrics/AbcSize
  return unless span.context.trace_flags.sampled?

  lock do
    n = spans.size + 1 - max_queue_size
    spans.shift(n) if n.positive?
    spans << span
    @condition.signal if spans.size > max_queue_size / 2
  end
end

#on_start(span) ⇒ Object

does nothing for this processor



59
60
61
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 59

def on_start(span)
  # noop
end

#shutdownObject

shuts the consumer thread down and flushes the current accumulated buffer will block until the thread is finished



77
78
79
80
81
82
83
84
85
86
# File 'lib/opentelemetry/sdk/trace/export/batch_span_processor.rb', line 77

def shutdown
  lock do
    @keep_running = false
    @condition.signal
  end

  @thread.join
  flush
  @exporter.shutdown
end