Class: Langfuse::Client

Inherits:
Object
  • Object
show all
Extended by:
T::Sig
Includes:
Singleton
Defined in:
lib/langfuse/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeClient

Returns a new instance of Client.



27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/langfuse/client.rb', line 27

def initialize
  @config = T.let(Langfuse.configuration, ::Langfuse::Configuration)
  # Let Sorbet infer the type for Concurrent::Array here
  @events = T.let(Concurrent::Array.new, Concurrent::Array)
  @mutex = T.let(Mutex.new, Mutex)
  @flush_thread = T.let(nil, T.nilable(Thread))

  schedule_periodic_flush

  # Register shutdown hook
  return if @config.disable_at_exit_hook

  Kernel.at_exit { shutdown }
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



17
18
19
# File 'lib/langfuse/client.rb', line 17

def config
  @config
end

#eventsObject (readonly)

Returns the value of attribute events.



21
22
23
# File 'lib/langfuse/client.rb', line 21

def events
  @events
end

#flush_threadObject (readonly)

Returns the value of attribute flush_thread.



24
25
26
# File 'lib/langfuse/client.rb', line 24

def flush_thread
  @flush_thread
end

Instance Method Details

#event(attributes = {}) ⇒ Object

Raises:

  • (ArgumentError)


117
118
119
120
121
122
123
124
125
126
127
# File 'lib/langfuse/client.rb', line 117

def event(attributes = {})
  raise ArgumentError, 'trace_id is required for creating an event' unless attributes[:trace_id]

  event_obj = T.unsafe(Models::Event).new(attributes)
  event = T.unsafe(Models::IngestionEvent).new(
    type: 'event-create',
    body: event_obj
  )
  enqueue_event(event)
  event_obj
end

#flushObject



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/langfuse/client.rb', line 145

def flush
  events_to_process = T.let([], T::Array[T.untyped])

  # Atomically swap the events array to avoid race conditions
  @mutex.synchronize do
    events_to_process = @events.dup
    @events.clear
  end

  return if events_to_process.empty?

  # Convert objects to hashes for serialization
  # Assuming `to_h` exists on Models::IngestionEvent and returns T::Hash[T.untyped, T.untyped]
  event_hashes = events_to_process.map(&:to_h)

  log("Flushing #{event_hashes.size} events")

  # Send to background worker
  T.unsafe(BatchWorker).perform_async(event_hashes)
end

#generation(attributes = {}) ⇒ Object

Raises:

  • (ArgumentError)


88
89
90
91
92
93
94
95
96
97
98
# File 'lib/langfuse/client.rb', line 88

def generation(attributes = {})
  raise ArgumentError, 'trace_id is required for creating a generation' unless attributes[:trace_id]

  generation = T.unsafe(Models::Generation).new(attributes)
  event = T.unsafe(Models::IngestionEvent).new(
    type: 'generation-create',
    body: generation
  )
  enqueue_event(event)
  generation
end

#score(attributes = {}) ⇒ Object

Raises:

  • (ArgumentError)


131
132
133
134
135
136
137
138
139
140
141
# File 'lib/langfuse/client.rb', line 131

def score(attributes = {})
  raise ArgumentError, 'trace_id is required for creating a score' unless attributes[:trace_id]

  score = T.unsafe(Models::Score).new(attributes)
  event = T.unsafe(Models::IngestionEvent).new(
    type: 'score-create',
    body: score
  )
  enqueue_event(event)
  score
end

#shutdownObject



168
169
170
171
172
173
174
175
176
177
178
# File 'lib/langfuse/client.rb', line 168

def shutdown
  log('Shutting down Langfuse client...')

  # Cancel the flush timer if it's running
  @flush_thread&.exit

  # Flush any remaining events
  flush

  log('Langfuse client shut down.')
end

#span(attributes = {}) ⇒ Object

Raises:

  • (ArgumentError)


57
58
59
60
61
62
63
64
65
66
67
# File 'lib/langfuse/client.rb', line 57

def span(attributes = {})
  raise ArgumentError, 'trace_id is required for creating a span' unless attributes[:trace_id]

  span = T.unsafe(Models::Span).new(attributes)
  event = T.unsafe(Models::IngestionEvent).new(
    type: 'span-create',
    body: span
  )
  enqueue_event(event)
  span
end

#trace(attributes = {}) ⇒ Object



44
45
46
47
48
49
50
51
52
53
# File 'lib/langfuse/client.rb', line 44

def trace(attributes = {})
  # Ideally Models::Trace.new would have its own signature
  trace = T.unsafe(Models::Trace).new(attributes)
  event = T.unsafe(Models::IngestionEvent).new(
    type: 'trace-create',
    body: trace
  )
  enqueue_event(event)
  trace
end

#update_generation(generation) ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/langfuse/client.rb', line 102

def update_generation(generation)
  unless T.unsafe(generation).id && T.unsafe(generation).trace_id
    raise ArgumentError, 'generation.id and generation.trace_id are required for updating a generation'
  end

  event = T.unsafe(Models::IngestionEvent).new(
    type: 'generation-update',
    body: generation
  )
  enqueue_event(event)
  generation
end

#update_span(span) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/langfuse/client.rb', line 71

def update_span(span)
  # Assuming span object has :id and :trace_id methods/attributes
  unless T.unsafe(span).id && T.unsafe(span).trace_id
    raise ArgumentError,
          'span.id and span.trace_id are required for updating a span'
  end

  event = T.unsafe(Models::IngestionEvent).new(
    type: 'span-update',
    body: span
  )
  enqueue_event(event)
  span
end