Class: Kubeclient::Common::WatchStream

Inherits:
Object
  • Object
show all
Defined in:
lib/kubeclient/watch_stream.rb

Overview

HTTP Stream used to watch changes on entities

Instance Method Summary collapse

Constructor Details

#initialize(uri, http_options, format: :json) ⇒ WatchStream

Returns a new instance of WatchStream.



7
8
9
10
11
12
# File 'lib/kubeclient/watch_stream.rb', line 7

def initialize(uri, http_options, format: :json)
  @uri = uri
  @http = nil
  @http_options = http_options.merge(read_timeout: nil)
  @format = format
end

Instance Method Details

#eachObject



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/kubeclient/watch_stream.rb', line 14

def each
  @finished = false
  @http = Net::HTTP.start(@uri.host, @uri.port, @http_options)

  buffer = ''
  request = generate_request

  @http.request(request) do |response|
    unless response.is_a? Net::HTTPSuccess
      fail KubeException.new(response.code, response.message, response)
    end
    response.read_body do |chunk|
      buffer << chunk
      while (line = buffer.slice!(/.+\n/))
        yield @format == :json ? WatchNotice.new(JSON.parse(line)) : line.chomp
      end
    end
  end
rescue Errno::EBADF
  raise unless @finished
end

#finishObject



49
50
51
52
# File 'lib/kubeclient/watch_stream.rb', line 49

def finish
  @finished = true
  @http.finish if !@http.nil? && @http.started?
end

#generate_requestObject



36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/kubeclient/watch_stream.rb', line 36

def generate_request
  request = Net::HTTP::Get.new(@uri)
  if @http_options[:basic_auth_user] && @http_options[:basic_auth_password]
    request.basic_auth @http_options[:basic_auth_user],
                       @http_options[:basic_auth_password]
  end

  @http_options.fetch(:headers, {}).each do |header, value|
    request[header.to_s] = value
  end
  request
end