Class: Prefab::ConfigClient

Inherits:
Object
  • Object
show all
Includes:
ConfigHelper
Defined in:
lib/prefab/config_client.rb

Constant Summary collapse

RECONNECT_WAIT =
5
DEFAULT_CHECKPOINT_FREQ_SEC =
60
SSE_READ_TIMEOUT =
300
AUTH_USER =
"authuser"

Class Method Summary collapse

Instance Method Summary collapse

Methods included from ConfigHelper

#value_of, #value_of_variant

Constructor Details

#initialize(base_client, timeout) ⇒ ConfigClient

Returns a new instance of ConfigClient.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/prefab/config_client.rb', line 11

def initialize(base_client, timeout)
  @base_client = base_client
  @options = base_client.options
  @base_client.log_internal Logger::DEBUG, "Initialize ConfigClient"
  @timeout = timeout

  @stream_lock = Concurrent::ReadWriteLock.new

  @checkpoint_freq_secs = DEFAULT_CHECKPOINT_FREQ_SEC

  @config_loader = Prefab::ConfigLoader.new(@base_client)
  @config_resolver = Prefab::ConfigResolver.new(@base_client, @config_loader)

  @initialization_lock = Concurrent::ReadWriteLock.new
  @base_client.log_internal Logger::DEBUG, "Initialize ConfigClient: AcquireWriteLock"
  @initialization_lock.acquire_write_lock
  @base_client.log_internal Logger::DEBUG, "Initialize ConfigClient: AcquiredWriteLock"
  @initialized_future = Concurrent::Future.execute { @initialization_lock.acquire_read_lock }

  @cancellable_interceptor = Prefab::CancellableInterceptor.new(@base_client)

  if @options.local_only?
    finish_init!(:local_only)
  else
    load_checkpoint
    start_checkpointing_thread
    start_streaming
  end
end

Class Method Details

.value_to_delta(key, config_value, namespace = nil) ⇒ Object



70
71
72
73
# File 'lib/prefab/config_client.rb', line 70

def self.value_to_delta(key, config_value, namespace = nil)
  Prefab::Config.new(key: [namespace, key].compact.join(":"),
                     rows: [Prefab::ConfigRow.new(value: config_value)])
end

Instance Method Details

#get(key, default = Prefab::Client::NO_DEFAULT_PROVIDED) ⇒ Object



75
76
77
78
# File 'lib/prefab/config_client.rb', line 75

def get(key, default=Prefab::Client::NO_DEFAULT_PROVIDED)
  config = _get(key)
  config ? value_of(config[:value]) : handle_default(key, default)
end

#get_config_obj(key) ⇒ Object



80
81
82
83
# File 'lib/prefab/config_client.rb', line 80

def get_config_obj(key)
  config = _get(key)
  config ? config[:config] : nil
end

#resetObject



61
62
63
64
# File 'lib/prefab/config_client.rb', line 61

def reset
  @base_client.reset!
  @_stub = nil
end

#start_streamingObject



41
42
43
44
45
# File 'lib/prefab/config_client.rb', line 41

def start_streaming
  @stream_lock.with_write_lock do
    start_sse_streaming_connection_thread(@config_loader.highwater_mark) if @streaming_thread.nil?
  end
end

#to_sObject



66
67
68
# File 'lib/prefab/config_client.rb', line 66

def to_s
  @config_resolver.to_s
end

#upsert(key, config_value, namespace = nil, previous_key = nil) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/prefab/config_client.rb', line 47

def upsert(key, config_value, namespace = nil, previous_key = nil)
  raise "Key must not contain ':' set namespaces separately" if key.include? ":"
  raise "Namespace must not contain ':'" if namespace&.include?(":")
  config_delta = Prefab::ConfigClient.value_to_delta(key, config_value, namespace)
  upsert_req = Prefab::UpsertRequest.new(config_delta: config_delta)
  upsert_req.previous_key = previous_key if previous_key&.present?

  @base_client.request Prefab::ConfigService, :upsert, req_options: { timeout: @timeout }, params: upsert_req
  @base_client.stats.increment("prefab.config.upsert")
  @config_loader.set(config_delta, :upsert)
  @config_loader.rm(previous_key) if previous_key&.present?
  @config_resolver.update
end