Class: SchemaRegistry::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/schema_registry/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(endpoint, username = nil, password = nil) ⇒ Client



31
32
33
34
# File 'lib/schema_registry/client.rb', line 31

def initialize(endpoint, username = nil, password = nil)
  @endpoint = URI(endpoint)
  @username, @password = username, password
end

Instance Attribute Details

#endpointObject (readonly)

Returns the value of attribute endpoint.



29
30
31
# File 'lib/schema_registry/client.rb', line 29

def endpoint
  @endpoint
end

#passwordObject (readonly)

Returns the value of attribute password.



29
30
31
# File 'lib/schema_registry/client.rb', line 29

def password
  @password
end

#usernameObject (readonly)

Returns the value of attribute username.



29
30
31
# File 'lib/schema_registry/client.rb', line 29

def username
  @username
end

Instance Method Details

#default_compatibility_levelObject



49
50
51
# File 'lib/schema_registry/client.rb', line 49

def default_compatibility_level
  request(:get, "/config")["compatibilityLevel"]
end

#default_compatibility_level=(level) ⇒ Object



53
54
55
# File 'lib/schema_registry/client.rb', line 53

def default_compatibility_level=(level)
  request(:put, "/config", compatibility: level)
end

#request(method, path, body = nil) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/schema_registry/client.rb', line 57

def request(method, path, body = nil)
  Net::HTTP.start(endpoint.host, endpoint.port, use_ssl: endpoint.scheme == 'https') do |http|
    request_class = case method
      when :get;    Net::HTTP::Get
      when :post;   Net::HTTP::Post
      when :put;    Net::HTTP::Put
      when :delete; Net::HTTP::Delete
      else raise ArgumentError, "Unsupported request method"
    end

    request = request_class.new(path)
    request.basic_auth(username, password) if username && password
    request['Accept'] = "application/vnd.schemaregistry.v1+json"
    if body
      request['Content-Type'] = "application/json"
      request.body = JSON.dump(body)
    end

    case response = http.request(request)
    when Net::HTTPSuccess
      begin
        JSON.parse(response.body)
      rescue JSON::ParserError => e
        raise SchemaRegistry::InvalidResponse, "Invalid JSON in response: #{e.message}"
      end

    when Net::HTTPForbidden
      message = username.nil? ? "Unauthorized" : "User `#{username}` failed to authenticate"
      raise SchemaRegistry::UnauthorizedRequest.new(response.code.to_i, message)

    else
      response = begin
        JSON.parse(response.body)
      rescue JSON::ParserError => e
        raise SchemaRegistry::InvalidResponse, "Invalid JSON in response: #{e.message}"
      end

      error_class = RESPONSE_ERROR_CODES[response_data['error_code']] || SchemaRegistry::ResponseError
      raise error_class.new(response_data['error_code'], response_data['message'])
    end
  end
end

#schema(id) ⇒ Object



36
37
38
# File 'lib/schema_registry/client.rb', line 36

def schema(id)
  request(:get, "/schemas/ids/#{id}")['schema']
end

#subject(name) ⇒ Object



45
46
47
# File 'lib/schema_registry/client.rb', line 45

def subject(name)
  SchemaRegistry::Subject.new(self, name)
end

#subjectsObject



40
41
42
43
# File 'lib/schema_registry/client.rb', line 40

def subjects
  data = request(:get, "/subjects")
  data.map { |subject| SchemaRegistry::Subject.new(self, subject) }
end