Class: ZkRecipes::Cache

Inherits:
Object
  • Object
show all
Defined in:
lib/zk_recipes/cache.rb

Defined Under Namespace

Classes: CachedPath, Error, PathError, RegisteredPath

Constant Summary collapse

AS_NOTIFICATION =
"cache.zk_recipes"

Instance Method Summary collapse

Constructor Details

#initialize(logger: nil, host: nil, timeout: nil, zk_opts: {}) ⇒ Cache

Returns a new instance of Cache.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/zk_recipes/cache.rb', line 10

# Builds an empty cache.
#
# With a block, the cache marks itself as owning its ZK client, yields itself
# (before +connect+ runs), calls +connect(host, zk_opts)+ and then waits for
# the cache to warm for whatever is left of the timeout. Without a block only
# the internal state is initialised; @zk stays nil until #setup_callbacks is
# given a client.
#
# @param logger [Object, nil] stored as-is in @logger
# @param host [Object, nil] forwarded to +connect+ (helper not shown here);
#   only accepted together with a block
# @param timeout [Numeric, nil] warm-up budget in seconds, 30 when nil;
#   only accepted together with a block
# @param zk_opts [Hash] forwarded to +connect+; must be empty without a block
# @yieldparam cache [Cache] the instance under construction
# @raise [ArgumentError] if host, timeout or zk_opts are given without a block
def initialize(logger: nil, host: nil, timeout: nil, zk_opts: {})
  @cache = Concurrent::Map.new
  @latch = Concurrent::CountDownLatch.new
  @logger = logger
  @pending_updates = Concurrent::Hash.new # Concurrent::Map does not implement #reject!
  @registerable = true
  @registered_values = Concurrent::Map.new
  @session_id = nil
  @watches = Concurrent::Map.new
  @zk = nil

  if block_given?
    @owned_zk = true
    @warm_cache_timeout = timeout || 30
    yield(self)

    # The deadline is fixed before connecting, so time spent in `connect`
    # is charged against the same budget. A monotonic clock is used so that
    # wall-clock adjustments cannot stretch or shrink the remaining wait.
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @warm_cache_timeout
    connect(host, zk_opts)

    wait_for_warm_cache(deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC))
  elsif host || timeout || !zk_opts.empty?
    raise ArgumentError, "host, zk_opts, and timeout are only allowed with a block"
  else
    @owned_zk = false
  end
end

Instance Method Details

#close! ⇒ Object



99
100
101
102
103
104
# File 'lib/zk_recipes/cache.rb', line 99

# Tears down everything this cache registered.
#
# Every subscription stored in @watches is unsubscribed, the ZK client is
# closed only when this cache owns it (@owned_zk), and finally the watch and
# pending-update tables are emptied.
def close!
  @watches.each_value { |subscription| subscription.unsubscribe }
  @zk.close! if @owned_zk

  @watches.clear
  @pending_updates.clear
end

#fetch(path) ⇒ Object Also known as: []



119
120
121
122
123
# File 'lib/zk_recipes/cache.rb', line 119

# Looks up the cached entry for +path+ and returns its value.
#
# @param path [Object] key previously stored in @cache
# @return [Object] the entry's +value+
# @raise [PathError] when the lookup raises KeyError, i.e. +path+ is not in @cache
def fetch(path)
  entry = @cache.fetch(path)
  entry.value
rescue KeyError
  raise PathError, "no registered path for #{path.inspect}"
end

#fetch_valid(path) ⇒ Object



126
127
128
129
130
131
# File 'lib/zk_recipes/cache.rb', line 126

# Like a plain lookup, but only hands back the value while the cached entry
# reports itself as valid.
#
# @param path [Object] key previously stored in @cache
# @return [Object, nil] the entry's +value+, or nil when +valid?+ is falsy
# @raise [PathError] when the lookup raises KeyError, i.e. +path+ is not in @cache
def fetch_valid(path)
  entry = @cache.fetch(path)
  return unless entry.valid?

  entry.value
rescue KeyError
  raise PathError, "no registered path=#{path.inspect}"
end

#register(path, default_value, &block) ⇒ Object

Raises:

  • (Error) — when called after #setup_callbacks



37
38
39
40
41
42
43
44
# File 'lib/zk_recipes/cache.rb', line 37

# Declares +path+ as cached, seeded with +default_value+.
#
# Stores a CachedPath in @cache and a RegisteredPath (default plus the given
# block) in @registered_values, then publishes an AS_NOTIFICATION event
# carrying the path and the default value.
#
# @param path [Object] key to register
# @param default_value [Object] initial value for the path
# @param block [Proc, nil] handed to RegisteredPath together with the default
# @raise [Error] once @registerable is false (set by #setup_callbacks)
def register(path, default_value, &block)
  unless @registerable
    raise Error, "register only allowed before setup_callbacks called"
  end

  debug { "added path=#{path} default_value=#{default_value.inspect}" }

  cached_entry = CachedPath.new(default_value)
  @cache[path] = cached_entry

  registration = RegisteredPath.new(default_value, block)
  @registered_values[path] = registration

  ActiveSupport::Notifications.instrument(AS_NOTIFICATION, path: path, value: default_value)
end

#reopen ⇒ Object

Reopen the client after the process forks. This is not the opposite of `#close!`.



108
109
110
111
112
113
114
115
116
117
# File 'lib/zk_recipes/cache.rb', line 108

# Resets per-session state and, when this cache owns the client, reopens it
# and waits for the cache to warm again. Meant to be called after the process
# forks; it is not the inverse of #close!.
#
# A fresh latch is installed, the remembered session id is dropped and any
# pending updates are discarded. When the client is not owned by this cache
# nothing else happens.
#
# @return [Boolean, nil] the result of #wait_for_warm_cache when the client is
#   owned, otherwise nil
def reopen
  @latch = Concurrent::CountDownLatch.new
  @session_id = nil
  @pending_updates.clear
  return unless @owned_zk

  # Fix the deadline before reopening so the reopen itself counts against the
  # budget. A monotonic clock keeps the remaining time correct even if the
  # wall clock is adjusted in between.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @warm_cache_timeout
  @zk.reopen
  wait_for_warm_cache(deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC))
end

#setup_callbacks(zk) ⇒ Object

Raises:

  • (Error) — when called more than once, or when the client is already connected or connecting



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/zk_recipes/cache.rb', line 46

# Attaches this cache to +zk+ and closes registration.
#
# For every registered path a watch is registered on the client, and an
# on_connected hook is installed; both kinds of subscription are kept in
# @watches. An on_exception hook that logs the exception is installed too.
#
# The client is validated before any state is touched, so a rejected client
# leaves the cache registerable and @zk unchanged.
#
# @param zk [Object] the client to use; per the error message below a
#   ZK::Client that is neither connected nor connecting
# @raise [Error] if callbacks were already set up, or if +zk+ reports
#   connected? or connecting?
def setup_callbacks(zk)
  raise Error, "setup_callbacks can only be called once" unless @registerable

  if zk.connected? || zk.connecting?
    raise Error, "the ZK::Client is already connected, the cached values must be set before connecting"
  end

  @zk = zk
  @registerable = false

  @registered_values.each do |path, _value|
    @watches[path] = @zk.register(path) do |event|
      if event.node_event?
        debug { "node event path=#{event.path} #{event.event_name} #{event.state_name}" }
        # A falsy result from update_cache queues the path and defers a retry.
        unless update_cache(event.path)
          @pending_updates[path] = nil
          @zk.defer { process_pending_updates }
        end
      else
        warn { "session event #{event.event_name} #{event.state_name}" }
      end
    end
  end

  @watches["on_connected"] = @zk.on_connected do
    # Same session as last time: only retry what is still pending.
    if @session_id == @zk.session_id
      process_pending_updates
      next
    end

    # New session: refresh every registered path, remember the ones that
    # could not be updated, then record the session and release the latch.
    # NOTE(review): this is the only logging call that passes a string instead
    # of a block; confirm the debug helper accepts a positional message.
    debug("on_connected new session")
    @pending_updates.clear
    @registered_values.each do |path, _value|
      @pending_updates[path] = nil unless update_cache(path)
    end
    @session_id = @zk.session_id
    @latch.count_down
  end

  @zk.on_exception do |e|
    error { "on_exception exception=#{e.inspect} backtrace=#{e.backtrace.inspect}" }
  end
end

#wait_for_warm_cache(timeout = 30) ⇒ Object



89
90
91
92
93
94
95
96
97
# File 'lib/zk_recipes/cache.rb', line 89

# Blocks on @latch for up to +timeout+ and reports whether it was released.
#
# @param timeout [Numeric] passed straight to @latch.wait; defaults to 30
# @return [Boolean] true when the latch wait succeeds, false (after logging a
#   warning that includes the client's connected? state) when it does not
def wait_for_warm_cache(timeout = 30)
  debug { "waiting for cache to warm timeout=#{timeout.inspect}" }
  return true if @latch.wait(timeout)

  warn { "didn't warm cache before timeout connected=#{@zk.connected?} timeout=#{timeout.inspect}" }
  false
end