Class: DTK::State::Component::Attribute::Influxdb

Inherits:
DTK::State::Component::Attribute show all
Defined in:
lib/state/component/providers/influxdb.rb,
lib/state/component/providers/influxdb/client.rb,
lib/state/component/providers/influxdb/measurement.rb,
lib/state/component/providers/influxdb/semantictype.rb,
lib/state/component/providers/influxdb/measurement/errors.rb,
lib/state/component/providers/influxdb/measurement/events.rb,
lib/state/component/providers/influxdb/measurement/states.rb,
lib/state/component/providers/influxdb/measurement/attribute_measurement.rb

Defined Under Namespace

Classes: Client, Measurement, SemanticType

Instance Attribute Summary collapse

Attributes inherited from DTK::State::Component::Attribute

#dynamic, #encrypted, #function, #name, #parent, #required, #type, #value

Instance Method Summary collapse

Methods inherited from DTK::State::Component::Attribute

create_from_kube_array, create_from_kube_hash, get, #to_hash

Constructor Details

#initialize(measurement_name) ⇒ Influxdb

Returns a new instance of Influxdb.



10
11
12
13
# File 'lib/state/component/providers/influxdb.rb', line 10

def initialize(measurement_name)
  @client = Influxdb::Client.new
  @measurement = @client.measurement_helper(measurement_name)
end

Instance Attribute Details

#clientObject (readonly)

Returns the value of attribute client.



8
9
10
# File 'lib/state/component/providers/influxdb.rb', line 8

def client
  @client
end

#measurementObject (readonly)

Returns the value of attribute measurement.



8
9
10
# File 'lib/state/component/providers/influxdb.rb', line 8

def measurement
  @measurement
end

Instance Method Details

#get(namespace, component_name, assembly_name, attribute_name, opts = {}) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
# File 'lib/state/component/providers/influxdb.rb', line 15

def get(namespace, component_name, assembly_name, attribute_name, opts = {})
  required_tags = get_required_tags(namespace, component_name, assembly_name, attribute_name)
  if opts[:provider] == "correlation"
    errors = client.measurement_helper(:errors)
    required_tags.merge!({ correlator_type: opts[:entrypoint].split("/").last.split(".")[0] })
    errors.get_last_point(required_tags)
  elsif
    last_value = measurement.get_last_point(required_tags)
    last_value
  end
end

#get_event(event_id, pod_name, pod_namespace, component_name, attribute_name, task_id) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
# File 'lib/state/component/providers/influxdb.rb', line 57

def get_event(event_id, pod_name, pod_namespace, component_name, attribute_name, task_id)
  required_tags = {
      event_id: event_id,
      pod_name: pod_name,
      pod_namespace: pod_namespace,
      component_name: component_name,
      attribute_name: attribute_name,
      task_id: task_id
    }
  last_point = measurement.get_last_point(required_tags)
end

#write(namespace, component_name, assembly_name, attribute_name, value, opts = {}, timestamp = nil) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
# File 'lib/state/component/providers/influxdb.rb', line 27

def write(namespace, component_name, assembly_name, attribute_name, value, opts = {}, timestamp = nil)
  if opts[:provider] == "correlation"
    errors = client.measurement_helper(:errors)
    required_tags = get_required_tags(namespace, component_name, assembly_name, attribute_name)
    required_tags.merge!({ correlator_type: opts[:entrypoint].split("/").last.split(".")[0] })
    errors.write(value.to_s, required_tags, timestamp)
  elsif
    required_tags = get_required_tags(namespace, component_name, assembly_name, attribute_name)
    measurement.write(value, required_tags, timestamp)
  end
end

#write_event(event_id, pod_name, pod_namespace, event_source, event_message, component_name, attribute_name, task_id, timestamp) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/state/component/providers/influxdb.rb', line 39

def write_event(event_id, pod_name, pod_namespace, event_source, event_message, component_name, attribute_name, task_id, timestamp)
  begin
    fail "Bad timestamp input, write operation wont be completed" if timestamp > Time.new
    value_to_write = { event_source: event_source, event_message: event_message }
    required_tags = {
      event_id: event_id,
      pod_name: pod_name,
      pod_namespace: pod_namespace,
      component_name: component_name,
      attribute_name: attribute_name,
      task_id: task_id
    }
    measurement.write(value_to_write.to_s, required_tags, timestamp)
  rescue => error
    fail error
  end
end

#write_state(type, name, namespace, object_state, spec, status, component_name, attribute_name, task_id, timestamp) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/state/component/providers/influxdb.rb', line 69

def write_state(type, name, namespace, object_state, spec, status, component_name, attribute_name, task_id, timestamp)
  begin
    fail "Bad timestamp input, write operation wont be completed" if timestamp > Time.new
    value_to_write = { spec: spec, status: status }
    required_tags = {
      type: type,
      name: name,
      namespace: namespace,
      object_state: object_state,
      component_name: component_name,
      attribute_name: attribute_name,
      task_id: task_id
    }
    measurement.write(value_to_write.to_s, required_tags, timestamp)
  rescue => error
    fail error
  end
end