Module: DSPy::Instrumentation
Defined in:
- lib/dspy/instrumentation.rb
- lib/dspy/instrumentation/token_tracker.rb
- lib/dspy/instrumentation/event_payloads.rb
- lib/dspy/instrumentation/event_payload_factory.rb
Overview
Core instrumentation module that uses dry-monitor for event emission. Provides extension points for logging, OpenTelemetry, New Relic, Langfuse, and custom monitoring.
Defined Under Namespace
Modules: EventPayloadFactory, TokenTracker
Classes: ChainOfThoughtEvent, ChainOfThoughtReasoningCompleteEvent, CodeActCodeExecutionEvent, CodeActIterationEvent, LMRequestEvent, LMResponseParsedEvent, LMTokensEvent, PredictEvent, PredictValidationErrorEvent, ReactIterationCompleteEvent, ReactIterationEvent, ReactMaxIterationsEvent, ReactToolCallEvent
Class Method Summary
- .emit(event_name, payload = {}) ⇒ Object
  Emit event without timing (for discrete events).
- .emit_event(event_name, payload) ⇒ Object
- .generate_timestamp ⇒ Object
  Generate timestamp in the configured format.
- .instrument(event_name, payload = {}, &block) ⇒ Object
  High-precision timing for performance tracking.
- .langfuse_available? ⇒ Boolean
- .langfuse_subscriber(**options) ⇒ Object
  Get a Langfuse subscriber instance (creates new instance each time).
- .logger_subscriber(**options) ⇒ Object
  Get a logger subscriber instance (creates new instance each time).
- .newrelic_available? ⇒ Boolean
- .newrelic_subscriber(**options) ⇒ Object
  Get a New Relic subscriber instance (creates new instance each time).
- .notifications ⇒ Object
- .otel_available? ⇒ Boolean
  Dependency checking methods.
- .otel_subscriber(**options) ⇒ Object
  Get an OpenTelemetry subscriber instance (creates new instance each time).
- .register_event(event_name) ⇒ Object
  Register additional events dynamically (useful for testing).
- .restore_timestamp_format(payload_hash) ⇒ Object
  Restore timestamp to original format based on configuration.
- .setup_langfuse_subscriber ⇒ Object
- .setup_logger_subscriber ⇒ Object
- .setup_newrelic_subscriber ⇒ Object
- .setup_otel_subscriber ⇒ Object
- .setup_subscriber(subscriber_type) ⇒ Object
- .setup_subscribers ⇒ Object
- .setup_subscribers_legacy ⇒ Object
  Legacy setup method for backward compatibility.
- .subscribe(event_pattern = nil, &block) ⇒ Object
  Subscribe to DSPy instrumentation events.
Class Method Details
.emit(event_name, payload = {}) ⇒ Object
Emit event without timing (for discrete events)
# File 'lib/dspy/instrumentation.rb', line 161
def self.emit(event_name, payload = {})
  # Handle nil payload
  payload ||= {}

  enhanced_payload = payload.merge(
    status: payload[:status] || 'success'
  ).merge(generate_timestamp)

  # Create typed event struct
  event_struct = EventPayloadFactory.create_event(event_name, enhanced_payload)
  self.emit_event(event_name, event_struct)
end
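The payload handling in .emit can be tried in isolation. The following is a minimal sketch, not the library's code: `build_discrete_payload` is a hypothetical helper, and the timestamp hash is passed in by hand instead of coming from the module's own timestamp generator.

```ruby
# Hypothetical helper that mirrors the payload handling shown for .emit.
def build_discrete_payload(payload, timestamp)
  payload ||= {} # tolerate an explicit nil payload
  payload
    .merge(status: payload[:status] || 'success') # keep a caller-supplied status
    .merge(timestamp)
end

ts = { timestamp: '2023-11-14T22:13:20Z' } # fixed value for illustration only
p build_discrete_payload(nil, ts)
p build_discrete_payload({ status: 'error', tool: 'search' }, ts)
```

A caller-supplied :status is preserved, and 'success' is only the default for discrete events that do not set one.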
.emit_event(event_name, payload) ⇒ Object
# File 'lib/dspy/instrumentation.rb', line 191
def self.emit_event(event_name, payload)
  # Only emit events - subscribers self-register when explicitly created
  # Convert struct to hash if needed (dry-monitor expects hash)
  if payload.respond_to?(:to_h)
    payload_hash = payload.to_h
    # Restore original timestamp format if needed
    restore_timestamp_format(payload_hash)
  else
    payload_hash = payload
  end
  notifications.instrument(event_name, payload_hash)
end
.generate_timestamp ⇒ Object
Generate timestamp in the configured format
# File 'lib/dspy/instrumentation.rb', line 302
def self.generate_timestamp
  case DSPy.config.instrumentation.timestamp_format
  when DSPy::TimestampFormat::ISO8601
    { timestamp: Time.now.iso8601 }
  when DSPy::TimestampFormat::RFC3339_NANO
    { timestamp: Time.now.strftime('%Y-%m-%dT%H:%M:%S.%9N%z') }
  when DSPy::TimestampFormat::UNIX_NANO
    { timestamp_ns: (Time.now.to_f * 1_000_000_000).to_i }
  else
    { timestamp: Time.now.iso8601 } # Fallback to iso8601
  end
end
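To compare the three output shapes side by side, here is a self-contained sketch. It is not the module's implementation: the symbols `:iso8601`, `:rfc3339_nano` and `:unix_nano` are hypothetical stand-ins for the `DSPy::TimestampFormat` constants, and the nanosecond branch deliberately swaps the float multiplication shown above for integer arithmetic, because a Float cannot represent every nanosecond at current epoch values.

```ruby
require 'time'

# Hypothetical stand-in for the timestamp generator; symbols replace the
# DSPy::TimestampFormat constants used by the real module.
def timestamp_payload(format, now = Time.now)
  case format
  when :rfc3339_nano
    { timestamp: now.strftime('%Y-%m-%dT%H:%M:%S.%9N%z') }
  when :unix_nano
    # Integer arithmetic (an assumption of this sketch) keeps every digit.
    { timestamp_ns: now.to_i * 1_000_000_000 + now.nsec }
  else
    { timestamp: now.iso8601 } # :iso8601 and any unknown value
  end
end

t = Time.at(1_700_000_000, 123_456_789, :nsec).utc
p timestamp_payload(:iso8601, t)      # second resolution
p timestamp_payload(:rfc3339_nano, t) # nine fractional digits
p timestamp_payload(:unix_nano, t)    # integer nanoseconds
```

Note that the nanosecond format uses a different key (:timestamp_ns) from the two string formats, so subscribers need to check for both.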
.instrument(event_name, payload = {}, &block) ⇒ Object
High-precision timing for performance tracking
# File 'lib/dspy/instrumentation.rb', line 118
def self.instrument(event_name, payload = {}, &block)
  # If no block is given, return early
  return unless block_given?

  start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  start_cpu = Process.clock_gettime(Process::CLOCK_PROCESS_CPUTIME_ID)

  begin
    result = yield

    end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end_cpu = Process.clock_gettime(Process::CLOCK_PROCESS_CPUTIME_ID)

    enhanced_payload = payload.merge(
      duration_ms: ((end_time - start_time) * 1000).round(2),
      cpu_time_ms: ((end_cpu - start_cpu) * 1000).round(2),
      status: 'success'
    ).merge(generate_timestamp)

    # Create typed event struct
    event_struct = EventPayloadFactory.create_event(event_name, enhanced_payload)
    self.emit_event(event_name, event_struct)
    result
  rescue => error
    end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end_cpu = Process.clock_gettime(Process::CLOCK_PROCESS_CPUTIME_ID)

    error_payload = payload.merge(
      duration_ms: ((end_time - start_time) * 1000).round(2),
      cpu_time_ms: ((end_cpu - start_cpu) * 1000).round(2),
      status: 'error',
      error_type: error.class.name,
      error_message: error.message
    ).merge(generate_timestamp)

    # Create typed event struct
    event_struct = EventPayloadFactory.create_event(event_name, error_payload)
    self.emit_event(event_name, event_struct)
    raise
  end
end
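The timing pattern above (monotonic wall clock plus process CPU clock, with an event on both the success and the error path) can be exercised without dry-monitor. The sketch below is an illustration under stated assumptions: `instrument_sketch` and `elapsed_ms` are hypothetical names, and events are appended to a plain array instead of being converted to typed structs and emitted.

```ruby
# Elapsed milliseconds on the given clock since `start`, rounded as in the source.
def elapsed_ms(clock_id, start)
  ((Process.clock_gettime(clock_id) - start) * 1000).round(2)
end

# Hypothetical stand-in for .instrument: appends [event_name, payload] to `sink`.
def instrument_sketch(sink, event_name, payload = {})
  return unless block_given?

  wall_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  cpu_start = Process.clock_gettime(Process::CLOCK_PROCESS_CPUTIME_ID)
  timing = lambda do
    { duration_ms: elapsed_ms(Process::CLOCK_MONOTONIC, wall_start),
      cpu_time_ms: elapsed_ms(Process::CLOCK_PROCESS_CPUTIME_ID, cpu_start) }
  end

  begin
    result = yield
    sink << [event_name, payload.merge(timing.call).merge(status: 'success')]
    result
  rescue => error
    sink << [event_name, payload.merge(timing.call).merge(
      status: 'error', error_type: error.class.name, error_message: error.message
    )]
    raise # the caller still sees the original exception
  end
end

events = []
instrument_sketch(events, 'demo.work', step: 1) { sleep 0.01; :done }
begin
  instrument_sketch(events, 'demo.work', step: 2) { raise ArgumentError, 'boom' }
rescue ArgumentError
  # re-raised after the error event was recorded
end
events.each { |name, data| puts "#{name} #{data}" }
```

CLOCK_MONOTONIC is unaffected by system clock adjustments, which is why it suits durations. A block that mostly sleeps or waits on I/O will report a cpu_time_ms much smaller than its duration_ms.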
.langfuse_available? ⇒ Boolean
# File 'lib/dspy/instrumentation.rb', line 292
def self.langfuse_available?
  begin
    require 'langfuse'
    true
  rescue LoadError
    false
  end
end
.langfuse_subscriber(**options) ⇒ Object
Get a Langfuse subscriber instance (creates new instance each time)
# File 'lib/dspy/instrumentation.rb', line 31
def self.langfuse_subscriber(**options)
  require_relative 'subscribers/langfuse_subscriber'
  DSPy::Subscribers::LangfuseSubscriber.new(**options)
end
.logger_subscriber(**options) ⇒ Object
Get a logger subscriber instance (creates new instance each time)
# File 'lib/dspy/instrumentation.rb', line 13
def self.logger_subscriber(**options)
  require_relative 'subscribers/logger_subscriber'
  DSPy::Subscribers::LoggerSubscriber.new(**options)
end
.newrelic_available? ⇒ Boolean
# File 'lib/dspy/instrumentation.rb', line 283
def self.newrelic_available?
  begin
    require 'newrelic_rpm'
    true
  rescue LoadError
    false
  end
end
.newrelic_subscriber(**options) ⇒ Object
Get a New Relic subscriber instance (creates new instance each time)
# File 'lib/dspy/instrumentation.rb', line 25
def self.newrelic_subscriber(**options)
  require_relative 'subscribers/newrelic_subscriber'
  DSPy::Subscribers::NewrelicSubscriber.new(**options)
end
.notifications ⇒ Object
# File 'lib/dspy/instrumentation.rb', line 36
def self.notifications
  @notifications ||= Dry::Monitor::Notifications.new(:dspy).tap do |n|
    # Register all DSPy events
    n.register_event('dspy.lm.request')
    n.register_event('dspy.lm.tokens')
    n.register_event('dspy.lm.response.parsed')
    n.register_event('dspy.predict')
    n.register_event('dspy.predict.validation_error')
    n.register_event('dspy.chain_of_thought')
    n.register_event('dspy.chain_of_thought.reasoning_step')
    n.register_event('dspy.react')
    n.register_event('dspy.react.tool_call')
    n.register_event('dspy.react.iteration_complete')
    n.register_event('dspy.react.max_iterations')

    # CodeAct events
    n.register_event('dspy.codeact')
    n.register_event('dspy.codeact.iteration')
    n.register_event('dspy.codeact.code_execution')
    n.register_event('dspy.codeact.iteration_complete')
    n.register_event('dspy.codeact.max_iterations')

    # Evaluation events
    n.register_event('dspy.evaluation.start')
    n.register_event('dspy.evaluation.example')
    n.register_event('dspy.evaluation.batch')
    n.register_event('dspy.evaluation.batch_complete')

    # Optimization events
    n.register_event('dspy.optimization.start')
    n.register_event('dspy.optimization.complete')
    n.register_event('dspy.optimization.trial_start')
    n.register_event('dspy.optimization.trial_complete')
    n.register_event('dspy.optimization.bootstrap_start')
    n.register_event('dspy.optimization.bootstrap_complete')
    n.register_event('dspy.optimization.bootstrap_example')
    n.register_event('dspy.optimization.minibatch_evaluation')
    n.register_event('dspy.optimization.instruction_proposal_start')
    n.register_event('dspy.optimization.instruction_proposal_complete')
    n.register_event('dspy.optimization.error')
    n.register_event('dspy.optimization.save')
    n.register_event('dspy.optimization.load')

    # Storage events
    n.register_event('dspy.storage.save_start')
    n.register_event('dspy.storage.save_complete')
    n.register_event('dspy.storage.save_error')
    n.register_event('dspy.storage.load_start')
    n.register_event('dspy.storage.load_complete')
    n.register_event('dspy.storage.load_error')
    n.register_event('dspy.storage.delete')
    n.register_event('dspy.storage.export')
    n.register_event('dspy.storage.import')
    n.register_event('dspy.storage.cleanup')

    # Memory compaction events
    n.register_event('dspy.memory.compaction_check')
    n.register_event('dspy.memory.size_compaction')
    n.register_event('dspy.memory.age_compaction')
    n.register_event('dspy.memory.deduplication')
    n.register_event('dspy.memory.relevance_pruning')
    n.register_event('dspy.memory.compaction_complete')

    # Registry events
    n.register_event('dspy.registry.register_start')
    n.register_event('dspy.registry.register_complete')
    n.register_event('dspy.registry.register_error')
    n.register_event('dspy.registry.deploy_start')
    n.register_event('dspy.registry.deploy_complete')
    n.register_event('dspy.registry.deploy_error')
    n.register_event('dspy.registry.rollback_start')
    n.register_event('dspy.registry.rollback_complete')
    n.register_event('dspy.registry.rollback_error')
    n.register_event('dspy.registry.performance_update')
    n.register_event('dspy.registry.export')
    n.register_event('dspy.registry.import')
    n.register_event('dspy.registry.auto_deployment')
    n.register_event('dspy.registry.automatic_rollback')
  end
end
.otel_available? ⇒ Boolean
Dependency checking methods
# File 'lib/dspy/instrumentation.rb', line 274
def self.otel_available?
  begin
    require 'opentelemetry/sdk'
    true
  rescue LoadError
    false
  end
end
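The three `*_available?` checks share one shape: attempt the `require` and treat a LoadError as "not installed". A generic form might look like the sketch below. `library_available?` is a hypothetical helper and not part of the module, and the second feature name is made up so that the lookup fails.

```ruby
# Hypothetical generic form of the *_available? checks.
def library_available?(feature)
  require feature
  true
rescue LoadError
  false # not installed, or not on the load path
end

puts library_available?('time')                     # standard library
puts library_available?('no_such_library_for_demo') # made-up name
```

Because `require` has the side effect of loading the library, a check of this kind also makes the library's constants available to the subscriber that is created afterwards.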
.otel_subscriber(**options) ⇒ Object
Get an OpenTelemetry subscriber instance (creates new instance each time)
# File 'lib/dspy/instrumentation.rb', line 19
def self.otel_subscriber(**options)
  require_relative 'subscribers/otel_subscriber'
  DSPy::Subscribers::OtelSubscriber.new(**options)
end
.register_event(event_name) ⇒ Object
Register additional events dynamically (useful for testing)
# File 'lib/dspy/instrumentation.rb', line 175
def self.register_event(event_name)
  notifications.register_event(event_name)
end
.restore_timestamp_format(payload_hash) ⇒ Object
Restore timestamp to original format based on configuration
# File 'lib/dspy/instrumentation.rb', line 205
def self.restore_timestamp_format(payload_hash)
  return unless payload_hash[:timestamp]

  case DSPy.config.instrumentation.timestamp_format
  when DSPy::TimestampFormat::UNIX_NANO
    # Convert ISO8601 back to nanoseconds
    timestamp = Time.parse(payload_hash[:timestamp])
    payload_hash.delete(:timestamp)
    payload_hash[:timestamp_ns] = (timestamp.to_f * 1_000_000_000).to_i
  when DSPy::TimestampFormat::RFC3339_NANO
    # Convert to RFC3339 with nanoseconds
    timestamp = Time.parse(payload_hash[:timestamp])
    payload_hash[:timestamp] = timestamp.strftime('%Y-%m-%dT%H:%M:%S.%9N%z')
  end
end
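The conversion above works by mutating the hash it is given, so the return value is not what matters. A self-contained sketch of the same round trip follows. `restore_sketch` is a hypothetical name, and the format is passed as a symbol argument instead of being read from `DSPy.config`.

```ruby
require 'time'

# Hypothetical stand-in: rewrites payload_hash in place for the given format.
def restore_sketch(payload_hash, format)
  return unless payload_hash[:timestamp]

  parsed = Time.parse(payload_hash[:timestamp])
  case format
  when :unix_nano
    payload_hash.delete(:timestamp)
    payload_hash[:timestamp_ns] = (parsed.to_f * 1_000_000_000).to_i
  when :rfc3339_nano
    payload_hash[:timestamp] = parsed.strftime('%Y-%m-%dT%H:%M:%S.%9N%z')
  end
end

as_ns = { timestamp: '2023-11-14T22:13:20Z' }
restore_sketch(as_ns, :unix_nano)
p as_ns # the :timestamp key is replaced by :timestamp_ns

as_rfc = { timestamp: '2023-11-14T22:13:20Z' }
restore_sketch(as_rfc, :rfc3339_nano)
p as_rfc # fractional digits are all zero
```

If the incoming string carries only whole seconds, as in this example, the restored nanosecond fields cannot contain more precision than the string did.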
.setup_langfuse_subscriber ⇒ Object
# File 'lib/dspy/instrumentation.rb', line 268
def self.setup_langfuse_subscriber
  # Create subscriber - it will read configuration when handling events
  langfuse_subscriber
end
.setup_logger_subscriber ⇒ Object
# File 'lib/dspy/instrumentation.rb', line 253
def self.setup_logger_subscriber
  # Create subscriber - it will read configuration when handling events
  logger_subscriber
end
.setup_newrelic_subscriber ⇒ Object
# File 'lib/dspy/instrumentation.rb', line 263
def self.setup_newrelic_subscriber
  # Create subscriber - it will read configuration when handling events
  newrelic_subscriber
end
.setup_otel_subscriber ⇒ Object
# File 'lib/dspy/instrumentation.rb', line 258
def self.setup_otel_subscriber
  # Create subscriber - it will read configuration when handling events
  otel_subscriber
end
.setup_subscriber(subscriber_type) ⇒ Object
# File 'lib/dspy/instrumentation.rb', line 236
def self.setup_subscriber(subscriber_type)
  case subscriber_type
  when :logger
    setup_logger_subscriber
  when :otel
    setup_otel_subscriber if otel_available?
  when :newrelic
    setup_newrelic_subscriber if newrelic_available?
  when :langfuse
    setup_langfuse_subscriber if langfuse_available?
  else
    raise ArgumentError, "Unknown subscriber type: #{subscriber_type}"
  end
rescue LoadError => e
  DSPy.logger.warn "Failed to setup #{subscriber_type} subscriber: #{e.message}"
end
.setup_subscribers ⇒ Object
# File 'lib/dspy/instrumentation.rb', line 221
def self.setup_subscribers
  config = DSPy.config.instrumentation

  # Return early if instrumentation is disabled
  return unless config.enabled

  # Validate configuration first
  DSPy.validate_instrumentation!

  # Setup each configured subscriber
  config.subscribers.each do |subscriber_type|
    setup_subscriber(subscriber_type)
  end
end
.setup_subscribers_legacy ⇒ Object
Legacy setup method for backward compatibility
# File 'lib/dspy/instrumentation.rb', line 316
def self.setup_subscribers_legacy
  # Legacy initialization - will be created when first accessed
  # Force initialization of enabled subscribers
  logger_subscriber

  # Only initialize if dependencies are available
  begin
    otel_subscriber if ENV['OTEL_EXPORTER_OTLP_ENDPOINT'] || defined?(OpenTelemetry)
  rescue LoadError
    # OpenTelemetry not available, skip
  end

  begin
    newrelic_subscriber if defined?(NewRelic)
  rescue LoadError
    # New Relic not available, skip
  end

  begin
    langfuse_subscriber if ENV['LANGFUSE_SECRET_KEY'] || defined?(Langfuse)
  rescue LoadError
    # Langfuse not available, skip
  end
end
.subscribe(event_pattern = nil, &block) ⇒ Object
Subscribe to DSPy instrumentation events
# File 'lib/dspy/instrumentation.rb', line 180
def self.subscribe(event_pattern = nil, &block)
  if event_pattern
    notifications.subscribe(event_pattern, &block)
  else
    # Subscribe to all DSPy events
    %w[dspy.lm.request dspy.lm.tokens dspy.lm.response.parsed dspy.predict dspy.predict.validation_error dspy.chain_of_thought dspy.chain_of_thought.reasoning_step dspy.react dspy.react.tool_call dspy.react.iteration_complete dspy.react.max_iterations].each do |event_name|
      notifications.subscribe(event_name, &block)
    end
  end
end