Class: Fluent::KubernetesMetadataInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_kubernetes_metadata.rb

Constant Summary collapse

K8_POD_CA_CERT =
'ca.crt'
K8_POD_TOKEN =
'token'

Instance Method Summary collapse

Constructor Details

#initializeKubernetesMetadataInput

Returns a new instance of KubernetesMetadataInput.



62
63
64
65
66
# File 'lib/fluent/plugin/in_kubernetes_metadata.rb', line 62

def initialize
  super
  require 'kubeclient'
  require 'active_support/core_ext/object/blank'
end

Instance Method Details

#configure(conf) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/fluent/plugin/in_kubernetes_metadata.rb', line 68

def configure(conf)
  super

  # Use Kubernetes default service account if we're in a pod.
  if @kubernetes_url.nil?
    env_host = ENV['KUBERNETES_SERVICE_HOST']
    env_port = ENV['KUBERNETES_SERVICE_PORT']
    if env_host.present? && env_port.present?
      @kubernetes_url = "https://#{env_host}:#{env_port}/api"
    end
  end
  unless @kubernetes_url
    raise Fluent::ConfigError, "kubernetes_url is not defined"
  end

  # Use SSL certificate and bearer token from Kubernetes service account.
  if Dir.exist?(@secret_dir)
    ca_cert = File.join(@secret_dir, K8_POD_CA_CERT)
    pod_token = File.join(@secret_dir, K8_POD_TOKEN)

    if !@ca_file.present? and File.exist?(ca_cert)
      @ca_file = ca_cert
    end

    if !@bearer_token_file.present? and File.exist?(pod_token)
      @bearer_token_file = pod_token
    end
  end
end

#emit_event(event_obj, time, type) ⇒ Object



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/fluent/plugin/in_kubernetes_metadata.rb', line 164

def emit_event(event_obj, time, type)
  payload = syms_to_strs(event_obj)
  payload['event_type'] = type
  res_name = @resource.to_s.downcase
  namespace_name = event_obj['metadata']['namespace'] ? event_obj['metadata']['namespace'] : "openshift-infra"
  if event_obj['metadata']['labels'] then
    labels = []
    syms_to_strs(event_obj['metadata']['labels'].to_h).each{|k,v| labels << "#{k}=#{v}"}
    payload['metadata']['labels'] = labels
  end
  if event_obj['metadata']['annotations'] then
    annotations = []
    syms_to_strs(event_obj['metadata']['annotations'].to_h).each{|k,v| annotations << "#{k}=#{v}"}
    payload['metadata']['annotations'] = annotations
  end

  tag = "kubernetes.#{res_name}.#{namespace_name}.#{event_obj['metadata']['name']}"

  router.emit(tag, time, payload)
end

#shutdownObject



138
139
140
# File 'lib/fluent/plugin/in_kubernetes_metadata.rb', line 138

def shutdown
  @thread.exit
end

#startObject



98
99
100
101
102
103
104
105
# File 'lib/fluent/plugin/in_kubernetes_metadata.rb', line 98

def start

  start_kubclient

  @thread = Thread.new(&method(:watch_resource))
  @thread.abort_on_exception = true

end

#start_kubclientObject



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/fluent/plugin/in_kubernetes_metadata.rb', line 107

def start_kubclient
  return @client if @client

  if @kubernetes_url.present?

    ssl_options = {
        client_cert: @client_cert.present? ? OpenSSL::X509::Certificate.new(File.read(@client_cert)) : nil,
        client_key:  @client_key.present? ? OpenSSL::PKey::RSA.new(File.read(@client_key)) : nil,
        ca_file:     @ca_file,
        verify_ssl:  @verify_ssl ? OpenSSL::SSL::VERIFY_PEER : OpenSSL::SSL::VERIFY_NONE
    }

    auth_options = {}

    if @bearer_token_file.present?
      bearer_token = File.read(@bearer_token_file)
      auth_options[:bearer_token] = bearer_token
    end

    @client = Kubeclient::Client.new @kubernetes_url, @apiVersion,
                                     ssl_options: ssl_options,
                                     auth_options: auth_options

    begin
      @client.api_valid?
    rescue KubeException => kube_error
      raise Fluent::ConfigError, "Invalid Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}: #{kube_error.message}"
    end
  end
end

#syms_to_strs(hsh) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/fluent/plugin/in_kubernetes_metadata.rb', line 47

def syms_to_strs(hsh)
  newhsh = {}
  hsh.each_pair do |kk,vv|
    if vv.is_a?(Hash)
      vv = syms_to_strs(vv)
    end
    if kk.is_a?(Symbol)
      newhsh[kk.to_s] = vv
    else
      newhsh[kk] = vv
    end
  end
  newhsh
end

#watch_resourceObject



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/fluent/plugin/in_kubernetes_metadata.rb', line 142

def watch_resource
  begin
    resource_version = @client.get_pods.resourceVersion
    watcher          = @client.watch_entities(@resource, options = {resource_version: resource_version})
  rescue Exception => e
    raise Fluent::ConfigError, "Exception encountered fetching metadata from Kubernetes API endpoint: #{e.message}"
  end


  begin
    watcher.each do |notice|
      time = Engine.now
      emit_event(notice.object, time, notice.type)
    end
    log.trace "Exited resource watcher"
  rescue
    log.error "Unexpected error in resource watcher", :error=>$!.to_s
    log.error_backtrace
  end

end