Class: Langfuse::Client
- Inherits:
-
Object
- Object
- Langfuse::Client
- Extended by:
- T::Sig
- Includes:
- Singleton
- Defined in:
- lib/langfuse/client.rb
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#events ⇒ Object
readonly
Returns the value of attribute events.
-
#flush_thread ⇒ Object
readonly
Returns the value of attribute flush_thread.
Instance Method Summary collapse
- #event(attributes = {}) ⇒ Object
- #flush ⇒ Object
- #generation(attributes = {}) ⇒ Object
-
#initialize ⇒ Client
constructor
A new instance of Client.
- #score(attributes = {}) ⇒ Object
- #shutdown ⇒ Object
- #span(attributes = {}) ⇒ Object
- #trace(attributes = {}) ⇒ Object
- #update_generation(generation) ⇒ Object
- #update_span(span) ⇒ Object
Constructor Details
#initialize ⇒ Client
Returns a new instance of Client.
27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/langfuse/client.rb', line 27 def initialize @config = T.let(Langfuse.configuration, ::Langfuse::Configuration) # Let Sorbet infer the type for Concurrent::Array here @events = T.let(Concurrent::Array.new, Concurrent::Array) @mutex = T.let(Mutex.new, Mutex) @flush_thread = T.let(nil, T.nilable(Thread)) schedule_periodic_flush # Register shutdown hook return if @config.disable_at_exit_hook Kernel.at_exit { shutdown } end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
17 18 19 |
# File 'lib/langfuse/client.rb', line 17 def config @config end |
#events ⇒ Object (readonly)
Returns the value of attribute events.
21 22 23 |
# File 'lib/langfuse/client.rb', line 21 def events @events end |
#flush_thread ⇒ Object (readonly)
Returns the value of attribute flush_thread.
24 25 26 |
# File 'lib/langfuse/client.rb', line 24 def flush_thread @flush_thread end |
Instance Method Details
#event(attributes = {}) ⇒ Object
117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/langfuse/client.rb', line 117 def event(attributes = {}) raise ArgumentError, 'trace_id is required for creating an event' unless attributes[:trace_id] event_obj = T.unsafe(Models::Event).new(attributes) event = T.unsafe(Models::IngestionEvent).new( type: 'event-create', body: event_obj ) enqueue_event(event) event_obj end |
#flush ⇒ Object
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/langfuse/client.rb', line 145 def flush events_to_process = T.let([], T::Array[T.untyped]) # Atomically swap the events array to avoid race conditions @mutex.synchronize do events_to_process = @events.dup @events.clear end return if events_to_process.empty? # Convert objects to hashes for serialization # Assuming `to_h` exists on Models::IngestionEvent and returns T::Hash[T.untyped, T.untyped] event_hashes = events_to_process.map(&:to_h) log("Flushing #{event_hashes.size} events") # Send to background worker T.unsafe(BatchWorker).perform_async(event_hashes) end |
#generation(attributes = {}) ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/langfuse/client.rb', line 88 def generation(attributes = {}) raise ArgumentError, 'trace_id is required for creating a generation' unless attributes[:trace_id] generation = T.unsafe(Models::Generation).new(attributes) event = T.unsafe(Models::IngestionEvent).new( type: 'generation-create', body: generation ) enqueue_event(event) generation end |
#score(attributes = {}) ⇒ Object
131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/langfuse/client.rb', line 131 def score(attributes = {}) raise ArgumentError, 'trace_id is required for creating a score' unless attributes[:trace_id] score = T.unsafe(Models::Score).new(attributes) event = T.unsafe(Models::IngestionEvent).new( type: 'score-create', body: score ) enqueue_event(event) score end |
#shutdown ⇒ Object
168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/langfuse/client.rb', line 168 def shutdown log('Shutting down Langfuse client...') # Cancel the flush timer if it's running @flush_thread&.exit # Flush any remaining events flush log('Langfuse client shut down.') end |
#span(attributes = {}) ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/langfuse/client.rb', line 57 def span(attributes = {}) raise ArgumentError, 'trace_id is required for creating a span' unless attributes[:trace_id] span = T.unsafe(Models::Span).new(attributes) event = T.unsafe(Models::IngestionEvent).new( type: 'span-create', body: span ) enqueue_event(event) span end |
#trace(attributes = {}) ⇒ Object
44 45 46 47 48 49 50 51 52 53 |
# File 'lib/langfuse/client.rb', line 44 def trace(attributes = {}) # Ideally Models::Trace.new would have its own signature trace = T.unsafe(Models::Trace).new(attributes) event = T.unsafe(Models::IngestionEvent).new( type: 'trace-create', body: trace ) enqueue_event(event) trace end |
#update_generation(generation) ⇒ Object
102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/langfuse/client.rb', line 102 def update_generation(generation) unless T.unsafe(generation).id && T.unsafe(generation).trace_id raise ArgumentError, 'generation.id and generation.trace_id are required for updating a generation' end event = T.unsafe(Models::IngestionEvent).new( type: 'generation-update', body: generation ) enqueue_event(event) generation end |
#update_span(span) ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/langfuse/client.rb', line 71 def update_span(span) # Assuming span object has :id and :trace_id methods/attributes unless T.unsafe(span).id && T.unsafe(span).trace_id raise ArgumentError, 'span.id and span.trace_id are required for updating a span' end event = T.unsafe(Models::IngestionEvent).new( type: 'span-update', body: span ) enqueue_event(event) span end |