Class: OpenRouter::StreamingClient

Inherits:
Client
  • Object
show all
Defined in:
lib/open_router/streaming_client.rb

Overview

Enhanced streaming client with better event handling and response reconstruction

Instance Attribute Summary

Attributes inherited from Client

#callbacks, #usage_tracker

Instance Method Summary collapse

Methods inherited from Client

#clear_callbacks, #complete, #configuration, #models, #on, #query_generation_stats, #select_model, #smart_complete, #smart_complete_with_fallback, #trigger_callbacks

Methods included from HTTP

#delete, #get, #multipart_post, #post

Constructor Details

#initialize(*args, **kwargs, &block) ⇒ StreamingClient

Initialize streaming client with additional streaming-specific options



7
8
9
10
11
12
13
14
15
16
# File 'lib/open_router/streaming_client.rb', line 7

def initialize(*args, **kwargs, &block)
  super(*args, **kwargs, &block)
  @streaming_callbacks = {
    on_chunk: [],
    on_start: [],
    on_finish: [],
    on_tool_call_chunk: [],
    on_error: []
  }
end

Instance Method Details

#on_stream(event, &block) ⇒ self

Register streaming-specific callbacks

Parameters:

  • event (Symbol)

    The streaming event to register for

  • block (Proc)

    The callback to execute

Returns:

  • (self)

    Returns self for method chaining



23
24
25
26
27
28
29
30
31
# File 'lib/open_router/streaming_client.rb', line 23

def on_stream(event, &block)
  unless @streaming_callbacks.key?(event)
    valid_events = @streaming_callbacks.keys.join(", ")
    raise ArgumentError, "Invalid streaming event: #{event}. Valid events are: #{valid_events}"
  end

  @streaming_callbacks[event] << block
  self
end

#stream(messages, model: "openrouter/auto", **extras, &block) ⇒ Object

Stream with a simple block interface

Examples:

client.stream(messages, model: "openai/gpt-4o-mini") do |chunk|
  print chunk
end

Parameters:

  • messages (Array<Hash>)

    Array of message hashes

  • model (String|Array) (defaults to: "openrouter/auto")

    Model identifier

  • block (Proc)

    Block to call for each content chunk

  • extras (Hash)

    Additional parameters

Raises:

  • (ArgumentError)


79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/open_router/streaming_client.rb', line 79

def stream(messages, model: "openrouter/auto", **extras, &block)
  raise ArgumentError, "Block required for streaming" unless block_given?

  stream_complete(
    messages,
    model: model,
    accumulate_response: false,
    **extras
  ) do |chunk|
    content = extract_content_from_chunk(chunk)
    block.call(content) if content
  end
end

#stream_complete(messages, model: "openrouter/auto", accumulate_response: true, **extras) ⇒ Response?

Enhanced streaming completion with better event handling and response reconstruction

Parameters:

  • messages (Array<Hash>)

    Array of message hashes

  • model (String|Array) (defaults to: "openrouter/auto")

    Model identifier or array of models for fallback

  • accumulate_response (Boolean) (defaults to: true)

    Whether to accumulate and return complete response

  • extras (Hash)

    Additional parameters for the completion request

Returns:

  • (Response, nil)

    Complete response if accumulate_response is true, nil otherwise



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/open_router/streaming_client.rb', line 40

def stream_complete(messages, model: "openrouter/auto", accumulate_response: true, **extras)
  response_accumulator = ResponseAccumulator.new if accumulate_response

  # Set up streaming handler
  stream_handler = build_stream_handler(response_accumulator)

  # Trigger start callback
  trigger_streaming_callbacks(:on_start, { model: model, messages: messages })

  begin
    # Execute the streaming request
    complete(messages, model: model, stream: stream_handler, **extras)

    # Return accumulated response if requested
    if accumulate_response && response_accumulator
      final_response = response_accumulator.build_response
      trigger_streaming_callbacks(:on_finish, final_response)
      final_response
    else
      trigger_streaming_callbacks(:on_finish, nil)
      nil
    end
  rescue StandardError => e
    trigger_streaming_callbacks(:on_error, e)
    raise
  end
end