Class: DSPy::Context

Inherits:
Object
  • Object
show all
Defined in:
lib/dspy/context.rb

Class Method Summary collapse

Class Method Details

.clear! ⇒ Object



160
161
162
163
164
165
166
# File 'lib/dspy/context.rb', line 160

# Forgets the DSPy context held by the caller.
#
# Three slots are reset to nil: the slot keyed by the current thread's
# object_id, the legacy :dspy_context slot on Thread.current, and the
# :dspy_context entry in Fiber storage.
#
# @return [nil]
def clear!
  owner = Thread.current

  # Per-thread key first, then the legacy key kept for backward compatibility.
  [:"dspy_context_#{owner.object_id}", :dspy_context].each do |slot|
    owner[slot] = nil
  end

  Fiber[:dspy_context] = nil
end

.current ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/dspy/context.rb', line 8

# Returns the context hash for the caller, creating one on first use.
#
# Lookup order:
#   1. The slot on Thread.current keyed by the thread's object_id. When it is
#      set, the Fiber and legacy :dspy_context slots are re-synced to it.
#   2. Fiber[:dspy_context], accepted only when it equals
#      Thread.current[:dspy_context]; it is then adopted into the keyed slot.
#   3. Otherwise a new hash with a random :trace_id and empty :span_stack /
#      :otel_span_stack arrays, stored in all three slots.
#
# NOTE(review): Ruby documents Thread#[] as fiber-local storage, so the
# "thread" slots used here may in fact be per fiber - confirm that this is the
# intended isolation level.
#
# @return [Hash] the context with :trace_id, :span_stack and :otel_span_stack
def current
  slot = :"dspy_context_#{Thread.current.object_id}"

  # Fast path: the keyed slot already holds a context. Mirror it into Fiber
  # storage (used for OpenTelemetry propagation) and the legacy key.
  if (owned = Thread.current[slot])
    Fiber[:dspy_context] = owned
    Thread.current[:dspy_context] = owned
    return owned
  end

  # A fiber-level context is trusted only when the legacy slot agrees with it;
  # otherwise it is treated as belonging to someone else and ignored.
  inherited = Fiber[:dspy_context]
  if inherited && Thread.current[:dspy_context] == inherited
    Thread.current[slot] = inherited
    return inherited
  end

  # Nothing usable was found: start a new trace and publish it everywhere.
  {
    trace_id: SecureRandom.uuid,
    span_stack: [],
    otel_span_stack: []
  }.tap do |fresh|
    Thread.current[slot] = fresh
    Thread.current[:dspy_context] = fresh # Keep for backward compatibility
    Fiber[:dspy_context] = fresh
  end
end

.with_span(operation:, **attributes) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/dspy/context.rb', line 46

# Runs the given block inside a new span tracked on the current context.
#
# A fresh span id is pushed onto current[:span_stack] for the duration of the
# block and popped afterwards, even when the block raises. When
# DSPy::Observability is enabled, 'span.start' and 'span.end' events are sent
# to DSPy.log. When a tracer is available as well, the block additionally runs
# inside an OpenTelemetry span that is nested under the innermost span found
# on current[:otel_span_stack].
#
# @param operation the span name; also stored as 'langfuse.trace.name' when
#   this is the only span on the stack (i.e. a root span)
# @param attributes [Hash] extra attributes; logged as given, and passed to
#   OpenTelemetry with string keys and nil values removed
# @yieldparam span the OpenTelemetry span, or nil when tracing is not active
# @return [Object] the block's result (on the OpenTelemetry path, whatever
#   the tracer's in_span returns for it)
def with_span(operation:, **attributes)
  span_id = SecureRandom.uuid
  parent_span_id = current[:span_stack].last
  start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  # Attributes for the internal log line, including hierarchy information.
  log_attributes = {
    trace_id: current[:trace_id],
    span_id: span_id,
    parent_span_id: parent_span_id,
    operation: operation,
    **attributes
  }

  # Log span start with proper hierarchy (internal logging only)
  DSPy.log('span.start', **log_attributes) if DSPy::Observability.enabled?

  # Push to stack so spans opened inside the block see this one as parent.
  current[:span_stack].push(span_id)

  begin
    if DSPy::Observability.enabled? && DSPy::Observability.tracer
      # OpenTelemetry gets its own attribute hash: string keys, no nil values.
      otel_attributes = attributes.transform_keys(&:to_s).compact

      # Exactly one entry on the stack means the span just pushed is a root.
      otel_attributes['langfuse.trace.name'] = operation if current[:span_stack].length == 1

      # Wall-clock start, reported explicitly on the span after the block runs.
      otel_start_time = Time.now

      parent_otel_span = current[:otel_span_stack].last

      if parent_otel_span
        # Activate the parent span first so the new span nests beneath it.
        OpenTelemetry::Trace.with_span(parent_otel_span) do
          run_in_otel_span(operation, otel_attributes, otel_start_time) { |span| yield(span) }
        end
      else
        # Root span - no parent context needed
        run_in_otel_span(operation, otel_attributes, otel_start_time) { |span| yield(span) }
      end
    else
      yield(nil)
    end
  ensure
    current[:span_stack].pop

    # Log span end with duration (internal logging only)
    if DSPy::Observability.enabled?
      duration_ms = ((Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time) * 1000).round(2)
      DSPy.log('span.end',
        trace_id: current[:trace_id],
        span_id: span_id,
        duration_ms: duration_ms
      )
    end
  end
end

# Runs the block inside a span created by DSPy::Observability.tracer, keeping
# current[:otel_span_stack] in sync and, when the block completes without
# raising, stamping duration and start/end timestamps on the span.
private def run_in_otel_span(operation, otel_attributes, otel_start_time)
  DSPy::Observability.tracer.in_span(
    operation,
    attributes: otel_attributes,
    kind: :internal
  ) do |span|
    # Track the span so nested with_span calls can find their parent.
    current[:otel_span_stack].push(span)

    begin
      result = yield(span)

      # Add explicit timing information to help Langfuse
      if span
        duration_ms = ((Time.now - otel_start_time) * 1000).round(3)
        span.set_attribute('duration.ms', duration_ms)
        span.set_attribute('langfuse.observation.startTime', otel_start_time.iso8601(3))
        span.set_attribute('langfuse.observation.endTime', Time.now.iso8601(3))
      end

      result
    ensure
      current[:otel_span_stack].pop
    end
  end
end