Class: Prefab::ConfigClient

Inherits:
Object
  • Object
show all
Defined in:
lib/prefab/config_client.rb

Constant Summary collapse

# Wait before reconnecting; presumably seconds — TODO confirm unit and call site.
RECONNECT_WAIT =
5
# Default checkpoint frequency in seconds; copied into @checkpoint_freq_secs by #initialize.
DEFAULT_CHECKPOINT_FREQ_SEC =
60
# Read timeout for the SSE stream; presumably seconds — TODO confirm.
SSE_READ_TIMEOUT =
300
# User name used for authentication; NOTE(review): usage is not visible here — confirm.
AUTH_USER =
'authuser'

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(base_client, timeout) ⇒ ConfigClient

Returns a new instance of ConfigClient.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/prefab/config_client.rb', line 10

# Builds the config client: wires up the loader and resolver, sets up the
# initialization gate, and then either finishes immediately (local-only mode)
# or loads a checkpoint and starts the checkpointing thread and streaming.
#
# @param base_client the owning client; this constructor reads its +options+
#   and logs through its +log_internal+
# @param timeout stored in @timeout (#upsert passes it as the request timeout)
# @return [ConfigClient]
def initialize(base_client, timeout)
  @base_client = base_client
  @options = base_client.options
  @base_client.log_internal ::Logger::DEBUG, 'Initialize ConfigClient'
  @timeout = timeout

  # Serializes the start of the streaming thread (see #start_streaming).
  @stream_lock = Concurrent::ReadWriteLock.new

  @checkpoint_freq_secs = DEFAULT_CHECKPOINT_FREQ_SEC

  @config_loader = Prefab::ConfigLoader.new(@base_client)
  @config_resolver = Prefab::ConfigResolver.new(@base_client, @config_loader)

  # Initialization gate: the write lock is taken here, and the future below
  # tries to take the read lock on the same lock object.
  # NOTE(review): presumably the future only resolves once the write lock is
  # released elsewhere (likely in finish_init!, not visible here) — confirm.
  @initialization_lock = Concurrent::ReadWriteLock.new
  @base_client.log_internal ::Logger::DEBUG, 'Initialize ConfigClient: AcquireWriteLock'
  @initialization_lock.acquire_write_lock
  @base_client.log_internal ::Logger::DEBUG, 'Initialize ConfigClient: AcquiredWriteLock'
  @initialized_future = Concurrent::Future.execute { @initialization_lock.acquire_read_lock }

  @cancellable_interceptor = Prefab::CancellableInterceptor.new(@base_client)

  if @options.local_only?
    # Local-only mode: no checkpoint load, checkpoint thread or streaming.
    finish_init!(:local_only)
  else
    load_checkpoint
    start_checkpointing_thread
    start_streaming
  end
end

Class Method Details

.value_to_delta(key, config_value, namespace = nil) ⇒ Object



70
71
72
73
# File 'lib/prefab/config_client.rb', line 70

# Wraps a single value in a Prefab::Config holding exactly one ConfigRow.
#
# The config key is +namespace+ and +key+ joined with ':'; a nil namespace is
# dropped, leaving the bare key.
#
# @param key the config key
# @param config_value value placed in the single row
# @param namespace optional namespace prefix (nil for none)
# @return [Prefab::Config]
def self.value_to_delta(key, config_value, namespace = nil)
  qualified_key = [namespace, key].compact.join(':')
  row = Prefab::ConfigRow.new(value: config_value)
  Prefab::Config.new(key: qualified_key, rows: [row])
end

Instance Method Details

#get(key, default = Prefab::Client::NO_DEFAULT_PROVIDED, properties = {}, lookup_key = nil) ⇒ Object



75
76
77
78
# File 'lib/prefab/config_client.rb', line 75

# Looks up +key+ and returns its unwrapped value, or defers to
# +handle_default+ when the lookup yields nothing (nil/false).
#
# @param key the config key to look up
# @param default passed to +handle_default+ when no value is found
# @param properties passed to the lookup and to the unwrapper
# @param lookup_key passed through to the lookup
# @return [Object] the unwrapped value, or whatever +handle_default+ returns
def get(key, default = Prefab::Client::NO_DEFAULT_PROVIDED, properties = {}, lookup_key = nil)
  resolved = _get(key, lookup_key, properties)
  return handle_default(key, default) unless resolved

  Prefab::ConfigValueUnwrapper.unwrap(resolved, key, properties)
end

#reset ⇒ Object



61
62
63
64
# File 'lib/prefab/config_client.rb', line 61

# Resets the underlying base client and clears the cached @_stub.
#
# NOTE(review): @_stub is not referenced anywhere else in the visible code —
# confirm it is still in use.
#
# @return [nil]
def reset
  @base_client.reset!
  @_stub = nil
end

#start_streaming ⇒ Object



40
41
42
43
44
# File 'lib/prefab/config_client.rb', line 40

# Starts the SSE streaming connection thread, beginning at the config
# loader's current highwater mark, unless @streaming_thread is already set.
# The check and the start both happen while holding the stream write lock.
#
# @return [Object, nil] the result of starting the thread, or nil if a
#   streaming thread already exists
def start_streaming
  @stream_lock.with_write_lock do
    next unless @streaming_thread.nil?

    start_sse_streaming_connection_thread(@config_loader.highwater_mark)
  end
end

#to_s ⇒ Object



66
67
68
# File 'lib/prefab/config_client.rb', line 66

# String form of this client; delegates to the config resolver's #to_s.
#
# @return whatever @config_resolver.to_s returns
def to_s
  @config_resolver.to_s
end

#upsert(key, config_value, namespace = nil, previous_key = nil) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/prefab/config_client.rb', line 46

# Creates or updates a config value remotely and mirrors the change locally.
#
# Sends an UpsertRequest through the base client, increments the
# 'prefab.config.upsert' stat, stores the delta in the config loader, removes
# +previous_key+ from the loader when one was supplied, and finally refreshes
# the resolver.
#
# @param key [String] config key; must not contain ':'
# @param config_value value for the config's single row
# @param namespace [String, nil] optional namespace; must not contain ':'
# @param previous_key [String, nil] earlier key being replaced; nil or a
#   blank (empty / whitespace-only) value is ignored
# @raise [RuntimeError] if +key+ or +namespace+ contains ':'
# @return the result of @config_resolver.update
def upsert(key, config_value, namespace = nil, previous_key = nil)
  raise "Key must not contain ':' set namespaces separately" if key.include? ':'
  raise "Namespace must not contain ':'" if namespace&.include?(':')

  # Plain-Ruby blank check, computed once. `present?` exists only when
  # ActiveSupport is loaded, so relying on it raises NoMethodError otherwise.
  has_previous_key = previous_key ? !previous_key.to_s.strip.empty? : false

  config_delta = Prefab::ConfigClient.value_to_delta(key, config_value, namespace)
  upsert_req = Prefab::UpsertRequest.new(config_delta: config_delta)
  upsert_req.previous_key = previous_key if has_previous_key

  @base_client.request Prefab::ConfigService, :upsert, req_options: { timeout: @timeout }, params: upsert_req
  @base_client.stats.increment('prefab.config.upsert')
  @config_loader.set(config_delta, :upsert)
  @config_loader.rm(previous_key) if has_previous_key
  @config_resolver.update
end