Module: Outhad::Integrations::Core::Utils

Included in:
BaseConnector, QueryBuilder, Protocol::ProtocolModel
Defined in:
lib/outhad/integrations/core/utils.rb

Instance Method Summary

Instance Method Details

#build_catalog(catalog_json) ⇒ Object



95
96
97
98
99
100
101
102
103
104
# File 'lib/outhad/integrations/core/utils.rb', line 95

# Builds a protocol catalog from a parsed catalog hash.
#
# Every entry under "streams" is converted with #build_stream. Catalog-level
# settings that are missing (or nil/false) fall back to 60 requests per
# "minute", a concurrency of 10, and the "schema" schema mode.
#
# @param catalog_json [Hash] catalog definition with string keys; must
#   provide a "streams" collection that responds to #map
# @return [Outhad::Integrations::Protocol::Catalog]
def build_catalog(catalog_json)
  catalog_streams = catalog_json["streams"].map do |definition|
    build_stream(definition)
  end
  rate_limit = catalog_json["request_rate_limit"] || 60
  rate_limit_unit = catalog_json["request_rate_limit_unit"] || "minute"
  rate_concurrency = catalog_json["request_rate_concurrency"] || 10
  mode = catalog_json["schema_mode"] || "schema"

  Outhad::Integrations::Protocol::Catalog.new(
    streams: catalog_streams,
    request_rate_limit: rate_limit,
    request_rate_limit_unit: rate_limit_unit,
    request_rate_concurrency: rate_concurrency,
    schema_mode: mode
  )
end

#build_stream(stream_json) ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/outhad/integrations/core/utils.rb', line 106

# Builds a protocol stream from a parsed stream hash.
#
# The HTTP verb is read from the "method" key and passed on as
# +request_method+. Batch settings default to no batch support with a batch
# size of 1, and the rate-limit unit defaults to "minute". The rate limit and
# concurrency are coerced with #to_i, so a missing value becomes 0.
#
# @param stream_json [Hash] stream definition with string keys
# @return [Outhad::Integrations::Protocol::Stream]
def build_stream(stream_json)
  attributes = {
    name: stream_json["name"],
    url: stream_json["url"],
    action: stream_json["action"],
    request_method: stream_json["method"],
    batch_support: stream_json["batch_support"] || false,
    batch_size: stream_json["batch_size"] || 1,
    json_schema: stream_json["json_schema"],
    request_rate_limit: stream_json["request_rate_limit"].to_i,
    request_rate_limit_unit: stream_json["request_rate_limit_unit"] || "minute",
    request_rate_concurrency: stream_json["request_rate_concurrency"].to_i,
    supported_sync_modes: stream_json["supported_sync_modes"]
  }

  Outhad::Integrations::Protocol::Stream.new(**attributes)
end

#convert_to_json_schema(column_definitions) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/outhad/integrations/core/utils.rb', line 18

# Converts a list of column definitions into a JSON-schema-style hash.
#
# Each column contributes one entry under "properties", keyed by its
# +:column_name+. The entry's "type" comes from #map_type_to_json_schema;
# for columns flagged +:optional+ it becomes a two-element array of that type
# and "null".
#
# @param column_definitions [#each] hashes with +:column_name+, +:type+ and
#   +:optional+ keys
# @return [Hash] +{ "type" => "object", "properties" => { ... } }+
def convert_to_json_schema(column_definitions)
  properties = {}

  column_definitions.each do |column|
    json_type = map_type_to_json_schema(column[:type])
    properties[column[:column_name]] = {
      "type" => column[:optional] ? [json_type, "null"] : json_type
    }
  end

  { "type" => "object", "properties" => properties }
end

#create_log_message(context, type, message) ⇒ Object



66
67
68
69
70
71
72
# File 'lib/outhad/integrations/core/utils.rb', line 66

# Wraps a log entry in a protocol log message and converts it with
# #to_outhad_message.
#
# @param context [Object] stored as the log message's +name+
# @param type [Object] stored as the log message's +level+
# @param message [Object] stored as the log message's +message+
# @return [Object] the result of +LogMessage#to_outhad_message+
def create_log_message(context, type, message)
  log_entry = Integrations::Protocol::LogMessage.new(name: context, level: type, message: message)
  log_entry.to_outhad_message
end

#extract_data(record_object, properties) ⇒ Object



86
87
88
89
# File 'lib/outhad/integrations/core/utils.rb', line 86

# Keeps only the attributes of a record that are declared in +properties+.
#
# The record is first converted with #with_indifferent_access; an attribute
# is kept when its key, converted to a symbol, is a key of +properties+.
#
# @param record_object [#with_indifferent_access] source record
# @param properties [#key?] declared properties, looked up by symbol
# @return [Object] the filtered indifferent-access collection
def extract_data(record_object, properties)
  record_object.with_indifferent_access.select do |attribute, _value|
    properties.key?(attribute.to_sym)
  end
end

#handle_exception(exception, meta = {}) ⇒ Object



74
75
76
77
78
79
80
# File 'lib/outhad/integrations/core/utils.rb', line 74

# Logs and reports an exception, then returns a log message describing it.
#
# The error is logged as "<meta rendered by #hash_to_string>: <message>",
# forwarded to #report_exception, and finally turned into a log message via
# #create_log_message using +meta[:context]+ and +meta[:type]+.
#
# @param exception [Exception] the error being handled
# @param meta [Hash] extra details; +:context+ and +:type+ are read from it
# @return [Object] whatever #create_log_message returns
def handle_exception(exception, meta = {})
  log_line = "#{hash_to_string(meta)}: #{exception.message}"
  logger.error(log_line)
  report_exception(exception, meta)

  context, type = meta.values_at(:context, :type)
  create_log_message(context, type, exception.message)
end

#hash_to_string(hash) ⇒ Object



82
83
84
# File 'lib/outhad/integrations/core/utils.rb', line 82

# Renders a hash as "key = value" pairs separated by ", ".
#
# @param hash [Hash] the pairs to render
# @return [String] the joined pairs; an empty string for an empty hash
def hash_to_string(hash)
  rendered_pairs = hash.each_with_object([]) do |(key, value), parts|
    parts << "#{key} = #{value}"
  end
  rendered_pairs.join(", ")
end

#keys_to_symbols(hash) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
# File 'lib/outhad/integrations/core/utils.rb', line 6

# Recursively converts hash keys to symbols.
#
# Hashes are rebuilt with every key passed through #to_sym and every value
# processed recursively; arrays are processed element by element; any other
# value is returned unchanged.
#
# @param hash [Hash, Array, Object] the structure to convert
# @return [Hash, Array, Object] a converted copy, or the input itself when it
#   is neither a Hash nor an Array
def keys_to_symbols(hash)
  case hash
  when Hash
    hash.map { |key, value| [key.to_sym, keys_to_symbols(value)] }.to_h
  when Array
    hash.map { |element| keys_to_symbols(element) }
  else
    hash
  end
end

#log_request_response(level, request, response) ⇒ Object



58
59
60
61
62
63
64
# File 'lib/outhad/integrations/core/utils.rb', line 58

# Builds a protocol log message describing a request/response pair.
#
# The message body is a JSON object holding the string forms of the request
# and response plus the level. The log message is returned as built; this
# method does not convert it or send it anywhere.
#
# @param level [Object] stored as the log level and repeated in the body
# @param request [#to_s] the request that was sent
# @param response [#to_s] the response that was received
# @return [Integrations::Protocol::LogMessage]
def log_request_response(level, request, response)
  payload = { request: request.to_s, response: response.to_s, level: level }

  Integrations::Protocol::LogMessage.new(
    name: self.class.name,
    level: level,
    message: payload.to_json
  )
end

#logger ⇒ Object



49
50
51
# File 'lib/outhad/integrations/core/utils.rb', line 49

# Returns the logger exposed by Integrations::Service.
#
# @return [Object] whatever +Integrations::Service.logger+ returns
def logger
  service = Integrations::Service
  service.logger
end

#map_type_to_json_schema(type) ⇒ Object



38
39
40
41
42
43
44
45
46
47
# File 'lib/outhad/integrations/core/utils.rb', line 38

# Maps a column type name to the type name used in the generated schema.
#
# "NUMBER" becomes "integer" and "vector" stays "vector"; matching is exact,
# so any other value (including differently-cased names) becomes "string".
#
# @param type [String] column type name
# @return [String] "integer", "vector" or "string"
def map_type_to_json_schema(type)
  known_types = { "NUMBER" => "integer", "vector" => "vector" }
  known_types.fetch(type, "string") # Default type
end

#report_exception(exception, meta = {}) ⇒ Object



53
54
55
56
# File 'lib/outhad/integrations/core/utils.rb', line 53

# Forwards an exception to the reporter configured on Integrations::Service.
#
# Does nothing and returns nil when no reporter is configured.
#
# @param exception [Exception] the error to report
# @param meta [Hash] extra details passed through to the reporter
# @return [Object, nil] the reporter's return value, or nil without a reporter
def report_exception(exception, meta = {})
  reporter = Integrations::Service.exception_reporter
  return if reporter.nil?

  reporter.report(exception, meta)
end

#success?(response) ⇒ Boolean

Returns:

  • (Boolean)


91
92
93
# File 'lib/outhad/integrations/core/utils.rb', line 91

# Tells whether a response carries a successful status code.
#
# Only the codes 200 and 201 count as success; the code is compared in its
# string form, so integer and string codes are both accepted. A missing
# response yields +false+ rather than +nil+, so the result is always a strict
# boolean.
#
# @param response [#code, nil] response object exposing a status code
# @return [Boolean] true when the response code is 200 or 201
def success?(response)
  return false unless response

  %w[200 201].include?(response.code.to_s)
end