Class: Anthropic::Helpers::Streaming::MessageStream Private

Inherits:
Object
Includes:
Internal::Type::BaseStream
Defined in:
lib/anthropic/helpers/streaming/message_stream.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

MessageStream provides a Ruby Enumerable interface over Server-Sent Events from the Anthropic API, yielding a mix of raw streaming events and higher-level typed events while maintaining accumulated message state throughout the stream lifecycle.
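The pattern described above, an Enumerable that yields events while folding them into accumulated state, can be sketched without the SDK. This is only an illustration under stated assumptions: `SketchEvent` and `SketchStream` are hypothetical stand-ins, and the real class accumulates a full message object rather than a plain string.

```ruby
# Hypothetical stand-ins for illustration; none of these names come from the SDK.
SketchEvent = Struct.new(:type, :text, keyword_init: true)

class SketchStream
  include Enumerable

  attr_reader :snapshot

  def initialize(raw_events)
    @raw_events = raw_events
    @snapshot = +"" # unfrozen string that grows as events arrive
  end

  # Folds each event into the snapshot, then yields it to the caller.
  def each
    return to_enum(:each) unless block_given?

    @raw_events.each do |event|
      @snapshot << event.text if event.type == :content_block_delta
      yield event
    end
  end
end

events = [
  SketchEvent.new(type: :message_start),
  SketchEvent.new(type: :content_block_delta, text: "Hel"),
  SketchEvent.new(type: :content_block_delta, text: "lo"),
  SketchEvent.new(type: :message_stop)
]

stream = SketchStream.new(events)
seen_types = stream.map(&:type)  # Enumerable methods are built on #each
final_snapshot = stream.snapshot # state accumulated during that single pass
```

Because the snapshot is a side effect of iteration, it is complete only once the stream has been read to the end.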

Instance Attribute Summary

Attributes included from Internal::Type::BaseStream

#headers, #status

Instance Method Summary

Methods included from Internal::Type::BaseStream

#close, #each, #inspect, #to_enum

Constructor Details

#initialize(raw_stream:, tools: {}, models: {}) ⇒ MessageStream

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of MessageStream.



# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 274

def initialize(raw_stream:, tools: {}, models: {})
  # The underlying Server-Sent Event stream from the Anthropic API.
  @raw_stream = raw_stream
  # Accumulated message state that builds up as events are processed.
  @accumated_message_snapshot = nil
  # Mapping of tool names to model classes for parsing.
  @tools = tools
  @models = models
  # Lazy enumerable that transforms raw events into consumable events.
  @iterator = iterator
  @status = raw_stream.status
  @headers = raw_stream.headers
  @model = raw_stream.instance_variable_get(:@model)
end

Instance Method Details

#accumulated_message ⇒ Anthropic::Models::Message, Anthropic::Models::Beta::BetaMessage

Returns the complete accumulated Message object after stream completion.



# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 64

def accumulated_message
  until_done
  parse_tool_uses!(@accumated_message_snapshot)
  @accumated_message_snapshot
end

#accumulated_text ⇒ String

Returns all text content blocks concatenated into a single string. NOTE: Currently the API will only respond with a single content block.

Will raise an error if no `text` content blocks were returned.



# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 77

def accumulated_text
  message = accumulated_message
  message.content.map { _1.type == :text ? _1.text : nil }.join
end
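To see what the `map`/`join` expression does with mixed content, here is a small stand-alone sketch. `SketchBlock` and `join_text_blocks` are hypothetical names, not SDK types; the only behavior relied on is that Ruby's `Array#join` renders `nil` elements as empty strings.

```ruby
# Hypothetical stand-in for a content block; not an SDK type.
SketchBlock = Struct.new(:type, :text, keyword_init: true)

# Mirrors the expression in the listing above: non-text blocks map to nil,
# and Array#join treats nil as an empty string.
def join_text_blocks(content)
  content.map { |block| block.type == :text ? block.text : nil }.join
end

mixed = [
  SketchBlock.new(type: :text, text: "The answer is "),
  SketchBlock.new(type: :tool_use),
  SketchBlock.new(type: :text, text: "42.")
]

joined = join_text_blocks(mixed)
no_text = join_text_blocks([SketchBlock.new(type: :tool_use)])
```

In this isolated sketch, `joined` is `"The answer is 42."` and `no_text` is an empty string. Whether the real method raises when no text blocks are present depends on SDK behavior that the listing does not show.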

#text ⇒ Enumerable<String>

Returns an enumerable of text deltas from the streaming response.



# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 49

def text
  Anthropic::Internal::Util.chain_fused(@iterator) do |y|
    @iterator.each do |event|
      if event.type == :content_block_delta && event.delta.type == :text_delta
        y << event.delta.text
      end
    end
  end
end
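The filtering step can be approximated with a plain `Enumerator`. This sketch assumes nothing about `Anthropic::Internal::Util.chain_fused` beyond what the listing shows, and substitutes `Enumerator.new` for it; `SketchDelta`, `SketchDeltaEvent`, and `text_deltas` are hypothetical names.

```ruby
# Hypothetical stand-ins for streaming events; not SDK types.
SketchDelta = Struct.new(:type, :text, keyword_init: true)
SketchDeltaEvent = Struct.new(:type, :delta, keyword_init: true)

# Returns an enumerator that yields only the text carried by text deltas.
# Enumerator.new stands in for the SDK's internal chain_fused helper.
def text_deltas(events)
  Enumerator.new do |y|
    events.each do |event|
      if event.type == :content_block_delta && event.delta.type == :text_delta
        y << event.delta.text
      end
    end
  end
end

events = [
  SketchDeltaEvent.new(type: :message_start),
  SketchDeltaEvent.new(type: :content_block_delta,
                       delta: SketchDelta.new(type: :text_delta, text: "Hel")),
  SketchDeltaEvent.new(type: :content_block_delta,
                       delta: SketchDelta.new(type: :input_json_delta)),
  SketchDeltaEvent.new(type: :content_block_delta,
                       delta: SketchDelta.new(type: :text_delta, text: "lo")),
  SketchDeltaEvent.new(type: :message_stop)
]

deltas = text_deltas(events)
deltas.each { |chunk| print chunk } # prints each fragment as it is produced
puts
```

The short-circuiting `&&` matters here: events such as `message_start` carry no delta, so the type check has to come first.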

#until_done ⇒ void

This method returns an undefined value.

Blocks until the stream has been fully consumed.



# File 'lib/anthropic/helpers/streaming/message_stream.rb', line 42

def until_done = each {} # rubocop:disable Lint/EmptyBlock
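Calling `each` with an empty block is a common way to drain an enumerable purely for its side effects. A minimal sketch of that idiom follows, with `DrainableSketch` as a hypothetical class that counts the items it has passed through.

```ruby
# Hypothetical class for illustration; not part of the SDK.
class DrainableSketch
  include Enumerable

  attr_reader :consumed

  def initialize(items)
    @items = items
    @consumed = 0
  end

  def each
    return to_enum(:each) unless block_given?

    @items.each do |item|
      @consumed += 1 # side effect that happens only as items are read
      yield item
    end
  end

  # Reads every item and discards it, so only the side effects remain.
  def until_done
    each {} # rubocop:disable Lint/EmptyBlock
    nil
  end
end

sketch = DrainableSketch.new(%w[a b c])
before = sketch.consumed
sketch.until_done
after = sketch.consumed
```

Here `before` is 0 and `after` is 3: nothing is read until something iterates, which is why `accumulated_message` calls `until_done` before returning its snapshot.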