Class: Prefab::SSEConfigClient
- Inherits:
-
Object
- Object
- Prefab::SSEConfigClient
- Defined in:
- lib/prefab/sse_config_client.rb
Constant Summary collapse
- SSE_READ_TIMEOUT =
300
- SECONDS_BETWEEN_RECONNECT =
5
- AUTH_USER =
'authuser'
- LOG =
Prefab::InternalLogger.new(self)
Instance Method Summary collapse
- #close ⇒ Object
- #connect(&load_configs) ⇒ Object
- #headers ⇒ Object
-
#initialize(options, config_loader) ⇒ SSEConfigClient
constructor
A new instance of SSEConfigClient.
- #source ⇒ Object
- #start(&load_configs) ⇒ Object
Constructor Details
#initialize(options, config_loader) ⇒ SSEConfigClient
Returns a new instance of SSEConfigClient.
10 11 12 13 14 |
# File 'lib/prefab/sse_config_client.rb', line 10
#
# Builds a client that starts out in the not-connected state.
#
# @param options [Object] client options; #headers reads `api_key` from it
# @param config_loader [Object] loader whose `highwater_mark` is read when
#   connecting and when building request headers
def initialize(options, config_loader)
  @options = options
  @config_loader = config_loader
  @connected = false
end
Instance Method Details
#close ⇒ Object
16 17 18 19 |
# File 'lib/prefab/sse_config_client.rb', line 16
#
# Stops the reconnect thread (if one was started) and closes the current
# stream client (if one exists). Safe to call before #start.
#
# @return [Object, nil] whatever the client's `close` returns, or nil when
#   there is no client
def close
  @retry_thread&.kill
  @client&.close
end
#connect(&load_configs) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/prefab/sse_config_client.rb', line 47
#
# Opens an SSE stream against the URL built from #source and registers the
# event and error handlers on it.
#
# Each event's data is Base64-decoded, parsed as PrefabProto::Configs and
# handed to the block together with the symbol :sse. Errors are logged; only
# an HTTP::ConnectionError causes the client to be closed.
#
# @yieldparam configs [PrefabProto::Configs] decoded payload of one event
# @yieldparam origin [Symbol] always :sse
# @return [SSE::Client] the newly created client
def connect(&load_configs)
  url = "#{source}/api/v1/sse/config"
  LOG.debug "SSE Streaming Connect to #{url} start_at #{@config_loader.highwater_mark}"

  SSE::Client.new(url,
                  headers: headers,
                  read_timeout: SSE_READ_TIMEOUT,
                  logger: Prefab::InternalLogger.new(SSE::Client)) do |client|
    client.on_event do |event|
      configs = PrefabProto::Configs.decode(Base64.decode64(event.data))
      load_configs.call(configs, :sse)
    end

    client.on_error do |error|
      LOG.error "SSE Streaming Error: #{error} for url #{url}"
      client.close if error.is_a?(HTTP::ConnectionError)
    end
  end
end
#headers ⇒ Object
70 71 72 73 74 75 76 77 78 79 |
# File 'lib/prefab/sse_config_client.rb', line 70
#
# Request headers sent when opening the SSE stream.
#
# The Authorization value is HTTP Basic: "AUTH_USER:api_key", strict-Base64
# encoded. 'x-prefab-start-at-id' carries the config loader's current
# highwater mark, unmodified.
#
# @return [Hash{String => Object}] a fresh hash on every call
def headers
  credentials = Base64.strict_encode64("#{AUTH_USER}:#{@options.api_key}")

  {
    'x-prefab-start-at-id' => @config_loader.highwater_mark,
    'Authorization' => "Basic #{credentials}",
    'Accept' => 'text/event-stream',
    'X-PrefabCloud-Client-Version' => "prefab-cloud-ruby-#{Prefab::VERSION}"
  }
end
#source ⇒ Object
81 82 83 84 85 86 87 88 89 |
# File 'lib/prefab/sse_config_client.rb', line 81
#
# Returns the next SSE source, cycling through `@options.sse_sources` in
# order and wrapping back to the first entry after the last one.
#
# The first call yields the first source. Every call advances the cursor, so
# this method is not idempotent.
#
# @return [Object, nil] the selected source; nil when no sources are configured
def source
  sources = @options.sse_sources

  @source_index = @source_index.nil? ? 0 : @source_index + 1
  # Wrap around once the cursor runs past the end of the list.
  @source_index = 0 if @source_index >= sources.size

  sources[@source_index]
end
#start(&load_configs) ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/prefab/sse_config_client.rb', line 21
#
# Connects to an SSE source and starts a background thread that reconnects
# after the stream has been observed closed for more than
# SECONDS_BETWEEN_RECONNECT consecutive one-second checks.
#
# Does nothing (beyond a debug log line) when no SSE sources are configured.
#
# @yield forwarded to #connect for every (re)connection
# @return [Thread, nil] the retry thread, or nil when no sources are configured
def start(&load_configs)
  if @options.sse_sources.empty?
    LOG.debug 'No SSE sources configured'
    return
  end

  @client = connect(&load_configs)

  closed_count = 0

  @retry_thread = Thread.new do
    loop do
      sleep 1

      next unless @client.closed?

      closed_count += 1
      next unless closed_count > SECONDS_BETWEEN_RECONNECT

      closed_count = 0
      # Track the replacement client; otherwise the stale closed client keeps
      # reporting closed?, a new stream is opened every window, and #close
      # cannot reach the live connection.
      @client = connect(&load_configs)
    end
  end
end