Module: Schemas

Included in:
Streamdal::Client, Streamdal::TestObj
Defined in:
lib/schema.rb

Instance Method Summary collapse

Instance Method Details

#_get_schema(aud) ⇒ Object



12
13
14
15
16
# File 'lib/schema.rb', line 12

def _get_schema(aud)
  return @schemas[aud_to_str(aud)].json_schema if @schemas.key?(aud_to_str(aud))

  ''
end

#_handle_schema(aud, step, wasm_resp) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/schema.rb', line 18

def _handle_schema(aud, step, wasm_resp)
  # Only handle schema steps
  return nil if step.infer_schema.nil?

  # Only successful schema inferences
  return nil if wasm_resp.exit_code != :WASM_EXIT_CODE_TRUE

  # If existing schema matches, do nothing
  existing_schema = _get_schema(aud)
  return nil if existing_schema == wasm_resp.output_step

  _set_schema(aud, wasm_resp.output_step)

  req = Streamdal::Protos::SendSchemaRequest.new
  req.audience = aud
  req.schema = Streamdal::Protos::Schema.new
  req.schema.json_schema = wasm_resp.output_step

  # Run in thread so we don't block on gRPC call
  Thread.new do
    @stub.send_schema(req, metadata: )
  end
end

#_set_schema(aud, schema) ⇒ Object



6
7
8
9
10
# File 'lib/schema.rb', line 6

def _set_schema(aud, schema)
  s = Streamdal::Protos::Schema.new
  s.json_schema = schema
  @schemas[aud_to_str(aud)] = s
end