Class: Prefab::ConfigClient

Inherits:
Object
  • Object
show all
Includes:
ConfigHelper
Defined in:
lib/prefab/config_client.rb

Constant Summary collapse

RECONNECT_WAIT = 5
DEFAULT_CHECKPOINT_FREQ_SEC = 60
SSE_READ_TIMEOUT = 300

Class Method Summary collapse

Instance Method Summary collapse

Methods included from ConfigHelper

#value_of, #value_of_variant

Constructor Details

#initialize(base_client, timeout) ⇒ ConfigClient

Returns a new instance of ConfigClient.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/prefab/config_client.rb', line 10

# Builds a ConfigClient on top of an existing base client and, unless the
# client is configured as local-only, loads a checkpoint and starts the
# checkpointing and streaming machinery.
#
# @param base_client [Object] client supplying `options` and `log_internal`;
#   it is also handed to the loader, resolver and interceptor built here
# @param timeout [Object] stored in @timeout (presumably a request timeout
#   in seconds — confirm against callers)
def initialize(base_client, timeout)
  @base_client = base_client
  @options = base_client.options
  @base_client.log_internal Logger::DEBUG, "Initialize ConfigClient"
  @timeout = timeout

  # Guards creation of the streaming thread (see start_streaming).
  @stream_lock = Concurrent::ReadWriteLock.new

  @checkpoint_freq_secs = DEFAULT_CHECKPOINT_FREQ_SEC

  @config_loader = Prefab::ConfigLoader.new(@base_client)
  @config_resolver = Prefab::ConfigResolver.new(@base_client, @config_loader)

  # Take the write lock immediately, then start a future that tries to take
  # the read lock. The future cannot complete while the write lock is held,
  # so it acts as an "initialized" signal.
  # NOTE(review): the write lock is presumably released by finish_init!,
  # which is not visible here — confirm.
  @initialization_lock = Concurrent::ReadWriteLock.new
  @base_client.log_internal Logger::DEBUG, "Initialize ConfigClient: AcquireWriteLock"
  @initialization_lock.acquire_write_lock
  @base_client.log_internal Logger::DEBUG, "Initialize ConfigClient: AcquiredWriteLock"
  @initialized_future = Concurrent::Future.execute { @initialization_lock.acquire_read_lock }

  @cancellable_interceptor = Prefab::CancellableInterceptor.new(@base_client)

  if @options.local_only?
    # Local-only mode: finish initialization right away without starting
    # the checkpoint or streaming paths below.
    finish_init!(:local_only)
  else
    load_checkpoint
    start_checkpointing_thread
    start_streaming
  end
end

Class Method Details

.value_to_delta(key, config_value, namespace = nil) ⇒ Object



69
70
71
72
# File 'lib/prefab/config_client.rb', line 69

# Wraps a single config value in a Prefab::Config holding one row.
#
# @param key [String] config key
# @param config_value [Object] value placed in the single ConfigRow
# @param namespace [String, nil] optional prefix; when given, the resulting
#   key is "namespace:key", otherwise just the key
# @return [Prefab::Config]
def self.value_to_delta(key, config_value, namespace = nil)
  qualified_key = [namespace, key].compact.join(":")
  row = Prefab::ConfigRow.new(value: config_value)
  Prefab::Config.new(key: qualified_key, rows: [row])
end

Instance Method Details

#get(key, default = Prefab::Client::NO_DEFAULT_PROVIDED) ⇒ Object



74
75
76
77
# File 'lib/prefab/config_client.rb', line 74

# Looks up a config value by key.
#
# @param key [String] config key passed to _get
# @param default [Object] passed to handle_default when the lookup yields
#   nothing; defaults to the Prefab::Client::NO_DEFAULT_PROVIDED sentinel
# @return [Object] value_of applied to the entry's :value, or whatever
#   handle_default returns when no entry is found
def get(key, default = Prefab::Client::NO_DEFAULT_PROVIDED)
  entry = _get(key)
  return handle_default(key, default) unless entry

  value_of(entry[:value])
end

#get_config_obj(key) ⇒ Object



79
80
81
82
# File 'lib/prefab/config_client.rb', line 79

# Returns the :config member of the entry found for a key.
#
# @param key [String] config key passed to _get
# @return [Object, nil] the entry's :config value, or nil when _get finds
#   nothing
def get_config_obj(key)
  entry = _get(key)
  return nil unless entry

  entry[:config]
end

#reset ⇒ Object



60
61
62
63
# File 'lib/prefab/config_client.rb', line 60

# Resets the underlying base client and clears @_stub.
#
# NOTE(review): @_stub is presumably a cached service stub that is rebuilt
# on next use — it is not assigned anywhere in the visible code; confirm.
#
# @return [nil]
def reset
  @base_client.reset!
  @_stub = nil
end

#start_streaming ⇒ Object



40
41
42
43
44
# File 'lib/prefab/config_client.rb', line 40

# Starts the SSE streaming connection thread if @streaming_thread is not
# already set. The check and the start both run while holding the write
# side of @stream_lock.
#
# @return [Object, nil] the result of
#   start_sse_streaming_connection_thread, or nil when a streaming thread
#   already exists
def start_streaming
  @stream_lock.with_write_lock do
    next unless @streaming_thread.nil?

    start_sse_streaming_connection_thread(@config_loader.highwater_mark)
  end
end

#to_s ⇒ Object



65
66
67
# File 'lib/prefab/config_client.rb', line 65

# String representation of this client; delegates to the config resolver.
#
# @return [Object] whatever @config_resolver.to_s returns
def to_s
  @config_resolver.to_s
end

#upsert(key, config_value, namespace = nil, previous_key = nil) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/prefab/config_client.rb', line 46

# Sends a config value to the config service, then applies the same change
# to the local loader and refreshes the resolver.
#
# @param key [String] config key; must not contain ':'
# @param config_value [Object] value wrapped into the delta via
#   Prefab::ConfigClient.value_to_delta
# @param namespace [String, nil] optional namespace; must not contain ':'
# @param previous_key [String, nil] when present, it is attached to the
#   request and removed from the local loader after the upsert
# @raise [RuntimeError] if key or namespace contains ':'
# @return [Object] the result of @config_resolver.update
def upsert(key, config_value, namespace = nil, previous_key = nil)
  raise "Key must not contain ':' set namespaces separately" if key.include?(":")
  raise "Namespace must not contain ':'" if namespace&.include?(":")

  delta = Prefab::ConfigClient.value_to_delta(key, config_value, namespace)
  request = Prefab::UpsertRequest.new(config_delta: delta)
  replaces_key = previous_key&.present?
  request.previous_key = previous_key if replaces_key

  # Remote call first; local state is only touched after it returns.
  @base_client.request(Prefab::ConfigService, :upsert,
                       req_options: { timeout: @timeout },
                       params: request)
  @base_client.stats.increment("prefab.config.upsert")

  @config_loader.set(delta, :upsert)
  @config_loader.rm(previous_key) if replaces_key
  @config_resolver.update
end