Class: Langfuse::Client

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/langfuse/client.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Client

Returns a new instance of Client.



9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/langfuse/client.rb', line 9

# Builds the client state: captures the current Langfuse configuration,
# creates the event buffer and its mutex, starts the periodic flusher when
# running inside a Rails server, and registers an at_exit shutdown hook
# unless the configuration disables it.
def initialize
  @config = Langfuse.configuration
  @events = Concurrent::Array.new # Thread-safe array
  @mutex = Mutex.new # For operations that need additional thread safety

  # Start periodic flusher only in server context.
  # NOTE(review): `Rails.server?` does not appear to be part of the Rails
  # module's documented API, so calling it unguarded can raise NoMethodError
  # in a Rails app. Use it when present; otherwise fall back to checking for
  # the Rails::Server constant (presumably only loaded by `rails server` —
  # confirm for other launchers).
  if defined?(Rails)
    in_server = Rails.respond_to?(:server?) ? Rails.server? : defined?(Rails::Server)
    schedule_periodic_flush if in_server
  end

  # Register shutdown hook
  return if @config.disable_at_exit_hook

  at_exit { shutdown }
end

Instance Method Details

#event(attributes = {}) ⇒ Object

Creates a new event within a trace

Raises:

  • (ArgumentError)


87
88
89
90
91
92
93
94
95
96
97
# File 'lib/langfuse/client.rb', line 87

# Builds a Models::Event from the given attributes, wraps it in an
# 'event-create' ingestion event, and hands that to enqueue_event.
#
# @param attributes [Hash] passed unchanged to Models::Event.new
# @return [Object] the Models::Event instance that was built
# @raise [ArgumentError] when attributes[:trace_id] is nil or false
def event(attributes = {})
  unless attributes[:trace_id]
    raise ArgumentError, 'trace_id is required for creating an event'
  end

  Models::Event.new(attributes).tap do |payload|
    enqueue_event(Models::IngestionEvent.new(type: 'event-create', body: payload))
  end
end

#flush ⇒ Object

Flushes all pending events to the API



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/langfuse/client.rb', line 113

# Drains the buffered events and hands them, converted with to_h, to
# BatchWorker.perform_async. Does nothing when the buffer is empty.
#
# @return [Object, nil] nil when there was nothing to flush, otherwise
#   whatever BatchWorker.perform_async returns
def flush
  # Copy and clear under the mutex so both steps happen together with
  # respect to any other code that takes @mutex.
  pending = @mutex.synchronize do
    snapshot = @events.dup
    @events.clear
    snapshot
  end
  return if pending.empty?

  # Convert objects to hashes for serialization
  payload = pending.map(&:to_h)
  log("Flushing #{payload.size} events")

  # Send to background worker
  BatchWorker.perform_async(payload)
end

#generation(attributes = {}) ⇒ Object

Creates a new generation within a trace

Raises:

  • (ArgumentError)


60
61
62
63
64
65
66
67
68
69
70
# File 'lib/langfuse/client.rb', line 60

# Builds a Models::Generation from the given attributes, wraps it in a
# 'generation-create' ingestion event, and hands that to enqueue_event.
#
# @param attributes [Hash] passed unchanged to Models::Generation.new
# @return [Object] the Models::Generation instance that was built
# @raise [ArgumentError] when attributes[:trace_id] is nil or false
def generation(attributes = {})
  unless attributes[:trace_id]
    raise ArgumentError, 'trace_id is required for creating a generation'
  end

  Models::Generation.new(attributes).tap do |payload|
    enqueue_event(Models::IngestionEvent.new(type: 'generation-create', body: payload))
  end
end

#score(attributes = {}) ⇒ Object

Creates a new score

Raises:

  • (ArgumentError)


100
101
102
103
104
105
106
107
108
109
110
# File 'lib/langfuse/client.rb', line 100

# Builds a Models::Score from the given attributes, wraps it in a
# 'score-create' ingestion event, and hands that to enqueue_event.
#
# @param attributes [Hash] passed unchanged to Models::Score.new
# @return [Object] the Models::Score instance that was built
# @raise [ArgumentError] when attributes[:trace_id] is nil or false
def score(attributes = {})
  unless attributes[:trace_id]
    raise ArgumentError, 'trace_id is required for creating a score'
  end

  Models::Score.new(attributes).tap do |payload|
    enqueue_event(Models::IngestionEvent.new(type: 'score-create', body: payload))
  end
end

#shutdown ⇒ Object

Gracefully shuts down the client, ensuring all events are flushed



134
135
136
137
138
139
140
141
142
143
144
# File 'lib/langfuse/client.rb', line 134

# Stops the background flush thread (when one was started), flushes whatever
# is still buffered, and logs before and after.
#
# @return [Object] whatever the final log call returns
def shutdown
  log('Shutting down Langfuse client...')

  # @flush_thread is only set when a periodic flusher exists; skip when nil.
  @flush_thread.exit unless @flush_thread.nil?

  # Push out anything still waiting in the buffer.
  flush

  log('Langfuse client shut down.')
end

#span(attributes = {}) ⇒ Object

Creates a new span within a trace

Raises:

  • (ArgumentError)


35
36
37
38
39
40
41
42
43
44
45
# File 'lib/langfuse/client.rb', line 35

# Builds a Models::Span from the given attributes, wraps it in a
# 'span-create' ingestion event, and hands that to enqueue_event.
#
# @param attributes [Hash] passed unchanged to Models::Span.new
# @return [Object] the Models::Span instance that was built
# @raise [ArgumentError] when attributes[:trace_id] is nil or false
def span(attributes = {})
  unless attributes[:trace_id]
    raise ArgumentError, 'trace_id is required for creating a span'
  end

  Models::Span.new(attributes).tap do |payload|
    enqueue_event(Models::IngestionEvent.new(type: 'span-create', body: payload))
  end
end

#trace(attributes = {}) ⇒ Object

Creates a new trace



24
25
26
27
28
29
30
31
32
# File 'lib/langfuse/client.rb', line 24

# Builds a Models::Trace from the given attributes, wraps it in a
# 'trace-create' ingestion event, and hands that to enqueue_event.
# No attribute is validated here.
#
# @param attributes [Hash] passed unchanged to Models::Trace.new
# @return [Object] the Models::Trace instance that was built
def trace(attributes = {})
  Models::Trace.new(attributes).tap do |payload|
    enqueue_event(Models::IngestionEvent.new(type: 'trace-create', body: payload))
  end
end

#update_generation(generation) ⇒ Object

Updates an existing generation



73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/langfuse/client.rb', line 73

# Wraps an already-built generation in a 'generation-update' ingestion event
# and hands that to enqueue_event.
#
# @param generation [Object] must respond to #id and #trace_id
# @return [Object] the same generation that was passed in
# @raise [ArgumentError] when generation.id or generation.trace_id is nil or false
def update_generation(generation)
  identifiable = generation.id && generation.trace_id
  raise ArgumentError, 'generation.id and generation.trace_id are required for updating a generation' unless identifiable

  enqueue_event(Models::IngestionEvent.new(type: 'generation-update', body: generation))
  generation
end

#update_span(span) ⇒ Object

Updates an existing span

Raises:

  • (ArgumentError)


48
49
50
51
52
53
54
55
56
57
# File 'lib/langfuse/client.rb', line 48

# Wraps an already-built span in a 'span-update' ingestion event and hands
# that to enqueue_event.
#
# @param span [Object] must respond to #id and #trace_id
# @return [Object] the same span that was passed in
# @raise [ArgumentError] when span.id or span.trace_id is nil or false
def update_span(span)
  unless span.id && span.trace_id
    raise ArgumentError, 'span.id and span.trace_id are required for updating a span'
  end

  span.tap do |updated|
    enqueue_event(Models::IngestionEvent.new(type: 'span-update', body: updated))
  end
end