Class: Fluent::KubernetesMetadataInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_kubernetes_metadata.rb

Constant Summary collapse

K8_POD_CA_CERT =
'ca.crt'
K8_POD_TOKEN =
'token'

Instance Method Summary collapse

Constructor Details

#initializeKubernetesMetadataInput

Returns a new instance of KubernetesMetadataInput.



55
56
57
58
59
# File 'lib/fluent/plugin/in_kubernetes_metadata.rb', line 55

def initialize
  super
  require 'kubeclient'
  require 'active_support/core_ext/object/blank'
end

Instance Method Details

#configure(conf) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/fluent/plugin/in_kubernetes_metadata.rb', line 61

def configure(conf)
  super

  # Use Kubernetes default service account if we're in a pod.
  if @kubernetes_url.nil?
    env_host = ENV['KUBERNETES_SERVICE_HOST']
    env_port = ENV['KUBERNETES_SERVICE_PORT']
    if env_host.present? && env_port.present?
      @kubernetes_url = "https://#{env_host}:#{env_port}/api"
    end
  end
  unless @kubernetes_url
    raise Fluent::ConfigError, "kubernetes_url is not defined"
  end

  # Use SSL certificate and bearer token from Kubernetes service account.
  if Dir.exist?(@secret_dir)
    ca_cert = File.join(@secret_dir, K8_POD_CA_CERT)
    pod_token = File.join(@secret_dir, K8_POD_TOKEN)

    if !@ca_file.present? and File.exist?(ca_cert)
      @ca_file = ca_cert
    end

    if !@bearer_token_file.present? and File.exist?(pod_token)
      @bearer_token_file = pod_token
    end
  end
end

#emit_event(event_obj, time, type) ⇒ Object



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/fluent/plugin/in_kubernetes_metadata.rb', line 150

def emit_event(event_obj, time, type)
  payload = syms_to_strs(event_obj)
  payload['event_type'] = type
  res_name = @resource.to_s.downcase
  namespace_name = event_obj['metadata']['namespace'] ? event_obj['metadata']['namespace'] : "openshift-infra"
  if event_obj['metadata']['labels'] then
    labels = []
    syms_to_strs(event_obj['metadata']['labels'].to_h).each{|k,v| labels << "#{k}=#{v}"}
    payload['metadata']['labels'] = labels
  end
  if event_obj['metadata']['annotations'] then
    annotations = []
    syms_to_strs(event_obj['metadata']['annotations'].to_h).each{|k,v| annotations << "#{k}=#{v}"}
    payload['metadata']['annotations'] = annotations
  end

  tag = "kubernetes.#{res_name}.#{namespace_name}.#{event_obj['metadata']['name']}"

  router.emit(tag, time, payload)
end

#shutdownObject



131
132
133
# File 'lib/fluent/plugin/in_kubernetes_metadata.rb', line 131

def shutdown
  @thread.exit
end

#startObject



91
92
93
94
95
96
97
98
# File 'lib/fluent/plugin/in_kubernetes_metadata.rb', line 91

def start

  start_kubclient

  @thread = Thread.new(&method(:watch_resource))
  @thread.abort_on_exception = true

end

#start_kubclientObject



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/fluent/plugin/in_kubernetes_metadata.rb', line 100

def start_kubclient
  return @client if @client

  if @kubernetes_url.present?

    ssl_options = {
        client_cert: @client_cert.present? ? OpenSSL::X509::Certificate.new(File.read(@client_cert)) : nil,
        client_key:  @client_key.present? ? OpenSSL::PKey::RSA.new(File.read(@client_key)) : nil,
        ca_file:     @ca_file,
        verify_ssl:  @verify_ssl ? OpenSSL::SSL::VERIFY_PEER : OpenSSL::SSL::VERIFY_NONE
    }

    auth_options = {}

    if @bearer_token_file.present?
      bearer_token = File.read(@bearer_token_file)
      auth_options[:bearer_token] = bearer_token
    end

    @client = Kubeclient::Client.new @kubernetes_url, @apiVersion,
                                     ssl_options: ssl_options,
                                     auth_options: auth_options

    begin
      @client.api_valid?
    rescue KubeException => kube_error
      raise Fluent::ConfigError, "Invalid Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}: #{kube_error.message}"
    end
  end
end

#syms_to_strs(hsh) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/fluent/plugin/in_kubernetes_metadata.rb', line 40

def syms_to_strs(hsh)
  newhsh = {}
  hsh.each_pair do |kk,vv|
    if vv.is_a?(Hash)
      vv = syms_to_strs(vv)
    end
    if kk.is_a?(Symbol)
      newhsh[kk.to_s] = vv
    else
      newhsh[kk] = vv
    end
  end
  newhsh
end

#watch_resourceObject



135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/fluent/plugin/in_kubernetes_metadata.rb', line 135

def watch_resource
  begin
    resource_version = @client.get_pods.resourceVersion
    watcher          = @client.watch_entities(@resource, options = {resource_version: resource_version})
  rescue Exception => e
    raise Fluent::ConfigError, "Exception encountered fetching metadata from Kubernetes API endpoint: #{e.message}"
  end


  watcher.each do |notice|
    time = Engine.now
    emit_event(notice.object, time, notice.type)
  end
end