Class: Anthropic::Helpers::Streaming::MessageStream Private

Inherits:
Object
Includes:
Internal::Type::BaseStream
Defined in:
lib/anthropic/helpers/streaming/message_stream.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

MessageStream provides a Ruby Enumerable interface over Server-Sent Events from the Anthropic API, yielding a mix of raw streaming events and higher-level typed events while maintaining accumulated message state throughout the stream lifecycle.

Generic:

  • Elem

Instance Attribute Summary

Attributes included from Internal::Type::BaseStream

#headers, #status

Instance Method Summary

Methods included from Internal::Type::BaseStream

#close, defer_closing, #each, #inspect, #to_enum

Constructor Details

#initialize(raw_stream:, tool_models: {}) ⇒ MessageStream

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of MessageStream.

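Parameters:

  • raw_stream: the underlying Server-Sent Event stream from the Anthropic API
  • tool_models: mapping of tool names to model classes used for parsing (defaults to {})
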
# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 265

def initialize(raw_stream:, tool_models: {})
  # The underlying Server-Sent Event stream from the Anthropic API.
  @raw_stream = raw_stream
  # Accumulated message state that builds up as events are processed.
  @accumated_message_snapshot = nil
  # Mapping of tool names to model classes for parsing.
  @tool_models = tool_models
  # Lazy enumerable that transforms raw events into consumable events.
  @iterator = iterator
end

Instance Method Details

#accumulated_message ⇒ Anthropic::Models::Message

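Returns the complete accumulated Message object once the stream has finished. Calling this method first consumes any remaining events; if tool_models is non-empty, parse_tool_uses! is applied to the message before it is returned.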



# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 64

def accumulated_message
  until_done
  parse_tool_uses!(@accumated_message_snapshot) if @tool_models.any?
  @accumated_message_snapshot
end

#accumulated_text ⇒ String

Returns all text content blocks concatenated into a single string. NOTE: Currently the API will only respond with a single content block.

Raises a RuntimeError if the message contains no text content blocks.

Returns:

  • (String)


# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 77

def accumulated_text
  message = accumulated_message
  text_blocks = []
  message.content.each do |block|
    if block.type == :text
      text_blocks << block.text
    end
  end

  if text_blocks.empty?
    raise RuntimeError.new("Expected to have received at least 1 text block")
  end

  text_blocks.join
end
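
As a rough illustration of the listing above, the following sketch applies the same filter-and-join step to plain Ruby objects. `Block` and `join_text_blocks` are hypothetical names invented for this example and are not part of the SDK; only the `type` and `text` fields that `#accumulated_text` reads are modeled.

```ruby
# Hypothetical stand-in for a content block; the SDK's real block classes
# carry more fields than the two modeled here.
Block = Struct.new(:type, :text, keyword_init: true)

# Joins the text of every :text block, raising when there are none,
# which is the same behavior the listing above implements.
def join_text_blocks(content)
  texts = content.select { |block| block.type == :text }.map(&:text)
  raise "Expected to have received at least 1 text block" if texts.empty?

  texts.join
end

content = [
  Block.new(type: :text, text: "The answer is "),
  Block.new(type: :tool_use, text: nil), # skipped: not a text block
  Block.new(type: :text, text: "42.")
]

puts join_text_blocks(content)
```

Non-text blocks are skipped rather than treated as errors; only a message with no text blocks at all raises.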

#text ⇒ Enumerable<String>

Returns an enumerable of text deltas from the streaming response.

Returns:

  • (Enumerable<String>)


# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 49

def text
  Anthropic::Internal::Util.chain_fused(@iterator) do |y|
    @iterator.each do |event|
      if event.type == :content_block_delta && event.delta.type == :text_delta
        y << event.delta.text
      end
    end
  end
end
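
The filter in the listing above can be tried out without a live stream. The sketch below is only an approximation under stated assumptions: `Event`, `Delta`, and `text_deltas` are hypothetical names for this example, and a plain `Enumerator` stands in for the SDK's internal `chain_fused` helper, whose exact behavior is not documented here.

```ruby
# Hypothetical stand-ins for streaming events; only the fields that
# #text inspects (type, delta.type, delta.text) are modeled.
Delta = Struct.new(:type, :text, keyword_init: true)
Event = Struct.new(:type, :delta, keyword_init: true)

# Lazily yields the text of each text delta and ignores every other event.
def text_deltas(events)
  Enumerator.new do |y|
    events.each do |event|
      next unless event.type == :content_block_delta && event.delta.type == :text_delta

      y << event.delta.text
    end
  end
end

events = [
  Event.new(type: :message_start, delta: nil),
  Event.new(type: :content_block_delta, delta: Delta.new(type: :text_delta, text: "Hel")),
  Event.new(type: :content_block_delta, delta: Delta.new(type: :input_json_delta, text: nil)),
  Event.new(type: :content_block_delta, delta: Delta.new(type: :text_delta, text: "lo")),
  Event.new(type: :message_stop, delta: nil)
]

text_deltas(events).each { |chunk| print(chunk) }
puts
```

Because the enumerator is lazy, each chunk can be printed as soon as it is produced instead of waiting for the whole response.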

#until_done ⇒ void

This method returns an undefined value.

Blocks until the stream has been fully consumed.



# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 42

def until_done = each {} # rubocop:disable Lint/EmptyBlock
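
To show why draining matters before reading accumulated state, here is a minimal sketch of the drain-then-read pattern. `TinyStream` is a hypothetical miniature written for this example, not the SDK class: it accumulates plain strings instead of a Message, and it assumes that re-reading a finished stream should not replay events.

```ruby
# Hypothetical miniature of a stream that builds a snapshot while iterating.
class TinyStream
  include Enumerable

  def initialize(chunks)
    @snapshot = +""
    # Lazy: nothing is appended to @snapshot until the enumerator advances.
    @iterator = Enumerator.new do |y|
      chunks.each do |chunk|
        @snapshot << chunk
        y << chunk
      end
    end
  end

  # Advances the shared enumerator with #next, so a second call resumes
  # where the first stopped; `loop` ends quietly on StopIteration.
  def each
    return to_enum(:each) unless block_given?

    loop { yield @iterator.next }
    self
  end

  # Consumes whatever is left and discards the yielded values.
  def until_done
    each { |_chunk| }
    nil
  end

  # Drains first, so the snapshot is complete even after partial reads.
  def accumulated
    until_done
    @snapshot
  end
end

stream = TinyStream.new(%w[Hel lo])
stream.each { |chunk| break if chunk == "Hel" } # stop after the first chunk
puts stream.accumulated # prints "Hello"
```

The design choice sketched here is that reading the accumulated value is always safe: it finishes the iteration itself rather than requiring the caller to remember to drain the stream.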