Class: ZkRecipes::Cache
- Inherits:
-
Object
show all
- Defined in:
- lib/zk_recipes/cache.rb
Defined Under Namespace
Classes: CachedPath, Error, PathError, RegisteredPath
Constant Summary
collapse
- AS_NOTIFICATION =
"cache.zk_recipes"
Instance Method Summary
collapse
Constructor Details
#initialize(logger: nil, host: nil, timeout: nil, zk_opts: {}) ⇒ Cache
Returns a new instance of Cache.
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
# File 'lib/zk_recipes/cache.rb', line 10
def initialize(logger: nil, host: nil, timeout: nil, zk_opts: {})
@cache = Concurrent::Map.new
@latch = Concurrent::CountDownLatch.new
@logger = logger
@pending_updates = Concurrent::Hash.new @registerable = true
@registered_values = Concurrent::Map.new
@session_id = nil
@watches = Concurrent::Map.new
@zk = nil
if block_given?
@owned_zk = true
@warm_cache_timeout = timeout || 30
yield(self)
expiration = Time.now + @warm_cache_timeout
connect(host, zk_opts)
wait_for_warm_cache(expiration - Time.now)
elsif host || timeout || !zk_opts.empty?
raise ArgumentError, "host, zk_opts, and timeout are only allowed with a block"
else
@owned_zk = false
end
end
|
Instance Method Details
#close! ⇒ Object
99
100
101
102
103
104
|
# File 'lib/zk_recipes/cache.rb', line 99
def close!
@watches.each_value(&:unsubscribe)
@zk.close! if @owned_zk
@watches.clear
@pending_updates.clear
end
|
#fetch(path) ⇒ Object
Also known as:
[]
119
120
121
122
123
|
# File 'lib/zk_recipes/cache.rb', line 119
def fetch(path)
@cache.fetch(path).value
rescue KeyError
raise PathError, "no registered path for #{path.inspect}"
end
|
#fetch_valid(path) ⇒ Object
126
127
128
129
130
131
|
# File 'lib/zk_recipes/cache.rb', line 126
def fetch_valid(path)
cached = @cache.fetch(path)
cached.value if cached.valid?
rescue KeyError
raise PathError, "no registered path=#{path.inspect}"
end
|
#register(path, default_value, &block) ⇒ Object
37
38
39
40
41
42
43
44
|
# File 'lib/zk_recipes/cache.rb', line 37
def register(path, default_value, &block)
raise Error, "register only allowed before setup_callbacks called" unless @registerable
debug { "added path=#{path} default_value=#{default_value.inspect}" }
@cache[path] = CachedPath.new(default_value)
@registered_values[path] = RegisteredPath.new(default_value, block)
ActiveSupport::Notifications.instrument(AS_NOTIFICATION, path: path, value: default_value)
end
|
#reopen ⇒ Object
reopen the client after the process forks This is not the opposite of ‘#close!`.
108
109
110
111
112
113
114
115
116
117
|
# File 'lib/zk_recipes/cache.rb', line 108
def reopen
@latch = Concurrent::CountDownLatch.new
@session_id = nil
@pending_updates.clear
if @owned_zk
expiration = Time.now + @warm_cache_timeout
@zk.reopen
wait_for_warm_cache(expiration - Time.now)
end
end
|
#setup_callbacks(zk) ⇒ Object
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
# File 'lib/zk_recipes/cache.rb', line 46
def setup_callbacks(zk)
raise Error, "setup_callbacks can only be called once" unless @registerable
@zk = zk
@registerable = false
if @zk.connected? || @zk.connecting?
raise Error, "the ZK::Client is already connected, the cached values must be set before connecting"
end
@registered_values.each do |path, _value|
@watches[path] = @zk.register(path) do |event|
if event.node_event?
debug { "node event path=#{event.path} #{event.event_name} #{event.state_name}" }
unless update_cache(event.path)
@pending_updates[path] = nil
@zk.defer { process_pending_updates }
end
else
warn { "session event #{event.event_name} #{event.state_name}" }
end
end
end
@watches["on_connected"] = @zk.on_connected do
if @session_id == @zk.session_id
process_pending_updates
next
end
debug("on_connected new session")
@pending_updates.clear
@registered_values.each do |path, _value|
@pending_updates[path] = nil unless update_cache(path)
end
@session_id = @zk.session_id
@latch.count_down
end
@zk.on_exception do |e|
error { "on_exception exception=#{e.inspect} backtrace=#{e.backtrace.inspect}" }
end
end
|
#wait_for_warm_cache(timeout = 30) ⇒ Object
89
90
91
92
93
94
95
96
97
|
# File 'lib/zk_recipes/cache.rb', line 89
def wait_for_warm_cache(timeout = 30)
debug { "waiting for cache to warm timeout=#{timeout.inspect}" }
if @latch.wait(timeout)
true
else
warn { "didn't warm cache before timeout connected=#{@zk.connected?} timeout=#{timeout.inspect}" }
false
end
end
|