Class: Langfuse::Client
- Inherits:
-
Object
- Object
- Langfuse::Client
- Includes:
- Singleton
- Defined in:
- lib/langfuse/client.rb
Instance Method Summary collapse
-
#event(attributes = {}) ⇒ Object
Creates a new event within a trace.
-
#flush ⇒ Object
Flushes all pending events to the API.
-
#generation(attributes = {}) ⇒ Object
Creates a new generation within a trace.
-
#initialize ⇒ Client
constructor
A new instance of Client.
-
#score(attributes = {}) ⇒ Object
Creates a new score.
-
#shutdown ⇒ Object
Gracefully shuts down the client, ensuring all events are flushed.
-
#span(attributes = {}) ⇒ Object
Creates a new span within a trace.
-
#trace(attributes = {}) ⇒ Object
Creates a new trace.
-
#update_generation(generation) ⇒ Object
Updates an existing generation.
-
#update_span(span) ⇒ Object
Updates an existing span.
Constructor Details
#initialize ⇒ Client
Returns a new instance of Client.
9 10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/langfuse/client.rb', line 9 def initialize @config = Langfuse.configuration @events = Concurrent::Array.new # Thread-safe array @mutex = Mutex.new # For operations that need additional thread safety # Start periodic flusher only in server context schedule_periodic_flush if defined?(Rails) && Rails.server? # Register shutdown hook return if @config.disable_at_exit_hook at_exit { shutdown } end |
Instance Method Details
#event(attributes = {}) ⇒ Object
Creates a new event within a trace
87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/langfuse/client.rb', line 87 def event(attributes = {}) raise ArgumentError, 'trace_id is required for creating an event' unless attributes[:trace_id] event_obj = Models::Event.new(attributes) event = Models::IngestionEvent.new( type: 'event-create', body: event_obj ) enqueue_event(event) event_obj end |
#flush ⇒ Object
Flushes all pending events to the API
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/langfuse/client.rb', line 113 def flush events_to_process = nil # Atomically swap the events array to avoid race conditions @mutex.synchronize do events_to_process = @events.dup @events.clear end return if events_to_process.empty? # Convert objects to hashes for serialization event_hashes = events_to_process.map(&:to_h) log("Flushing #{event_hashes.size} events") # Send to background worker BatchWorker.perform_async(event_hashes) end |
#generation(attributes = {}) ⇒ Object
Creates a new generation within a trace
60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/langfuse/client.rb', line 60 def generation(attributes = {}) raise ArgumentError, 'trace_id is required for creating a generation' unless attributes[:trace_id] generation = Models::Generation.new(attributes) event = Models::IngestionEvent.new( type: 'generation-create', body: generation ) enqueue_event(event) generation end |
#score(attributes = {}) ⇒ Object
Creates a new score
100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/langfuse/client.rb', line 100 def score(attributes = {}) raise ArgumentError, 'trace_id is required for creating a score' unless attributes[:trace_id] score = Models::Score.new(attributes) event = Models::IngestionEvent.new( type: 'score-create', body: score ) enqueue_event(event) score end |
#shutdown ⇒ Object
Gracefully shuts down the client, ensuring all events are flushed
134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/langfuse/client.rb', line 134 def shutdown log('Shutting down Langfuse client...') # Cancel the flush timer if it's running @flush_thread&.exit # Flush any remaining events flush log('Langfuse client shut down.') end |
#span(attributes = {}) ⇒ Object
Creates a new span within a trace
35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/langfuse/client.rb', line 35 def span(attributes = {}) raise ArgumentError, 'trace_id is required for creating a span' unless attributes[:trace_id] span = Models::Span.new(attributes) event = Models::IngestionEvent.new( type: 'span-create', body: span ) enqueue_event(event) span end |
#trace(attributes = {}) ⇒ Object
Creates a new trace
24 25 26 27 28 29 30 31 32 |
# File 'lib/langfuse/client.rb', line 24 def trace(attributes = {}) trace = Models::Trace.new(attributes) event = Models::IngestionEvent.new( type: 'trace-create', body: trace ) enqueue_event(event) trace end |
#update_generation(generation) ⇒ Object
Updates an existing generation
73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/langfuse/client.rb', line 73 def update_generation(generation) unless generation.id && generation.trace_id raise ArgumentError, 'generation.id and generation.trace_id are required for updating a generation' end event = Models::IngestionEvent.new( type: 'generation-update', body: generation ) enqueue_event(event) generation end |
#update_span(span) ⇒ Object
Updates an existing span
48 49 50 51 52 53 54 55 56 57 |
# File 'lib/langfuse/client.rb', line 48 def update_span(span) raise ArgumentError, 'span.id and span.trace_id are required for updating a span' unless span.id && span.trace_id event = Models::IngestionEvent.new( type: 'span-update', body: span ) enqueue_event(event) span end |