Class: TurbineRb::Record

Inherits:
Object
  • Object
show all
Defined in:
lib/turbine_rb/records.rb

Overview

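Wraps a single record's key, value and timestamp. The value is parsed as JSON when possible and can be read, updated, CDC-unwrapped and serialized again. (The source comment at this position is only the linter directive `rubocop:disable Metrics/ClassLength`.)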

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(pb_record) ⇒ Record

Returns a new instance of Record.



11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/turbine_rb/records.rb', line 11

# Builds a record from a source object exposing +key+, +value+ and
# +timestamp+.
#
# The key and timestamp are copied as-is. The raw value is decoded as JSON;
# if it is not valid JSON, the raw value is kept unchanged. When
# +value_hash?+ is true afterwards, the value is converted with +to_dot+.
#
# @param pb_record [#key, #value, #timestamp] source record
def initialize(pb_record)
  @key = pb_record.key
  @timestamp = pb_record.timestamp
  @value = begin
    JSON.parse(pb_record.value)
  rescue JSON::ParserError
    # Not JSON: keep the raw value exactly as received.
    pb_record.value
  end
  @value = @value.to_dot if value_hash?
end

Instance Attribute Details

#key ⇒ Object

Returns the value of attribute key.



9
10
11
# File 'lib/turbine_rb/records.rb', line 9

# Reader for the record key.
#
# @return [Object] the value stored in +@key+ (assigned from
#   +pb_record.key+ in #initialize)
def key
  @key
end

#timestamp ⇒ Object

Returns the value of attribute timestamp.



9
10
11
# File 'lib/turbine_rb/records.rb', line 9

# Reader for the record timestamp.
#
# @return [Object] the value stored in +@timestamp+ (assigned from
#   +pb_record.timestamp+ in #initialize)
def timestamp
  @timestamp
end

#value ⇒ Object

Returns the value of attribute value.



9
10
11
# File 'lib/turbine_rb/records.rb', line 9

# Reader for the record's current value.
#
# @return [Object] the value stored in +@value+: the JSON-decoded (or raw)
#   +pb_record.value+ from #initialize, as later changed by #set or #unwrap!
def value
  @value
end

Instance Method Details

#get(key) ⇒ Object



32
33
34
35
36
37
38
# File 'lib/turbine_rb/records.rb', line 32

# Reads a field from the record value.
#
# When +value_string?+ or +value_array?+ is true the whole value is
# returned and +key+ is ignored. Otherwise the name produced by
# +payload_key(key)+ is dispatched dynamically on the value.
#
# @param key [String] field name to look up
# @return [Object] the whole value, or the result of the field lookup
def get(key)
  return @value if value_string? || value_array?

  # public_send rather than send: a field whose name matches a private
  # method (Kernel#format, #exit, #system, ...) must not invoke that method.
  # With public_send such names reach the receiver's method_missing instead.
  # NOTE(review): assumes the dot-access value resolves fields through
  # method_missing - confirm against the wrapper in use.
  @value.public_send(payload_key(key))
end

#serialize ⇒ Object



24
25
26
# File 'lib/turbine_rb/records.rb', line 24

# Builds an +Io::Meroxa::Funtime::Record+ from this record's key, its
# serialized value (+serialize_value+) and its timestamp.
#
# @return [Io::Meroxa::Funtime::Record]
def serialize
  Io::Meroxa::Funtime::Record.new(
    key: @key,
    value: serialize_value,
    timestamp: @timestamp
  )
end

#serialize_core_record ⇒ Object



28
29
30
# File 'lib/turbine_rb/records.rb', line 28

# Builds a +TurbineCore::Record+ from this record's key, its serialized
# value (+serialize_value+) and its timestamp.
#
# @return [TurbineCore::Record]
def serialize_core_record
  TurbineCore::Record.new(
    key: @key,
    value: serialize_value,
    timestamp: @timestamp
  )
end

#set(key, value) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/turbine_rb/records.rb', line 40

# Writes +value+ into the record.
#
# When +value_hash?+ is false, the whole record value is replaced by
# +value+ and returned; +key+ is ignored. Otherwise +value+ is assigned
# through a dynamic "<payload_key(key)>=" call on the current value. For
# records where +json_schema?+ is true the field is probed first, and
# +set_schema_field(key, value)+ is called when that probe raises
# NoMethodError.
#
# @param key [String] field name to assign
# @param value [Object] new field value, or new whole value
# @return [Object] the new whole value, or the result of the setter call
def set(key, value)
  # Decide once, before @value changes. Asking value_hash? again after the
  # assignment would test the NEW value: replacing a non-Hash value with a
  # Hash would then skip the return and fall into field assignment below.
  # NOTE(review): value_hash? is defined outside this view; this assumes it
  # inspects @value.
  unless value_hash?
    @value = value
    return @value
  end

  if json_schema?
    begin
      # Probe only: the result is discarded, a NoMethodError signals that
      # the field is not present yet.
      @value.send(payload_key(key))
    rescue NoMethodError
      set_schema_field(key, value)
    end
  end

  @value.send("#{payload_key(key)}=", value)
end

#unwrap! ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/turbine_rb/records.rb', line 55

# Replaces the record's payload with the payload's +after+ member.
#
# Does nothing unless +cdc_format?+ is true. When the schema's field list
# has an entry whose +field+ is "after", that entry (with its "field" key
# removed and its name set to the schema's name) becomes the new schema.
#
# @return [Object, nil] nil when +cdc_format?+ is false, otherwise the
#   result of the payload assignment
def unwrap!
  return unless cdc_format?

  envelope = @value.send("payload")
  field_list = @value.send("schema.fields")
  after_schema = field_list.find { |entry| entry.field == "after" }

  if after_schema
    # Promote the "after" entry to be the whole schema.
    after_schema.delete("field")
    after_schema.name = @value.send("schema.name")
    @value.send("schema=", after_schema)
  end

  @value.send("payload=", envelope.after)
end