Class: Geminize::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/geminize/client.rb

Overview

Client for making HTTP requests to the Gemini API

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Client

Initialize a new client

Parameters:

  • options (Hash) (defaults to: {})

    Additional options to override the defaults

Options Hash (options):

  • :api_key (String)

    API key for Gemini API

  • :api_version (String)

    API version to use

  • :timeout (Integer)

    Request timeout in seconds

  • :open_timeout (Integer)

    Connection open timeout in seconds

  • :streaming_timeout (Integer)

    Timeout for streaming requests in seconds

  • :logger (Logger)

    Custom logger instance (default: nil)



28
29
30
31
32
33
34
# File 'lib/geminize/client.rb', line 28

def initialize(options = {})
  @config = Geminize.configuration
  @options = options
  @connection = build_connection
  @streaming_in_progress = false
  @cancel_streaming = false
end

Instance Attribute Details

#cancel_streamingBoolean

Returns Flag indicating if a streaming operation should be cancelled.

Returns:

  • (Boolean)

    Flag indicating if a streaming operation should be cancelled



18
19
20
# File 'lib/geminize/client.rb', line 18

def cancel_streaming
  @cancel_streaming
end

#connectionFaraday::Connection (readonly)

Returns The Faraday connection.

Returns:

  • (Faraday::Connection)

    The Faraday connection



12
13
14
# File 'lib/geminize/client.rb', line 12

def connection
  @connection
end

#streaming_in_progressBoolean (readonly)

Returns Flag indicating if a streaming operation is in progress.

Returns:

  • (Boolean)

    Flag indicating if a streaming operation is in progress



15
16
17
# File 'lib/geminize/client.rb', line 15

def streaming_in_progress
  @streaming_in_progress
end

Instance Method Details

#get(endpoint, params = {}, headers = {}) ⇒ Hash

Make a GET request to the specified endpoint

Parameters:

  • endpoint (String)

    The API endpoint path

  • params (Hash) (defaults to: {})

    Optional query parameters

  • headers (Hash) (defaults to: {})

    Optional headers

Returns:

  • (Hash)

    The response body parsed as JSON



41
42
43
44
45
46
47
48
# File 'lib/geminize/client.rb', line 41

def get(endpoint, params = {}, headers = {})
  response = connection.get(
    build_url(endpoint),
    add_api_key(params),
    default_headers.merge(headers)
  )
  parse_response(response)
end

#post(endpoint, payload = {}, params = {}, headers = {}) ⇒ Hash

Make a POST request to the specified endpoint

Parameters:

  • endpoint (String)

    The API endpoint path

  • payload (Hash) (defaults to: {})

    The request body

  • params (Hash) (defaults to: {})

    Optional query parameters

  • headers (Hash) (defaults to: {})

    Optional headers

Returns:

  • (Hash)

    The response body parsed as JSON



56
57
58
59
60
61
62
63
64
65
# File 'lib/geminize/client.rb', line 56

def post(endpoint, payload = {}, params = {}, headers = {})
  response = connection.post(
    build_url(endpoint),
    payload.to_json,
    default_headers.merge(headers).merge({"Content-Type" => "application/json"})
  ) do |req|
    req.params.merge!(add_api_key(params))
  end
  parse_response(response)
end

#post_stream(endpoint, payload = {}, params = {}, headers = {}) {|chunk| ... }

This method returns an undefined value.

Make a streaming POST request to the specified endpoint

Parameters:

  • endpoint (String)

    The API endpoint path

  • payload (Hash) (defaults to: {})

    The request body

  • params (Hash) (defaults to: {})

    Optional query parameters

  • headers (Hash) (defaults to: {})

    Optional headers

Yields:

  • (chunk)

    Yields each chunk of the streaming response

Yield Parameters:

  • chunk (String, Hash)

    A chunk of the response (raw text or parsed JSON)

Raises:



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/geminize/client.rb', line 79

def post_stream(endpoint, payload = {}, params = {}, headers = {}, &block)
  raise ArgumentError, "A block is required for streaming requests" unless block_given?

  # Check if another streaming operation is in progress
  if @streaming_in_progress
    raise StreamingError.new("Another streaming operation is already in progress")
  end

  @streaming_in_progress = true
  @cancel_streaming = false

  # Ensure we have alt=sse parameter for the API to get server-sent events
  params = params.merge(alt: "sse")

  # Create a separate connection for streaming
  streaming_connection = build_streaming_connection

  # Initialize buffer for SSE processing
  @buffer = ""

  # Track if we've received any data
  received_data = false

  begin
    # Make the streaming request
    streaming_connection.post(
      build_url(endpoint),
      payload.to_json,
      default_headers.merge(headers).merge({
        "Content-Type" => "application/json",
        "Accept" => "text/event-stream" # Request SSE format explicitly
      })
    ) do |req|
      req.params.merge!(add_api_key(params))

      # Configure buffer management and chunked transfer reception
      req.options.on_data = proc do |chunk, size, env|
        # Check if cancellation is requested
        if @cancel_streaming
          env[:request].http_connection.close
          raise StreamingInterruptedError.new("Streaming was cancelled by the client")
        end

        received_data = true

        # Skip empty chunks
        next if chunk.strip.empty?

        # Use a buffer for handling partial SSE messages
        @buffer += chunk

        # Process complete SSE messages in buffer
        process_buffer(&block)
      end
    end
  rescue Faraday::ConnectionFailed => e
    # Connection was established but interrupted
    if received_data
      raise StreamingInterruptedError.new("Streaming connection interrupted: #{e.message}")
    else
      raise RequestError.new("Failed to establish streaming connection: #{e.message}", "CONNECTION_ERROR", nil)
    end
  rescue Faraday::TimeoutError => e
    raise StreamingTimeoutError.new("Streaming operation timed out: #{e.message}")
  rescue JSON::ParserError => e
    raise InvalidStreamFormatError.new("Could not parse streaming response: #{e.message}")
  rescue => e
    # Generic error handler
    error_message = "Streaming error: #{e.message}"
    raise StreamingError.new(error_message, nil, nil)
  ensure
    # Always clean up resources
    @buffer = nil
    @streaming_in_progress = false
    @cancel_streaming = false

    # Reset the connection to free resources
    begin
      streaming_connection&.close if streaming_connection&.respond_to?(:close)
    rescue => e
      # Just log the error if there's a problem closing the connection
      @options[:logger]&.warn("Error closing streaming connection: #{e.message}")
    end
  end
end