Class: RubySkynet::Zookeeper::Registry

Inherits:
Object
  • Object
show all
Includes:
SemanticLogger::Loggable
Defined in:
lib/ruby_skynet/zookeeper/registry.rb

Overview

Registry

Store information in Zookeepr and subscribe to future changes

Notifies registered subscribers when information has changed

All paths specified are relative to the root. As such the root key is never returned, nor is it required when a key is supplied as input. For example, with a root of /foo/bar, any paths passed in will leave out the root: host/name

Direct Known Subclasses

CachedRegistry

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(params, &block) ⇒ Registry

Create a Registry instance to manage a information within Zookeeper

:root [String]

Root key to load and then monitor for changes
It is not recommended to set the root to "/" as it will generate
significant traffic since it will also monitor ZooKeeper Admin changes
Mandatory

:ephemeral [Boolean]

All set operations of non-nil values will result in ephemeral nodes.

:on_connect [Proc]

Block to call after the connection to Zookeeper has been established
and every time the connection is re-established

:registry [Hash]

:servers [Array of String]
  Array of URL's of ZooKeeper servers to connect to with port numbers
  ['server1:2181', 'server2:2181']

:connect_timeout [Float]
  Time in seconds to timeout when trying to connect to the server

Optional Block

The block will be called for every key found in the registry on startup

Example:

require 'ruby_skynet/zookeeper'
registry = RubySkynet::Zookeeper::Registry.new(root: '/registry') do |key, value, version|
  puts "Found #{key} => '#{value}' V#{version}"
end


62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/ruby_skynet/zookeeper/registry.rb', line 62

def initialize(params, &block)
  params = params.dup
  @root = params.delete(:root)
  raise "Missing mandatory parameter :root" unless @root

  # Add leading '/' to root if missing
  @root = "/#{@root}" unless @root.start_with?('/')

  # Strip trailing '/' if supplied
  @root = @root[0..-2] if @root.end_with?("/")
  @root_with_trail = "#{@root}/"
  @root = '/' if @root == ''

  registry_config = (params.delete(:registry) || {}).dup

  #   server1:2181,server2:2181,server3:2181
  @servers = (registry_config.delete(:servers) || ['127.0.0.1:2181']).join(',')
  @connect_timeout = (registry_config.delete(:connect_timeout) || 10).to_f

  # Generate warning log entries for any unknown configuration options
  registry_config.each_pair {|k,v| logger.warn "Ignoring unknown configuration option: zookeeper.#{k}"}

  # Allow the serializer and deserializer implementations to be replaced
  @serializer   = params.delete(:serializer)   || RubySkynet::Zookeeper::Json::Serializer
  @deserializer = params.delete(:deserializer) || RubySkynet::Zookeeper::Json::Deserializer

  @ephemeral = params.delete(:ephemeral)
  @ephemeral = false if @ephemeral.nil?

  @on_connect = params.delete(:on_connect)

  # Generate warning log entries for any unknown configuration options
  params.each_pair {|k,v| logger.warn "Ignoring unknown configuration option: #{k}"}

  # Hash with Array values containing the list of children for each node, if any
  @children = ThreadSafe::Hash.new

  # Block is used in init
  @block = block

  self.init
end

Instance Attribute Details

#rootObject (readonly)

Returns the value of attribute root.



27
28
29
# File 'lib/ruby_skynet/zookeeper/registry.rb', line 27

def root
  @root
end

Instance Method Details

#[](key) ⇒ Object

Retrieve the latest value from a specific path from the registry Returns nil when the key is not present in the registry



107
108
109
110
111
112
113
114
115
116
117
# File 'lib/ruby_skynet/zookeeper/registry.rb', line 107

def [](key)
  result = @zookeeper.get(:path => full_key(key))
  case result[:rc]
  when ::Zookeeper::ZOK
    @deserializer.deserialize(result[:data])
  when ::Zookeeper::ZNONODE
    # Return nil if node not present
  else
    check_rc(result)
  end
end

#[]=(key, value) ⇒ Object

Replace the latest value at a specific key Supplying a nil value will result in the key being deleted in ZooKeeper



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/ruby_skynet/zookeeper/registry.rb', line 121

def []=(key,value)
  if value.nil?
    delete(key)
    return value
  end
  v = @serializer.serialize(value)
  k = full_key(key)
  result = @zookeeper.set(:path => k, :data => v)
  if result[:rc] == ::Zookeeper::ZNONODE
    create_path(k, v)
  else
    check_rc(result)
  end
  value
end

#closeObject

Cleanup on process termination



191
192
193
194
# File 'lib/ruby_skynet/zookeeper/registry.rb', line 191

def close
  @zookeeper.close if @zookeeper
  @zookeeper = nil
end

#delete(key, remove_empty_parents = true) ⇒ Object

Delete the value at a specific key and any parent nodes if they don’t have any children or values

Params

remove_empty_parents
  If set to true it will also delete any parent nodes that have no
  children or value

Returns nil



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/ruby_skynet/zookeeper/registry.rb', line 146

def delete(key, remove_empty_parents=true)
  result = @zookeeper.delete(:path => full_key(key))
  return if result[:rc] == ::Zookeeper::ZNONODE
  check_rc(result)

  if remove_empty_parents
    paths = key.split('/')
    paths.pop
    while paths.size > 0
      parent_path = full_key(paths.join('/'))
      result = @zookeeper.get(:path => parent_path)
      break if (result[:rc] == ::Zookeeper::ZNONODE) || (result[:data] != nil)

      delete(parent_path)
      paths.pop
    end
  end
  nil
end

#each_pair(relative_path = '', &block) ⇒ Object

Iterate over every key, value pair in the registry Optional relative path can be supplied Returns the number of nodes iterated over

Example:

registry.each_pair {|k,v| puts "#{k} => #{v}"}


172
173
174
# File 'lib/ruby_skynet/zookeeper/registry.rb', line 172

def each_pair(relative_path = '', &block)
  get_recursive(full_key(relative_path), watch=false, &block)
end

#keysObject

Returns [Array<String>] all keys in the registry



177
178
179
180
181
# File 'lib/ruby_skynet/zookeeper/registry.rb', line 177

def keys
  keys = []
  each_pair {|k,v| keys << k}
  keys
end

#on_create(key = '*', &block) ⇒ Object

When an entry is created the block will be called

Parameters
  key
    The relative key to watch for changes
  block
    The block to be called

Parameters passed to the block:
  key
    The key that was created
    Supplying a key of '*' means all paths
    Default: '*'

  value
    New value from the registry

  version
    The version number of this node

Example:

registry.on_update do |key, value, revision|
  puts "#{key} was created with #{value}"
end

Note: They key must either be the exact path or ‘*’ for all keys



221
222
223
# File 'lib/ruby_skynet/zookeeper/registry.rb', line 221

def on_create(key='*', &block)
  ((@create_subscribers ||= ThreadSafe::Hash.new)[key] ||= ThreadSafe::Array.new) << block
end

#on_delete(key = '*', &block) ⇒ Object

When an entry is deleted the block will be called

Parameters
  key
    The relative key to watch for changes
  block
    The block to be called

Parameters passed to the block:
  key
    The key that was deleted from the registry
    Supplying a key of '*' means all paths
    Default: '*'

Example:

registry.on_delete do |key, revision|
  puts "#{key} was deleted"
end

Note: They key must either be the exact path or ‘*’ for all keys



273
274
275
# File 'lib/ruby_skynet/zookeeper/registry.rb', line 273

def on_delete(key='*', &block)
  ((@delete_subscribers ||= ThreadSafe::Hash.new)[key] ||= ThreadSafe::Array.new) << block
end

#on_update(key = '*', &block) ⇒ Object

When an entry is updated the block will be called

Parameters
  key
    The relative key to watch for changes
  block
    The block to be called

Parameters passed to the block:
  key
    The key that was updated in the registry
    Supplying a key of '*' means all paths
    Default: '*'

  value
    New value from the registry

  version
    The version number of this node

Example:

registry.on_update do |key, value, version|
  puts "#{key} was updated to #{value}"
end

Note: They key must either be the exact path or ‘*’ for all keys



250
251
252
# File 'lib/ruby_skynet/zookeeper/registry.rb', line 250

def on_update(key='*', &block)
  ((@update_subscribers ||= ThreadSafe::Hash.new)[key] ||= ThreadSafe::Array.new) << block
end

#to_hObject

Returns a copy of the registry as a Hash



184
185
186
187
188
# File 'lib/ruby_skynet/zookeeper/registry.rb', line 184

def to_h
  h = {}
  each_pair {|k,v| h[k] = v}
  h
end