Class: Fal::Stream

Inherits:
Object
Defined in:
lib/fal/stream.rb

Overview

Streaming helper for Server-Sent Events from fal.run synchronous endpoints. It parses SSE lines and yields decoded event hashes with symbolized keys.
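As a rough illustration of what "decoded event hashes with symbolized keys" could look like, the sketch below parses a single SSE data line into a Hash. SketchDecoder and the payload fields (status, logs) are hypothetical names chosen for this example; the real Fal::Stream::SSEDecoder is not shown on this page and may behave differently.

```ruby
require "json"

# Hypothetical stand-in for an SSE line decoder.
# This is NOT the actual Fal::Stream::SSEDecoder.
class SketchDecoder
  # Returns a Hash with symbolized keys for a "data:" line, or nil otherwise.
  def decode(line)
    return nil unless line.start_with?("data:")

    payload = line.delete_prefix("data:").strip
    return nil if payload.empty?

    JSON.parse(payload, symbolize_names: true)
  rescue JSON::ParserError
    nil # this sketch silently skips payloads that are not valid JSON
  end
end

decoder = SketchDecoder.new
event = decoder.decode('data: {"status": "in_progress", "logs": []}')
comment = decoder.decode(": keep-alive") # SSE comment lines carry no event
```

Here event would be a Hash keyed by :status and :logs, while the comment line decodes to nil and would not be yielded.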

Defined Under Namespace

Classes: SSEDecoder

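Instance Attribute Summary

  • #path ⇒ String (readonly)

    Endpoint path under fal.run, e.g. “/fal-ai/flux/dev/stream”.

Instance Method Summary

  • #each {|event| ... } ⇒ void

    Streams events, yielding a Hash for each event data chunk.

  • #initialize(path:, input:, client: Fal.client) ⇒ Stream (constructor)

    Returns a new instance of Stream.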

Constructor Details

#initialize(path:, input:, client: Fal.client) ⇒ Stream

Returns a new instance of Stream.

Parameters:

  • path (String)

    full path under sync_base, with a leading slash, e.g. “/fal-ai/flux/dev/stream”

  • input (Hash)

    request input payload

  • client (Fal::Client) (defaults to: Fal.client)

    HTTP client



# File 'lib/fal/stream.rb', line 13

def initialize(path:, input:, client: Fal.client)
  @path = path
  @input = input
  @client = client
end

Instance Attribute Details

#path ⇒ String (readonly)

Returns endpoint path under fal.run, e.g. “/fal-ai/flux/dev/stream”.

Returns:

  • (String)

    endpoint path under fal.run, e.g. “/fal-ai/flux/dev/stream”



# File 'lib/fal/stream.rb', line 8

def path
  @path
end

Instance Method Details

#each {|event| ... } ⇒ void

This method returns an undefined value.

Streams events, yielding a Hash for each event data chunk. Blocks until the stream ends.

Yields:

  • (event)

    yields decoded event hash

Yield Parameters:

  • event (Hash)
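
Since #each only yields and returns no meaningful value, callers either pass a block or wrap the call in an Enumerator. The sketch below shows both styles against a hypothetical FakeStream that follows the same #each contract; FakeStream and the :progress key are assumptions made for illustration and are not part of the library.

```ruby
# Hypothetical stand-in that honors the same #each contract as Fal::Stream:
# it yields one Hash per event and returns no meaningful value.
class FakeStream
  def initialize(events)
    @events = events
  end

  def each(&block)
    @events.each { |event| block.call(event) }
    nil
  end
end

stream = FakeStream.new([{ progress: 0.5 }, { progress: 1.0 }])

# Block form: handle each event as it arrives.
seen = []
stream.each { |event| seen << event[:progress] }

# Enumerator form: wrap #each so Enumerable helpers such as #to_a are available.
last_event = stream.enum_for(:each).to_a.last
```

With a real stream, the Enumerator form would still block until the stream ends, because to_a has to collect every event first.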


# File 'lib/fal/stream.rb', line 23

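def each(&block)
  buffer = ""
  decoder = SSEDecoder.new

  @client.post_stream(@path, @input, on_data: proc do |chunk, _total_bytes|
    # Normalize CRLF and bare CR line endings to LF before splitting.
    buffer = (buffer + chunk).gsub(/\r\n?/, "\n")
    lines = buffer.split("\n", -1)
    # The last element is an incomplete line; keep it for the next chunk.
    buffer = lines.pop || ""
    lines.each do |line|
      event = decoder.decode(line)
      # The decoder returns nil for lines that do not complete an event.
      block.call(event) if event
    end
  end)
end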
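
The buffering step in the listing above can be tried in isolation. The following sketch wraps the same three lines of logic in a hypothetical LineBuffer class (a name introduced here, not part of the library) and feeds it chunks that cut a line in half, which is how data may arrive from the network.

```ruby
# Splits incoming chunks into complete lines and carries any trailing
# partial line over to the next chunk, using the same logic as #each.
class LineBuffer
  def initialize
    @buffer = +""
  end

  # Returns the lines completed by this chunk (possibly none).
  def feed(chunk)
    @buffer = (@buffer + chunk).gsub(/\r\n?/, "\n")
    lines = @buffer.split("\n", -1)
    @buffer = lines.pop || ""
    lines
  end
end

buffer = LineBuffer.new
# The first event is split mid-line across two chunks.
chunks = ["data: {\"a\"", ": 1}\r\n\r\ndata: ", "{\"b\": 2}\n"]
lines = chunks.flat_map { |chunk| buffer.feed(chunk) }
# lines => ["data: {\"a\": 1}", "", "data: {\"b\": 2}"]
```

One behavior worth knowing: because normalization runs on each chunk separately, a "\r\n" pair that happens to be split across two chunks is seen as two line breaks and produces an extra empty line. Whether that matters depends on how the decoder treats blank lines.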