Class: DSPy::Gemini::LM::Adapters::GeminiAdapter

Inherits:
LM::Adapter
  • Object
show all
Defined in:
lib/dspy/gemini/lm/adapters/gemini_adapter.rb

Instance Method Summary collapse

Constructor Details

#initialize(model:, api_key:, structured_outputs: false) ⇒ GeminiAdapter

Returns a new instance of GeminiAdapter.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/dspy/gemini/lm/adapters/gemini_adapter.rb', line 16

# Builds a Gemini adapter and its underlying API client.
#
# Streaming (server-sent events) is on by default. It is switched off only
# when a regular VCR cassette is currently active and SSEVCR is not turned
# on, because SSE responses don't record properly in plain VCR cassettes.
#
# @param model [String] model identifier forwarded to the parent adapter and to the client options
# @param api_key [String] API key; checked by validate_api_key! (defined outside this block)
# @param structured_outputs [Boolean] stored as @structured_outputs_enabled
# @return [GeminiAdapter]
def initialize(model:, api_key:, structured_outputs: false)
  super(model: model, api_key: api_key)
  validate_api_key!(api_key, 'gemini')

  @structured_outputs_enabled = structured_outputs

  # Decide between streaming and non-streaming transport. Any failure while
  # probing VCR/SSEVCR falls back to streaming.
  @use_streaming =
    begin
      plain_vcr_recording = defined?(VCR) && VCR.current_cassette
      sse_vcr_recording = defined?(SSEVCR) && SSEVCR.turned_on?

      # Streaming stays on unless plain VCR is recording without SSEVCR
      !(plain_vcr_recording && !sse_vcr_recording)
    rescue StandardError
      true
    end

  credentials = {
    service: 'generative-language-api',
    api_key: api_key,
    version: 'v1beta' # Beta API version, used for structured outputs support
  }
  client_options = {
    model: model,
    server_sent_events: @use_streaming
  }

  @client = ::Gemini.new(credentials: credentials, options: client_options)
end

Instance Method Details

#chat(messages:, signature: nil, **extra_params, &block) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/dspy/gemini/lm/adapters/gemini_adapter.rb', line 49

# Sends a chat request to Gemini and returns the aggregated response.
#
# In streaming mode (@use_streaming truthy) the text of every chunk that
# carries content parts is appended to the result, and the optional block is
# called with each such chunk. Otherwise a single non-streaming request is made.
#
# @param messages [Array] messages, passed through normalize_messages
# @param signature [Object, nil] accepted for interface compatibility; not read by this method
# @param extra_params [Hash] merged into the request on top of :contents
# @yieldparam chunk [Hash] each parsed streaming chunk that contains content parts
# @return [DSPy::LM::Response] content, usage (nil when no usageMetadata was seen) and typed metadata
# @note Any StandardError raised while talking to the client (including the
#   DSPy::LM::AdapterError raised for an unparseable string chunk) is passed
#   to handle_gemini_error, which is defined outside this block.
def chat(messages:, signature: nil, **extra_params, &block)
  normalized_messages = normalize_messages(messages)

  # Validate vision support if images are present
  if contains_images?(normalized_messages)
    DSPy::LM::VisionModels.validate_vision_support!('gemini', model)
    # Convert messages to Gemini format with proper image handling
    normalized_messages = format_multimodal_messages(normalized_messages)
  end

  # Convert DSPy message format to Gemini format
  gemini_messages = convert_messages_to_gemini_format(normalized_messages)

  request_params = {
    contents: gemini_messages
  }.merge(extra_params)

  begin
    # Explicitly mutable buffer so chunks can be appended in place, even when
    # the file uses frozen string literals
    content = +''
    final_response_data = nil

    if @use_streaming
      @client.stream_generate_content(request_params) do |chunk|
        # A chunk may arrive as a raw JSON string (from SSE VCR)
        if chunk.is_a?(String)
          begin
            chunk = JSON.parse(chunk)
          rescue JSON::ParserError => e
            raise DSPy::LM::AdapterError, "Failed to parse Gemini streaming response: #{e.message}"
          end
        end

        parts = chunk.dig('candidates', 0, 'content', 'parts')
        if parts
          content << extract_text_from_parts(parts)

          # Forward the chunk only when the caller supplied a block
          block&.call(chunk)
        end

        # Keep the most recent chunk carrying usage data or a finish reason
        if chunk['usageMetadata'] || chunk.dig('candidates', 0, 'finishReason')
          final_response_data = chunk
        end
      end
    else
      # Non-streaming mode (for VCR tests)
      response = @client.generate_content(request_params)

      parts = response.dig('candidates', 0, 'content', 'parts')
      content = extract_text_from_parts(parts) if parts

      final_response_data = response
    end

    # Extract usage information from the final chunk/response
    usage_data = final_response_data&.dig('usageMetadata')
    usage_struct = usage_data ? DSPy::LM::UsageFactory.create('gemini', usage_data) : nil

    # Raw metadata taken from the final chunk/response
    metadata_attrs = {
      provider: 'gemini',
      model: model,
      finish_reason: final_response_data&.dig('candidates', 0, 'finishReason'),
      safety_ratings: final_response_data&.dig('candidates', 0, 'safetyRatings'),
      streaming: block_given?
    }

    # Create typed metadata
    typed_metadata = DSPy::LM::ResponseMetadataFactory.create('gemini', metadata_attrs)

    DSPy::LM::Response.new(
      content: content,
      usage: usage_struct,
      metadata: typed_metadata
    )
  rescue => e
    handle_gemini_error(e)
  end
end