Class: Prefab::ConfigClient
- Inherits:
-
Object
- Object
- Prefab::ConfigClient
show all
- Includes:
- ConfigHelper
- Defined in:
- lib/prefab/config_client.rb
Constant Summary
collapse
- RECONNECT_WAIT =
5
- DEFAULT_CHECKPOINT_FREQ_SEC =
60
- SSE_READ_TIMEOUT =
300
- AUTH_USER =
"authuser"
Class Method Summary
collapse
Instance Method Summary
collapse
#value_of, #value_of_variant
Constructor Details
#initialize(base_client, timeout) ⇒ ConfigClient
Returns a new instance of ConfigClient.
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
# File 'lib/prefab/config_client.rb', line 11
# Builds the config client: stores the collaborators, creates the loader and
# resolver, takes the initialization write lock, and then either finishes
# immediately (local-only mode) or starts loading/streaming configuration.
#
# @param base_client [Object] client that supplies `options` and `log_internal`
# @param timeout [Object] stored in @timeout; `upsert` passes it as the
#   request timeout
def initialize(base_client, timeout)
  @base_client = base_client
  @options = base_client.options
  @base_client.log_internal(Logger::DEBUG, "Initialize ConfigClient")

  @timeout = timeout
  @stream_lock = Concurrent::ReadWriteLock.new
  @checkpoint_freq_secs = DEFAULT_CHECKPOINT_FREQ_SEC
  @config_loader = Prefab::ConfigLoader.new(@base_client)
  @config_resolver = Prefab::ConfigResolver.new(@base_client, @config_loader)

  # The write lock is taken before the future is created, so the future's
  # read-lock acquisition has to wait for the write lock to be released.
  # NOTE(review): presumably finish_init! releases it — confirm, it is not
  # visible in this section.
  @initialization_lock = Concurrent::ReadWriteLock.new
  @base_client.log_internal(Logger::DEBUG, "Initialize ConfigClient: AcquireWriteLock")
  @initialization_lock.acquire_write_lock
  @base_client.log_internal(Logger::DEBUG, "Initialize ConfigClient: AcquiredWriteLock")
  @initialized_future = Concurrent::Future.execute { @initialization_lock.acquire_read_lock }

  @cancellable_interceptor = Prefab::CancellableInterceptor.new(@base_client)

  # Local-only mode skips the checkpoint load, the checkpointing thread and
  # the streaming connection.
  return finish_init!(:local_only) if @options.local_only?

  load_checkpoint
  start_checkpointing_thread
  start_streaming
end
|
Class Method Details
.value_to_delta(key, config_value, namespace = nil) ⇒ Object
70
71
72
73
|
# File 'lib/prefab/config_client.rb', line 70
# Wraps a single config value in a Prefab::Config carrying one ConfigRow.
#
# @param key [String] config key
# @param config_value [Object] value placed in the single row
# @param namespace [String, nil] optional namespace; when present the config
#   key becomes "namespace:key"
# @return [Prefab::Config]
def self.value_to_delta(key, config_value, namespace = nil)
  # compact drops a nil namespace so un-namespaced keys carry no ":" prefix.
  qualified_key = [namespace, key].compact.join(":")
  single_row = Prefab::ConfigRow.new(value: config_value)
  Prefab::Config.new(key: qualified_key, rows: [single_row])
end
|
Instance Method Details
#get(key, default = Prefab::Client::NO_DEFAULT_PROVIDED) ⇒ Object
75
76
77
78
|
# File 'lib/prefab/config_client.rb', line 75
# Looks up a config by key and returns its unwrapped value.
#
# @param key [String] config key to look up
# @param default [Object] handed to handle_default when the lookup finds
#   nothing; defaults to the NO_DEFAULT_PROVIDED sentinel
# @return [Object] value_of(entry[:value]) on a hit, otherwise whatever
#   handle_default returns
def get(key, default = Prefab::Client::NO_DEFAULT_PROVIDED)
  entry = _get(key)
  return handle_default(key, default) unless entry

  value_of(entry[:value])
end
|
#get_config_obj(key) ⇒ Object
80
81
82
83
|
# File 'lib/prefab/config_client.rb', line 80
# Looks up a config by key and returns the entry's :config member.
#
# @param key [String] config key to look up
# @return [Object, nil] the :config member of the entry, or nil when the
#   lookup finds nothing
def get_config_obj(key)
  entry = _get(key)
  entry[:config] if entry
end
|
#reset ⇒ Object
61
62
63
64
|
# File 'lib/prefab/config_client.rb', line 61
# Resets the base client and drops the memoized @_stub.
#
# @return [nil]
def reset
  @base_client.reset!
  # Cleared only after reset! returns; if reset! raises, @_stub is kept.
  @_stub = nil
end
|
#start_streaming ⇒ Object
41
42
43
44
45
|
# File 'lib/prefab/config_client.rb', line 41
# Starts the SSE streaming connection thread from the loader's current
# highwater mark, unless @streaming_thread is already set. The check and the
# start both run inside @stream_lock's write lock.
#
# @return [Object, nil] whatever with_write_lock returns for the block
def start_streaming
  @stream_lock.with_write_lock do
    next unless @streaming_thread.nil?

    start_sse_streaming_connection_thread(@config_loader.highwater_mark)
  end
end
|
#to_s ⇒ Object
66
67
68
|
# File 'lib/prefab/config_client.rb', line 66
# String form of this client; delegates to the config resolver.
#
# @return [String] @config_resolver.to_s
def to_s
  @config_resolver.to_s
end
|
#upsert(key, config_value, namespace = nil, previous_key = nil) ⇒ Object
47
48
49
50
51
52
53
54
55
56
57
58
59
|
# File 'lib/prefab/config_client.rb', line 47
# Sends an upsert for a config value to the config service, then mirrors the
# change into the local loader and refreshes the resolver.
#
# @param key [String] config key; must not contain ":"
# @param config_value [Object] value to store
# @param namespace [String, nil] optional namespace; must not contain ":"
# @param previous_key [String, nil] key being replaced; when non-blank it is
#   sent on the request and removed from the local loader afterwards
# @return [Object] result of @config_resolver.update
# @raise [RuntimeError] if key or namespace contains ":"
def upsert(key, config_value, namespace = nil, previous_key = nil)
  raise "Key must not contain ':' set namespaces separately" if key.include? ":"
  raise "Namespace must not contain ':'" if namespace&.include?(":")

  # `present?` is an ActiveSupport extension; without it loaded, any non-nil
  # previous_key raised NoMethodError here. This stdlib check keeps the same
  # meaning: nil, empty and whitespace-only all count as "no previous key".
  replaces_previous = previous_key ? !previous_key.to_s.strip.empty? : false

  config_delta = Prefab::ConfigClient.value_to_delta(key, config_value, namespace)
  upsert_req = Prefab::UpsertRequest.new(config_delta: config_delta)
  upsert_req.previous_key = previous_key if replaces_previous

  @base_client.request Prefab::ConfigService, :upsert, req_options: { timeout: @timeout }, params: upsert_req
  @base_client.stats.increment("prefab.config.upsert")

  # Local state is touched only after the remote request has returned.
  @config_loader.set(config_delta, :upsert)
  @config_loader.rm(previous_key) if replaces_previous
  @config_resolver.update
end
|