Class: Prefab::ConfigClient

Inherits:
Object
  • Object
show all
Defined in:
lib/prefab/config_client.rb

Constant Summary collapse

RECONNECT_WAIT =
5
DEFAULT_CHECKPOINT_FREQ_SEC =
60

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(base_client, timeout) ⇒ ConfigClient

Returns a new instance of ConfigClient.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/prefab/config_client.rb', line 6

# Builds the client: stores its collaborators, creates the config loader and
# resolver, takes the initialization write lock, then loads a checkpoint and
# starts the checkpointing thread.
#
# @param base_client [Object] shared client; handed to ConfigLoader and
#   ConfigResolver and used directly by #upsert and #reset
# @param timeout [Object] stored as @timeout and passed as the request
#   timeout in #upsert (presumably seconds — TODO confirm)
def initialize(base_client, timeout)
  @base_client = base_client
  @timeout = timeout
  @initialization_lock = Concurrent::ReadWriteLock.new

  @checkpoint_freq_secs = DEFAULT_CHECKPOINT_FREQ_SEC

  @config_loader = Prefab::ConfigLoader.new(@base_client)
  @config_resolver = Prefab::ConfigResolver.new(@base_client, @config_loader)

  # The write lock is acquired here and is NOT released anywhere in this
  # method, so #get (which takes the read lock) blocks until some other code
  # releases it.
  # NOTE(review): presumably released once the first config load finishes —
  # confirm in load_checkpoint / the API connection thread.
  @initialization_lock.acquire_write_lock
  # Region is hard-coded to us-east-1.
  # NOTE(review): presumably the checkpoint is read from S3 — confirm in load_checkpoint.
  @s3 = Aws::S3::Resource.new(region: 'us-east-1')

  load_checkpoint
  start_checkpointing_thread
end

Class Method Details

.value_to_delta(key, config_value, namespace = nil) ⇒ Object



56
57
58
59
# File 'lib/prefab/config_client.rb', line 56

# Wraps a key/value pair in a Prefab::ConfigDelta.
#
# The delta key is "namespace:key" when a namespace is given and just "key"
# otherwise (nil parts are dropped before joining with ":").
#
# @param key [String] config key
# @param config_value [Object] value to store in the delta
# @param namespace [String, nil] optional namespace prefix
# @return [Prefab::ConfigDelta]
def self.value_to_delta(key, config_value, namespace = nil)
  key_parts = [namespace, key].reject(&:nil?)
  qualified_key = key_parts.join(":")

  Prefab::ConfigDelta.new(key: qualified_key, value: config_value)
end

Instance Method Details

#get(prop) ⇒ Object



27
28
29
30
31
# File 'lib/prefab/config_client.rb', line 27

# Looks up a config property through the resolver while holding the
# initialization read lock.
#
# @param prop [Object] property key, passed straight to the resolver
# @return [Object] whatever the resolver returns for +prop+
def get(prop)
  @initialization_lock.with_read_lock { @config_resolver.get(prop) }
end

#reset ⇒ Object



47
48
49
50
# File 'lib/prefab/config_client.rb', line 47

# Resets the underlying base client and clears the memoized @_stub.
#
# The base client is reset first; if reset! raises, @_stub is left untouched.
#
# @return [nil]
def reset
  @base_client.reset!
  @_stub = nil
end

#start_streaming ⇒ Object



23
24
25
# File 'lib/prefab/config_client.rb', line 23

# Starts the API connection thread, passing it the config loader's current
# highwater mark.
#
# @return [Object] whatever start_api_connection_thread returns
def start_streaming
  resume_from = @config_loader.highwater_mark
  start_api_connection_thread(resume_from)
end

#to_s ⇒ Object



52
53
54
# File 'lib/prefab/config_client.rb', line 52

# String form of the client; delegates to the config resolver's #to_s.
#
# @return [Object] whatever @config_resolver.to_s returns (presumably a String)
def to_s
  @config_resolver.to_s
end

#upsert(key, config_value, namespace = nil, previous_key = nil) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/prefab/config_client.rb', line 33

# Creates or updates a config value on the server, then mirrors the change
# into the local loader and refreshes the resolver.
#
# When +previous_key+ is given (non-nil and not blank) the request carries it
# and the old key is removed from the local loader after the new value is set.
#
# @param key [String] config key; must not contain ":"
# @param config_value [Object] value to store
# @param namespace [String, nil] optional namespace; must not contain ":"
# @param previous_key [String, nil] key being replaced, if any
# @return [Object] whatever @config_resolver.update returns
# @raise [RuntimeError] if +key+ or +namespace+ contains ":"
def upsert(key, config_value, namespace = nil, previous_key = nil)
  raise "Key must not contain ':' set namespaces separately" if key.include? ":"
  raise "Namespace must not contain ':'" if namespace&.include?(":")

  # Plain-Ruby blank check: nil, "" and whitespace-only strings all mean
  # "no previous key". This avoids calling Object#present?, which only exists
  # when ActiveSupport is loaded and otherwise raises NoMethodError for any
  # non-nil previous_key.
  has_previous_key = previous_key && previous_key.to_s !~ /\A[[:space:]]*\z/

  config_delta = Prefab::ConfigClient.value_to_delta(key, config_value, namespace)
  upsert_req = Prefab::UpsertRequest.new(config_delta: config_delta)
  upsert_req.previous_key = previous_key if has_previous_key

  @base_client.request(Prefab::ConfigService, :upsert, req_options: { timeout: @timeout }, params: upsert_req)
  @base_client.stats.increment("prefab.config.upsert")

  # Local state is only touched after the remote request has returned.
  @config_loader.set(config_delta)
  # NOTE(review): the new key is stored namespaced ("ns:key") but the old one
  # is removed by the raw previous_key — confirm callers pass the full key.
  @config_loader.rm(previous_key) if has_previous_key
  @config_resolver.update
end