Class: Fal::Request

Inherits:
Object
  • Object
show all
Defined in:
lib/fal/request.rb

Overview

Represents a queued request submitted to a fal model endpoint. Provides helpers to create, query status, cancel, and fetch response payloads using the Queue API described in the fal docs. See: docs.fal.ai/model-apis/model-endpoints/queue

Defined Under Namespace

Modules: Status

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(attributes, endpoint_id:, client: Fal.client) ⇒ Request



35
36
37
38
39
# File 'lib/fal/request.rb', line 35

# Builds a request object around an attribute payload.
#
# @param attributes [Hash] attribute payload, forwarded as-is to reset_attributes
# @param endpoint_id [String] endpoint identifier kept on the instance
# @param client [Object] client stored for later API calls (defaults to Fal.client)
def initialize(attributes, endpoint_id:, client: Fal.client)
  @endpoint_id = endpoint_id
  @client = client
  # Attribute assignment is delegated; both ivars above are set first.
  reset_attributes(attributes)
end

Instance Attribute Details

#endpoint_id ⇒ String (readonly)



30
31
32
# File 'lib/fal/request.rb', line 30

# Endpoint identifier that was passed to the constructor.
#
# @return [String]
def endpoint_id
  @endpoint_id
end

#id ⇒ String (readonly)



20
21
22
# File 'lib/fal/request.rb', line 20

# Identifier of this request, as used in the ".../requests/<id>/..." paths.
# NOTE(review): @id is not assigned in the visible code; presumably set by
# reset_attributes — confirm.
#
# @return [String]
def id
  @id
end

#logs ⇒ Array<Hash>? (readonly)



26
27
28
# File 'lib/fal/request.rb', line 26

# Log entries attached to the request, if any.
# NOTE(review): presumably only populated when the status was fetched with
# logs enabled (see the logs: keyword on find_by!/reload!) — confirm.
#
# @return [Array<Hash>, nil]
def logs
  @logs
end

#queue_position ⇒ Integer? (readonly)



24
25
26
# File 'lib/fal/request.rb', line 24

# Position of the request in the queue, when known.
# NOTE(review): @queue_position is not assigned in the visible code;
# presumably set by reset_attributes — confirm.
#
# @return [Integer, nil]
def queue_position
  @queue_position
end

#response ⇒ Hash? (readonly)



28
29
30
# File 'lib/fal/request.rb', line 28

# Response payload of the request, if one has been stored.
# reload! assigns it from the client's GET on the request path once the
# status is COMPLETED.
#
# @return [Hash, nil]
def response
  @response
end

#status ⇒ String (readonly)



22
23
24
# File 'lib/fal/request.rb', line 22

# Last known status of the request; the predicate helpers compare it
# against the Status constants.
#
# @return [String]
def status
  @status
end

Class Method Details

.create!(endpoint_id:, input:, webhook_url: nil, client: Fal.client) ⇒ Fal::Request

Create a new queued request for a model. Corresponds to POST queue.fal.run/{endpoint_id}. Optionally appends the fal_webhook query parameter, per the docs.



50
51
52
53
54
55
56
# File 'lib/fal/request.rb', line 50

# Submits a new request for the given endpoint.
#
# POSTs to "/<endpoint_id>" through the client. When a webhook URL is given
# it is CGI-escaped and appended as the fal_webhook query parameter.
#
# @param endpoint_id [String] endpoint identifier used to build the path
# @param input [Hash, nil] request body; a nil (or false) input is sent as {}
# @param webhook_url [String, nil] optional webhook URL
# @param client [Object] object responding to #post (defaults to Fal.client)
# @return [Fal::Request] instance built from whatever the POST returned
def create!(endpoint_id:, input:, webhook_url: nil, client: Fal.client)
  payload = input || {}
  target = "/#{endpoint_id}"
  target += "?fal_webhook=#{CGI.escape(webhook_url)}" if webhook_url
  new(client.post(target, payload), endpoint_id: endpoint_id, client: client)
end

.find_by!(id:, endpoint_id:, logs: false, client: Fal.client) ⇒ Fal::Request

Find the current status for a given request. Corresponds to GET queue.fal.run/{endpoint_id}/requests/{request_id}/status, where only the first two segments of the endpoint id are used in the path.



65
66
67
68
69
70
# File 'lib/fal/request.rb', line 65

# Fetches the current status of an existing request.
#
# Only the first two "/"-separated segments of endpoint_id are used in the
# status path; the full endpoint_id is still handed to the new instance.
#
# @param id [String] request identifier
# @param endpoint_id [String] endpoint identifier, possibly with a sub-path
# @param logs [Boolean] when truthy, sends `logs: 1` as the query
# @param client [Object] object responding to #get (defaults to Fal.client)
# @return [Fal::Request] instance built from whatever the GET returned
def find_by!(id:, endpoint_id:, logs: false, client: Fal.client)
  base_endpoint = endpoint_id.split("/").first(2).join("/")
  status_path = "/#{base_endpoint}/requests/#{id}/status"
  status_query = logs ? { logs: 1 } : nil
  attributes = client.get(status_path, query: status_query)
  new(attributes, endpoint_id: endpoint_id, client: client)
end

.stream!(endpoint_id:, input:, client: Fal.client) {|chunk| ... } ⇒ Fal::Request

Stream a synchronous request using SSE and yield response chunks as they arrive. It returns a Fal::Request initialized with the last streamed data in the response field.

Yields:

  • (chunk)

    yields each parsed chunk Hash from the stream

Yield Parameters:

  • chunk (Hash)


80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/fal/request.rb', line 80

# Streams a request and returns a Request built from the last chunk seen.
#
# Iterates a Stream for "/<endpoint_id>/stream", passing each event's "data"
# value to the optional block. Afterwards the last chunk is turned into
# attributes: "request_id" and "status" are copied from it, and "response" is
# the chunk's own "response" entry when that key exists, otherwise the whole
# chunk. Nil entries are dropped before the instance is built.
#
# @param endpoint_id [String] endpoint identifier used to build the path
# @param input [Object] input handed to the Stream
# @param client [Object] client handed to the Stream (defaults to Fal.client)
# @yieldparam chunk [Hash] the "data" value of each streamed event
# @return [Fal::Request]
def stream!(endpoint_id:, input:, client: Fal.client, &block)
  final_chunk = nil
  events = Stream.new(path: "/#{endpoint_id}/stream", input: input, client: client)

  events.each do |event|
    final_chunk = event["data"]
    block&.call(final_chunk)
  end

  # No chunk at all means there is nothing to copy over.
  attributes = {}
  unless final_chunk.nil?
    # Prefer the nested response when the chunk carries one.
    payload = final_chunk.key?("response") ? final_chunk["response"] : final_chunk
    attributes = {
      "request_id" => final_chunk["request_id"],
      "status" => final_chunk["status"],
      "response" => payload
    }.compact
  end

  new(attributes, endpoint_id: endpoint_id, client: client)
end

Instance Method Details

#cancel! ⇒ Hash

Attempt to cancel the request if still in queue.



128
129
130
# File 'lib/fal/request.rb', line 128

# Asks the API to cancel this request.
#
# Sends a PUT to ".../requests/<id>/cancel" under the endpoint id without
# its sub-path.
#
# @return [Hash] whatever the client's #put returns
def cancel!
  cancel_path = "/#{endpoint_id_without_subpath}/requests/#{@id}/cancel"
  @client.put(cancel_path)
end

#completed? ⇒ Boolean



143
144
145
# File 'lib/fal/request.rb', line 143

# Whether the last known status equals Status::COMPLETED.
#
# @return [Boolean]
def completed?
  @status == Status::COMPLETED
end

#endpoint_id_without_subpath ⇒ String



107
108
109
# File 'lib/fal/request.rb', line 107

# Endpoint id reduced to its first two "/"-separated segments, e.g.
# "a/b/c" becomes "a/b". Ids with fewer segments come back unchanged.
#
# @return [String]
def endpoint_id_without_subpath
  leading_segments = @endpoint_id.split("/").first(2)
  leading_segments.join("/")
end

#in_progress? ⇒ Boolean



138
139
140
# File 'lib/fal/request.rb', line 138

# Whether the last known status equals Status::IN_PROGRESS.
#
# @return [Boolean]
def in_progress?
  @status == Status::IN_PROGRESS
end

#in_queue? ⇒ Boolean



133
134
135
# File 'lib/fal/request.rb', line 133

# Whether the last known status equals Status::IN_QUEUE.
#
# @return [Boolean]
def in_queue?
  @status == Status::IN_QUEUE
end

#reload!(logs: false) ⇒ Fal::Request

Reload the current status from the Queue API.



114
115
116
117
118
119
120
121
122
123
124
# File 'lib/fal/request.rb', line 114

# Refreshes this request from the API and returns self.
#
# While the status is IN_PROGRESS or IN_QUEUE, the status endpoint is
# queried and the result is passed to reset_attributes. If the status is
# COMPLETED afterwards (or already was), the request path is fetched and
# stored as the response. Any other status triggers no API call.
#
# @param logs [Boolean] when truthy, sends `logs: 1` with the status query
# @return [Fal::Request] self
def reload!(logs: false)
  if in_progress? || in_queue?
    status_path = "/#{endpoint_id_without_subpath}/requests/#{@id}/status"
    status_query = logs ? { logs: 1 } : nil
    reset_attributes(@client.get(status_path, query: status_query))
  end

  # The path is rebuilt from the current @id rather than reused from above:
  # reset_attributes runs in between and is not visible here.
  if completed?
    @response = @client.get("/#{endpoint_id_without_subpath}/requests/#{@id}")
  end

  self
end